// scouter_dataframe/parquet/traits.rs
use arrow::datatypes::DataType;
2
3use crate::error::DataFrameError;
4use async_trait::async_trait;
5use chrono::{DateTime, Utc};
6use datafusion::datasource::file_format::parquet::ParquetFormat;
7use datafusion::datasource::listing::{
8 ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
9};
10use datafusion::prelude::SessionContext;
11use datafusion::prelude::*;
12use datafusion::{dataframe::DataFrameWriteOptions, prelude::DataFrame};
13use scouter_settings::ObjectStorageSettings;
14use scouter_types::ServerRecords;
15use scouter_types::StorageType;
16use tracing::instrument;
17
18use std::sync::Arc;
19#[async_trait]
20pub trait ParquetFrame {
21 fn new(storage_settings: &ObjectStorageSettings) -> Result<Self, DataFrameError>
22 where
23 Self: Sized;
24
25 #[instrument(skip_all, err)]
33 async fn write_parquet(
34 &self,
35 rpath: &str,
36 records: ServerRecords,
37 ) -> Result<(), DataFrameError> {
38 let df = self.get_dataframe(records).await?;
39
40 let df = df
42 .with_column("year", date_part(lit("year"), col("created_at")))
43 .map_err(DataFrameError::AddYearColumnError)?
44 .with_column("month", date_part(lit("month"), col("created_at")))
45 .map_err(DataFrameError::AddMonthColumnError)?
46 .with_column("day", date_part(lit("day"), col("created_at")))
47 .map_err(DataFrameError::AddDayColumnError)?
48 .with_column("hour", date_part(lit("hour"), col("created_at")))
49 .map_err(DataFrameError::AddHourColumnError)?;
50
51 let write_options = DataFrameWriteOptions::new().with_partition_by(vec![
52 "year".to_string(),
54 "month".to_string(),
55 "day".to_string(),
56 "hour".to_string(),
57 ]);
58
59 df.write_parquet(rpath, write_options, None)
60 .await
61 .map_err(DataFrameError::WriteParquetError)?;
62
63 Ok(())
64 }
65
66 async fn get_dataframe(&self, records: ServerRecords) -> Result<DataFrame, DataFrameError>;
67
68 fn storage_root(&self) -> String;
70
71 fn storage_type(&self) -> StorageType;
72
73 fn get_session_context(&self) -> Result<SessionContext, DataFrameError>;
75
76 fn table_name(&self) -> String;
78
79 fn get_binned_sql(
81 &self,
82 bin: &f64,
83 start_time: &DateTime<Utc>,
84 end_time: &DateTime<Utc>,
85 space: &str,
86 name: &str,
87 version: &str,
88 ) -> String;
89
90 async fn register_table(
98 &self,
99 path: &str,
100 table_name: &str,
101 ) -> Result<SessionContext, DataFrameError> {
102 let ctx = self.get_session_context()?;
103
104 let table_path = ListingTableUrl::parse(path)?;
105
106 let file_format = ParquetFormat::new();
107 let listing_options = ListingOptions::new(Arc::new(file_format))
108 .with_file_extension(".parquet")
109 .with_target_partitions(8)
110 .with_table_partition_cols(vec![
111 ("year".to_string(), DataType::Int32),
112 ("month".to_string(), DataType::Int32),
113 ("day".to_string(), DataType::Int32),
114 ("hour".to_string(), DataType::Int32),
115 ]);
116
117 let resolved_schema = listing_options
118 .infer_schema(&ctx.state(), &table_path)
119 .await
120 .map_err(DataFrameError::InferSchemaError)?;
121
122 let config = ListingTableConfig::new(table_path)
123 .with_listing_options(listing_options)
124 .with_schema(resolved_schema);
125
126 let provider = Arc::new(
127 ListingTable::try_new(config).map_err(DataFrameError::CreateListingTableError)?,
128 );
129
130 ctx.register_table(table_name, provider)
131 .map_err(DataFrameError::RegisterTableError)?;
132 Ok(ctx)
133 }
134
135 #[allow(clippy::too_many_arguments)]
148 async fn get_binned_metrics(
149 &self,
150 path: &str,
151 bin: &f64,
152 start_time: &DateTime<Utc>,
153 end_time: &DateTime<Utc>,
154 space: &str,
155 name: &str,
156 version: &str,
157 ) -> Result<DataFrame, DataFrameError> {
158 let ctx = self.register_table(path, &self.table_name()).await?;
160 let sql = self.get_binned_sql(bin, start_time, end_time, space, name, version);
161
162 let df = ctx.sql(&sql).await?;
163
164 Ok(df)
165 }
166}