// scouter_dataframe/parquet/traits.rs
use arrow::datatypes::DataType;
2
3use crate::error::DataFrameError;
4use async_trait::async_trait;
5use chrono::{DateTime, Utc};
6use datafusion::datasource::file_format::parquet::ParquetFormat;
7use datafusion::datasource::listing::{
8    ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
9};
10use datafusion::prelude::SessionContext;
11use datafusion::prelude::*;
12use datafusion::{dataframe::DataFrameWriteOptions, prelude::DataFrame};
13use scouter_settings::ObjectStorageSettings;
14use scouter_types::ServerRecords;
15use scouter_types::StorageType;
16use tracing::instrument;
17
18use std::sync::Arc;
19#[async_trait]
20pub trait ParquetFrame {
21    fn new(storage_settings: &ObjectStorageSettings) -> Result<Self, DataFrameError>
22    where
23        Self: Sized;
24
25    #[instrument(skip_all, err)]
33    async fn write_parquet(
34        &self,
35        rpath: &str,
36        records: ServerRecords,
37    ) -> Result<(), DataFrameError> {
38        let df = self.get_dataframe(records).await?;
39
40        let df = df
42            .with_column("year", date_part(lit("year"), col("created_at")))
43            .map_err(DataFrameError::AddYearColumnError)?
44            .with_column("month", date_part(lit("month"), col("created_at")))
45            .map_err(DataFrameError::AddMonthColumnError)?
46            .with_column("day", date_part(lit("day"), col("created_at")))
47            .map_err(DataFrameError::AddDayColumnError)?
48            .with_column("hour", date_part(lit("hour"), col("created_at")))
49            .map_err(DataFrameError::AddHourColumnError)?;
50
51        let write_options = DataFrameWriteOptions::new().with_partition_by(vec![
52            "year".to_string(),
54            "month".to_string(),
55            "day".to_string(),
56            "hour".to_string(),
57        ]);
58
59        df.write_parquet(rpath, write_options, None)
60            .await
61            .map_err(DataFrameError::WriteParquetError)?;
62
63        Ok(())
64    }
65
66    async fn get_dataframe(&self, records: ServerRecords) -> Result<DataFrame, DataFrameError>;
67
68    fn storage_root(&self) -> String;
70
71    fn storage_type(&self) -> StorageType;
72
73    fn get_session_context(&self) -> Result<SessionContext, DataFrameError>;
75
76    fn table_name(&self) -> String;
78
79    fn get_binned_sql(
81        &self,
82        bin: &f64,
83        start_time: &DateTime<Utc>,
84        end_time: &DateTime<Utc>,
85        space: &str,
86        name: &str,
87        version: &str,
88    ) -> String;
89
90    async fn register_table(
98        &self,
99        path: &str,
100        table_name: &str,
101    ) -> Result<SessionContext, DataFrameError> {
102        let ctx = self.get_session_context()?;
103
104        let table_path = ListingTableUrl::parse(path)?;
105
106        let file_format = ParquetFormat::new();
107        let listing_options = ListingOptions::new(Arc::new(file_format))
108            .with_file_extension(".parquet")
109            .with_target_partitions(8)
110            .with_table_partition_cols(vec![
111                ("year".to_string(), DataType::Int32),
112                ("month".to_string(), DataType::Int32),
113                ("day".to_string(), DataType::Int32),
114                ("hour".to_string(), DataType::Int32),
115            ]);
116
117        let resolved_schema = listing_options
118            .infer_schema(&ctx.state(), &table_path)
119            .await
120            .map_err(DataFrameError::InferSchemaError)?;
121
122        let config = ListingTableConfig::new(table_path)
123            .with_listing_options(listing_options)
124            .with_schema(resolved_schema);
125
126        let provider = Arc::new(
127            ListingTable::try_new(config).map_err(DataFrameError::CreateListingTableError)?,
128        );
129
130        ctx.register_table(table_name, provider)
131            .map_err(DataFrameError::RegisterTableError)?;
132        Ok(ctx)
133    }
134
135    #[allow(clippy::too_many_arguments)]
148    async fn get_binned_metrics(
149        &self,
150        path: &str,
151        bin: &f64,
152        start_time: &DateTime<Utc>,
153        end_time: &DateTime<Utc>,
154        space: &str,
155        name: &str,
156        version: &str,
157    ) -> Result<DataFrame, DataFrameError> {
158        let ctx = self.register_table(path, &self.table_name()).await?;
160        let sql = self.get_binned_sql(bin, start_time, end_time, space, name, version);
161
162        let df = ctx.sql(&sql).await?;
163
164        Ok(df)
165    }
166}