scouter_dataframe/parquet/traits.rs

use crate::error::DataFrameError;

use arrow::datatypes::DataType;

use async_trait::async_trait;
use chrono::{DateTime, Utc};
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::{
    ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
};
use datafusion::dataframe::DataFrameWriteOptions;
use datafusion::prelude::*;
use scouter_settings::ObjectStorageSettings;
use scouter_types::ServerRecords;
use scouter_types::StorageType;
use tracing::instrument;

use std::sync::Arc;

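/// Shared behavior for parquet-backed dataframes.
///
/// Implementors supply record-to-`DataFrame` conversion, a `SessionContext`
/// configured for the underlying object store, and the binned-metrics SQL;
/// the default methods handle partitioned writes, table registration, and
/// querying.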
#[async_trait]
pub trait ParquetFrame {
    fn new(storage_settings: &ObjectStorageSettings) -> Result<Self, DataFrameError>
    where
        Self: Sized;

    /// Write the records to a parquet file at the given path.
    ///
    /// # Arguments
    ///
    /// * `rpath` - The path to write the parquet file to (this path should exclude the storage root).
    /// * `records` - The records to write to the parquet file.
    ///
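    /// # Example (sketch)
    ///
    /// ```ignore
    /// // `frame` is any `ParquetFrame` implementor; the path and records are illustrative.
    /// frame.write_parquet("psi/space/name/version", records).await?;
    /// ```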
    #[instrument(skip_all, err)]
    async fn write_parquet(
        &self,
        rpath: &str,
        records: ServerRecords,
    ) -> Result<(), DataFrameError> {
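        // Convert the incoming records into a DataFusion DataFrame (implementor-specific).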
        let df = self.get_dataframe(records).await?;

        // add partition columns
        let df = df
            .with_column("year", date_part(lit("year"), col("created_at")))
            .map_err(DataFrameError::AddYearColumnError)?
            .with_column("month", date_part(lit("month"), col("created_at")))
            .map_err(DataFrameError::AddMonthColumnError)?
            .with_column("day", date_part(lit("day"), col("created_at")))
            .map_err(DataFrameError::AddDayColumnError)?
            .with_column("hour", date_part(lit("hour"), col("created_at")))
            .map_err(DataFrameError::AddHourColumnError)?;

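        // `with_partition_by` writes hive-style `year=.../month=.../day=.../hour=...`
        // directories under `rpath`.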
        let write_options = DataFrameWriteOptions::new().with_partition_by(vec![
            // time partitioning
            "year".to_string(),
            "month".to_string(),
            "day".to_string(),
            "hour".to_string(),
        ]);

        df.write_parquet(rpath, write_options, None)
            .await
            .map_err(DataFrameError::WriteParquetError)?;

        Ok(())
    }

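    /// Convert the given `ServerRecords` into a DataFusion `DataFrame`.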
    async fn get_dataframe(&self, records: ServerRecords) -> Result<DataFrame, DataFrameError>;

    /// Get the storage root path.
    fn storage_root(&self) -> String;

    fn storage_type(&self) -> StorageType;

    /// Get a `SessionContext` configured for the underlying object store.
    fn get_session_context(&self) -> Result<SessionContext, DataFrameError>;

    /// Get the table name to register queries against.
    fn table_name(&self) -> String;

    /// Build the SQL used to compute binned metrics for the given filters.
    fn get_binned_sql(
        &self,
        bin: &f64,
        start_time: &DateTime<Utc>,
        end_time: &DateTime<Utc>,
        space: &str,
        name: &str,
        version: &str,
    ) -> String;

    /// Load storage files into a parquet table for querying.
    ///
    /// # Arguments
    ///
    /// * `path` - The path to the parquet files (this path should exclude the storage root).
    /// * `table_name` - The name of the table to register.
    ///
    async fn register_table(
        &self,
        path: &str,
        table_name: &str,
    ) -> Result<SessionContext, DataFrameError> {
        let ctx = self.get_session_context()?;

        let table_path = ListingTableUrl::parse(path)?;

        let file_format = ParquetFormat::new();
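        // The declared partition columns must match the hive partitions written by
        // `write_parquet` (year/month/day/hour).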
        let listing_options = ListingOptions::new(Arc::new(file_format))
            .with_file_extension(".parquet")
            .with_target_partitions(8)
            .with_table_partition_cols(vec![
                ("year".to_string(), DataType::Int32),
                ("month".to_string(), DataType::Int32),
                ("day".to_string(), DataType::Int32),
                ("hour".to_string(), DataType::Int32),
            ]);

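        // Infer the Arrow schema from the parquet files already present at the path.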
        let resolved_schema = listing_options
            .infer_schema(&ctx.state(), &table_path)
            .await
            .map_err(DataFrameError::InferSchemaError)?;

        let config = ListingTableConfig::new(table_path)
            .with_listing_options(listing_options)
            .with_schema(resolved_schema);

        let provider = Arc::new(
            ListingTable::try_new(config).map_err(DataFrameError::CreateListingTableError)?,
        );

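        // Register the listing table so it can be queried by name via SQL.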
        ctx.register_table(table_name, provider)
            .map_err(DataFrameError::RegisterTableError)?;
        Ok(ctx)
    }

    /// Get binned metrics from the parquet files.
    ///
    /// # Arguments
    ///
    /// * `path` - The path to the parquet files (this path should exclude the storage root).
    /// * `bin` - The bin value
    /// * `start_time` - The start time to filter
    /// * `end_time` - The end time to filter
    /// * `space` - The space to filter
    /// * `name` - The name to filter
    /// * `version` - The version to filter
    ///
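    /// # Example (sketch)
    ///
    /// ```ignore
    /// // `frame`, the path, and the filter values below are illustrative.
    /// let df = frame
    ///     .get_binned_metrics("psi/space/name/version", &0.5, &start, &end, "space", "name", "1.0.0")
    ///     .await?;
    /// let batches = df.collect().await?;
    /// ```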
    #[allow(clippy::too_many_arguments)]
    async fn get_binned_metrics(
        &self,
        path: &str,
        bin: &f64,
        start_time: &DateTime<Utc>,
        end_time: &DateTime<Utc>,
        space: &str,
        name: &str,
        version: &str,
    ) -> Result<DataFrame, DataFrameError> {
        // Register the data at `path` under the implementor's table name
        let ctx = self.register_table(path, &self.table_name()).await?;
        let sql = self.get_binned_sql(bin, start_time, end_time, space, name, version);

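        // Execute the query; the resulting DataFrame is lazy until collected.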
        let df = ctx.sql(&sql).await?;

        Ok(df)
    }
}