// scouter_dataframe/parquet/traits.rs
use crate::error::DataFrameError;

use arrow::datatypes::DataType;
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use datafusion::dataframe::DataFrameWriteOptions;
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::{
    ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
};
use datafusion::prelude::*;
use scouter_settings::ObjectStorageSettings;
use scouter_types::{ServerRecords, StorageType};
use std::sync::Arc;
use tracing::instrument;
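
/// Parquet-backed storage for server records. Implementors provide the
/// record-to-`DataFrame` conversion and a `SessionContext` for their object
/// store; the provided methods layer partitioned writes and SQL reads on top.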
#[async_trait]
pub trait ParquetFrame {
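    /// Creates a frame configured for the given object storage settings.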
    fn new(storage_settings: &ObjectStorageSettings) -> Result<Self, DataFrameError>
    where
        Self: Sized;

    #[instrument(skip_all, err)]
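    /// Writes `records` to `rpath` as Parquet, partitioned by `year`, `month`,
    /// `day`, and `hour` columns derived from each record's `created_at` timestamp.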
    async fn write_parquet(
        &self,
        rpath: &str,
        records: ServerRecords,
    ) -> Result<(), DataFrameError> {
        let df = self.get_dataframe(records).await?;

        // Derive calendar-partition columns from the record timestamp.
        let df = df
            .with_column("year", date_part(lit("year"), col("created_at")))
            .map_err(DataFrameError::AddYearColumnError)?
            .with_column("month", date_part(lit("month"), col("created_at")))
            .map_err(DataFrameError::AddMonthColumnError)?
            .with_column("day", date_part(lit("day"), col("created_at")))
            .map_err(DataFrameError::AddDayColumnError)?
            .with_column("hour", date_part(lit("hour"), col("created_at")))
            .map_err(DataFrameError::AddHourColumnError)?;

        // Write Hive-style partitions: .../year=.../month=.../day=.../hour=.../
        let write_options = DataFrameWriteOptions::new().with_partition_by(vec![
            "year".to_string(),
            "month".to_string(),
            "day".to_string(),
            "hour".to_string(),
        ]);

        df.write_parquet(rpath, write_options, None)
            .await
            .map_err(DataFrameError::WriteParquetError)?;

        Ok(())
    }

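    /// Converts raw `ServerRecords` into a DataFusion `DataFrame`.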
    async fn get_dataframe(&self, records: ServerRecords) -> Result<DataFrame, DataFrameError>;

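    /// Root path of the backing object store.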
    fn storage_root(&self) -> String;

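    /// The configured storage backend variant.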
    fn storage_type(&self) -> StorageType;

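    /// Builds a DataFusion `SessionContext` for querying this frame's storage.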
    fn get_session_context(&self) -> Result<SessionContext, DataFrameError>;

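    /// Table name under which the Parquet data is registered for SQL queries.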
    fn table_name(&self) -> String;

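    /// Builds the SQL that bins metrics between `start_time` and `end_time`
    /// for the given `space`/`name`/`version` at the requested `bin` resolution.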
    fn get_binned_sql(
        &self,
        bin: &f64,
        start_time: &DateTime<Utc>,
        end_time: &DateTime<Utc>,
        space: &str,
        name: &str,
        version: &str,
    ) -> String;

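    /// Registers the Parquet files at `path` as `table_name` in a fresh
    /// `SessionContext`, declaring the year/month/day/hour partition columns,
    /// and returns the context for querying.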
    async fn register_table(
        &self,
        path: &str,
        table_name: &str,
    ) -> Result<SessionContext, DataFrameError> {
        let ctx = self.get_session_context()?;

        let table_path = ListingTableUrl::parse(path)?;

        // Listing options mirror the write path: Hive-partitioned Parquet with
        // year/month/day/hour declared as Int32 partition columns.
        let file_format = ParquetFormat::new();
        let listing_options = ListingOptions::new(Arc::new(file_format))
            .with_file_extension(".parquet")
            .with_target_partitions(8)
            .with_table_partition_cols(vec![
                ("year".to_string(), DataType::Int32),
                ("month".to_string(), DataType::Int32),
                ("day".to_string(), DataType::Int32),
                ("hour".to_string(), DataType::Int32),
            ]);

        // Infer the file schema from the data present at the path.
        let resolved_schema = listing_options
            .infer_schema(&ctx.state(), &table_path)
            .await
            .map_err(DataFrameError::InferSchemaError)?;

        let config = ListingTableConfig::new(table_path)
            .with_listing_options(listing_options)
            .with_schema(resolved_schema);

        let provider = Arc::new(
            ListingTable::try_new(config).map_err(DataFrameError::CreateListingTableError)?,
        );

        ctx.register_table(table_name, provider)
            .map_err(DataFrameError::RegisterTableError)?;
        Ok(ctx)
    }

    #[allow(clippy::too_many_arguments)]
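    /// Registers the table at `path` and runs the implementor's binned SQL
    /// over it, returning the resulting `DataFrame`.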
    async fn get_binned_metrics(
        &self,
        path: &str,
        bin: &f64,
        start_time: &DateTime<Utc>,
        end_time: &DateTime<Utc>,
        space: &str,
        name: &str,
        version: &str,
    ) -> Result<DataFrame, DataFrameError> {
        let ctx = self.register_table(path, &self.table_name()).await?;
        let sql = self.get_binned_sql(bin, start_time, end_time, space, name, version);

        let df = ctx.sql(&sql).await?;

        Ok(df)
    }
}