// scouter_dataframe/parquet/traits.rs
use crate::error::DataFrameError;

use arrow::datatypes::DataType;

use async_trait::async_trait;
use chrono::{DateTime, Utc};
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::{
    ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
};
use datafusion::prelude::SessionContext;
use datafusion::prelude::*;
use datafusion::{dataframe::DataFrameWriteOptions, prelude::DataFrame};
use scouter_settings::ObjectStorageSettings;
use scouter_types::{ServerRecords, StorageType};
use tracing::instrument;

use std::sync::Arc;
19#[async_trait]
20pub trait ParquetFrame {
21 fn new(storage_settings: &ObjectStorageSettings) -> Result<Self, DataFrameError>
22 where
23 Self: Sized;
24
25 #[instrument(skip_all, err)]
33 async fn write_parquet(
34 &self,
35 rpath: &str,
36 records: ServerRecords,
37 ) -> Result<(), DataFrameError> {
38 let df = self.get_dataframe(records).await?;
39
40 let df = df
42 .with_column("year", date_part(lit("year"), col("created_at")))
43 .map_err(DataFrameError::AddYearColumnError)?
44 .with_column("month", date_part(lit("month"), col("created_at")))
45 .map_err(DataFrameError::AddMonthColumnError)?
46 .with_column("day", date_part(lit("day"), col("created_at")))
47 .map_err(DataFrameError::AddDayColumnError)?
48 .with_column("hour", date_part(lit("hour"), col("created_at")))
49 .map_err(DataFrameError::AddHourColumnError)?;
50
51 let write_options = DataFrameWriteOptions::new().with_partition_by(vec![
52 "year".to_string(),
54 "month".to_string(),
55 "day".to_string(),
56 "hour".to_string(),
57 ]);
58
59 df.write_parquet(rpath, write_options, None)
60 .await
61 .map_err(DataFrameError::WriteParquetError)?;
62
63 Ok(())
64 }
65
66 async fn get_dataframe(&self, records: ServerRecords) -> Result<DataFrame, DataFrameError>;
67
68 fn storage_root(&self) -> String;
70
71 fn storage_type(&self) -> StorageType;
72
73 fn get_session_context(&self) -> Result<SessionContext, DataFrameError>;
75
76 fn table_name(&self) -> String;
78
79 fn get_binned_sql(
81 &self,
82 bin: &f64,
83 start_time: &DateTime<Utc>,
84 end_time: &DateTime<Utc>,
85 entity_id: &i32,
86 ) -> String;
87
88 async fn register_table(
96 &self,
97 path: &str,
98 table_name: &str,
99 ) -> Result<SessionContext, DataFrameError> {
100 let ctx = self.get_session_context()?;
101
102 let table_path = ListingTableUrl::parse(path)?;
103
104 let file_format = ParquetFormat::new();
105 let listing_options = ListingOptions::new(Arc::new(file_format))
106 .with_file_extension(".parquet")
107 .with_target_partitions(8)
108 .with_table_partition_cols(vec![
109 ("year".to_string(), DataType::Int32),
110 ("month".to_string(), DataType::Int32),
111 ("day".to_string(), DataType::Int32),
112 ("hour".to_string(), DataType::Int32),
113 ]);
114
115 let resolved_schema = listing_options
116 .infer_schema(&ctx.state(), &table_path)
117 .await
118 .map_err(DataFrameError::InferSchemaError)?;
119
120 let config = ListingTableConfig::new(table_path)
121 .with_listing_options(listing_options)
122 .with_schema(resolved_schema);
123
124 let provider = Arc::new(
125 ListingTable::try_new(config).map_err(DataFrameError::CreateListingTableError)?,
126 );
127
128 ctx.register_table(table_name, provider)
129 .map_err(DataFrameError::RegisterTableError)?;
130 Ok(ctx)
131 }
132
133 #[allow(clippy::too_many_arguments)]
146 async fn get_binned_metrics(
147 &self,
148 path: &str,
149 bin: &f64,
150 start_time: &DateTime<Utc>,
151 end_time: &DateTime<Utc>,
152 entity_id: &i32,
153 ) -> Result<DataFrame, DataFrameError> {
154 let ctx = self.register_table(path, &self.table_name()).await?;
156 let sql = self.get_binned_sql(bin, start_time, end_time, entity_id);
157 let df = ctx.sql(&sql).await?;
158
159 Ok(df)
160 }
161}