// scouter_dataframe/parquet/traits.rs

use crate::error::DataFrameError;

use arrow::datatypes::DataType;
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use datafusion::dataframe::DataFrameWriteOptions;
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::{
    ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
};
use datafusion::prelude::*;
use datafusion::prelude::{DataFrame, SessionContext};
use scouter_settings::ObjectStorageSettings;
use scouter_types::{ServerRecords, StorageType};
use tracing::instrument;

use std::sync::Arc;
19#[async_trait]
20pub trait ParquetFrame {
21    fn new(storage_settings: &ObjectStorageSettings) -> Result<Self, DataFrameError>
22    where
23        Self: Sized;
24
25    /// Write the records to a parquet file at the given path.
26    ///
27    /// # Arguments
28    ///
29    /// * `rpath` - The path to write the parquet file to. (This path should exclude root path)
30    /// * `records` - The records to write to the parquet file.
31    ///
32    #[instrument(skip_all, err)]
33    async fn write_parquet(
34        &self,
35        rpath: &str,
36        records: ServerRecords,
37    ) -> Result<(), DataFrameError> {
38        let df = self.get_dataframe(records).await?;
39
40        // add partition columns
41        let df = df
42            .with_column("year", date_part(lit("year"), col("created_at")))
43            .map_err(DataFrameError::AddYearColumnError)?
44            .with_column("month", date_part(lit("month"), col("created_at")))
45            .map_err(DataFrameError::AddMonthColumnError)?
46            .with_column("day", date_part(lit("day"), col("created_at")))
47            .map_err(DataFrameError::AddDayColumnError)?
48            .with_column("hour", date_part(lit("hour"), col("created_at")))
49            .map_err(DataFrameError::AddHourColumnError)?;
50
51        let write_options = DataFrameWriteOptions::new().with_partition_by(vec![
52            // time partitioning
53            "year".to_string(),
54            "month".to_string(),
55            "day".to_string(),
56            "hour".to_string(),
57        ]);
58
59        df.write_parquet(rpath, write_options, None)
60            .await
61            .map_err(DataFrameError::WriteParquetError)?;
62
63        Ok(())
64    }
65
66    async fn get_dataframe(&self, records: ServerRecords) -> Result<DataFrame, DataFrameError>;
67
68    /// Get the storage root path
69    fn storage_root(&self) -> String;
70
71    fn storage_type(&self) -> StorageType;
72
73    // Add this new required method
74    fn get_session_context(&self) -> Result<SessionContext, DataFrameError>;
75
76    // Get the table name
77    fn table_name(&self) -> String;
78
79    // Get binned SQL
80    fn get_binned_sql(
81        &self,
82        bin: &f64,
83        start_time: &DateTime<Utc>,
84        end_time: &DateTime<Utc>,
85        entity_id: &i32,
86    ) -> String;
87
88    /// Load storage files into parquet table for querying
89    ///
90    /// # Arguments
91    ///
92    /// * `path` - The path to the parquet file (this path should exclude root path)
93    /// * `table_name` - The name of the table to register
94    ///
95    async fn register_table(
96        &self,
97        path: &str,
98        table_name: &str,
99    ) -> Result<SessionContext, DataFrameError> {
100        let ctx = self.get_session_context()?;
101
102        let table_path = ListingTableUrl::parse(path)?;
103
104        let file_format = ParquetFormat::new();
105        let listing_options = ListingOptions::new(Arc::new(file_format))
106            .with_file_extension(".parquet")
107            .with_target_partitions(8)
108            .with_table_partition_cols(vec![
109                ("year".to_string(), DataType::Int32),
110                ("month".to_string(), DataType::Int32),
111                ("day".to_string(), DataType::Int32),
112                ("hour".to_string(), DataType::Int32),
113            ]);
114
115        let resolved_schema = listing_options
116            .infer_schema(&ctx.state(), &table_path)
117            .await
118            .map_err(DataFrameError::InferSchemaError)?;
119
120        let config = ListingTableConfig::new(table_path)
121            .with_listing_options(listing_options)
122            .with_schema(resolved_schema);
123
124        let provider = Arc::new(
125            ListingTable::try_new(config).map_err(DataFrameError::CreateListingTableError)?,
126        );
127
128        ctx.register_table(table_name, provider)
129            .map_err(DataFrameError::RegisterTableError)?;
130        Ok(ctx)
131    }
132
133    /// Get binned metrics from the parquet file
134    ///
135    /// # Arguments
136    ///     
137    /// * `path` - The path to the parquet file (this path should exclude root path)
138    /// * `bin` - The bin value
139    /// * `start_time` - The start time to filter
140    /// * `end_time` - The end time to filter
141    /// * `space` - The space to filter
142    /// * `name` - The name to filter
143    /// * `version` - The version to filter
144    ///
145    #[allow(clippy::too_many_arguments)]
146    async fn get_binned_metrics(
147        &self,
148        path: &str,
149        bin: &f64,
150        start_time: &DateTime<Utc>,
151        end_time: &DateTime<Utc>,
152        entity_id: &i32,
153    ) -> Result<DataFrame, DataFrameError> {
154        // Register the data at path
155        let ctx = self.register_table(path, &self.table_name()).await?;
156        let sql = self.get_binned_sql(bin, start_time, end_time, entity_id);
157        let df = ctx.sql(&sql).await?;
158
159        Ok(df)
160    }
161}