scouter_dataframe/parquet/dataframe.rs

use crate::error::DataFrameError;
use crate::parquet::custom::CustomMetricDataFrame;
use crate::parquet::psi::PsiDataFrame;
use crate::parquet::spc::SpcDataFrame;
use crate::parquet::traits::ParquetFrame;
use crate::storage::ObjectStore;
use chrono::{DateTime, Utc};
use datafusion::prelude::DataFrame;
use scouter_settings::ObjectStorageSettings;
use scouter_types::{RecordType, ServerRecords, StorageType};
use tracing::instrument;

pub enum ParquetDataFrame {
    CustomMetric(CustomMetricDataFrame),
    Psi(PsiDataFrame),
    Spc(SpcDataFrame),
}

impl ParquetDataFrame {
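    /// Create a typed Parquet frame for the given record type.
    ///
    /// # Example
    ///
    /// A minimal sketch (illustrative, not a doctest); assumes the default
    /// `ObjectStorageSettings` point at a usable storage backend:
    ///
    /// ```ignore
    /// let settings = ObjectStorageSettings::default();
    /// let frame = ParquetDataFrame::new(&settings, &RecordType::Psi)?;
    /// ```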
    pub fn new(
        storage_settings: &ObjectStorageSettings,
        record_type: &RecordType,
    ) -> Result<Self, DataFrameError> {
        match record_type {
            RecordType::Custom => Ok(ParquetDataFrame::CustomMetric(CustomMetricDataFrame::new(
                storage_settings,
            )?)),
            RecordType::Psi => Ok(ParquetDataFrame::Psi(PsiDataFrame::new(storage_settings)?)),
            RecordType::Spc => Ok(ParquetDataFrame::Spc(SpcDataFrame::new(storage_settings)?)),
            _ => Err(DataFrameError::InvalidRecordTypeError),
        }
    }

    /// Write the records to a parquet file at the given path.
    ///
    /// # Arguments
    ///
    /// * `rpath` - The path to write the parquet file to, relative to the
    ///   storage root (the root is prepended automatically).
    /// * `records` - The records to write to the parquet file.
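    ///
    /// # Example
    ///
    /// A minimal sketch (illustrative only; assumes `frame` and `records`
    /// were built elsewhere and share the same record type):
    ///
    /// ```ignore
    /// frame.write_parquet("custom", records).await?;
    /// ```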
    #[instrument(skip_all, err)]
    pub async fn write_parquet(
        &self,
        rpath: &str,
        records: ServerRecords,
    ) -> Result<(), DataFrameError> {
        let rpath = &self.resolve_path(rpath);

        match self {
            ParquetDataFrame::CustomMetric(df) => df.write_parquet(rpath, records).await,
            ParquetDataFrame::Psi(df) => df.write_parquet(rpath, records).await,
            ParquetDataFrame::Spc(df) => df.write_parquet(rpath, records).await,
        }
    }

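    /// Return the storage root configured for the underlying object store.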
    pub fn storage_root(&self) -> String {
        match self {
            ParquetDataFrame::CustomMetric(df) => df.storage_root(),
            ParquetDataFrame::Psi(df) => df.storage_root(),
            ParquetDataFrame::Spc(df) => df.storage_root(),
        }
    }

    /// Return a clone of the underlying object store client (primarily used
    /// for development and tests).
    pub fn storage_client(&self) -> ObjectStore {
        match self {
            ParquetDataFrame::CustomMetric(df) => df.object_store.clone(),
            ParquetDataFrame::Psi(df) => df.object_store.clone(),
            ParquetDataFrame::Spc(df) => df.object_store.clone(),
        }
    }

    /// Get binned metrics from archived parquet files.
    ///
    /// # Arguments
    ///
    /// * `path` - The path to the parquet files (a directory, read as a
    ///   listing table), relative to the storage root.
    /// * `bin` - The bin size.
    /// * `start_time` - The start time of the query.
    /// * `end_time` - The end time of the query.
    /// * `space` - The space to query.
    /// * `name` - The name to query.
    /// * `version` - The version to query.
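    ///
    /// # Example
    ///
    /// A minimal sketch (illustrative only; the bin size, time bounds, and
    /// identifiers are placeholders):
    ///
    /// ```ignore
    /// let binned = frame
    ///     .get_binned_metrics("custom", &0.01, &start, &end, "space", "name", "1.0")
    ///     .await?;
    /// ```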
    #[allow(clippy::too_many_arguments)]
    pub async fn get_binned_metrics(
        &self,
        path: &str,
        bin: &f64,
        start_time: &DateTime<Utc>,
        end_time: &DateTime<Utc>,
        space: &str,
        name: &str,
        version: &str,
    ) -> Result<DataFrame, DataFrameError> {
        let read_path = &self.resolve_path(path);

        match self {
            ParquetDataFrame::CustomMetric(df) => {
                df.get_binned_metrics(read_path, bin, start_time, end_time, space, name, version)
                    .await
            }
            ParquetDataFrame::Psi(df) => {
                df.get_binned_metrics(read_path, bin, start_time, end_time, space, name, version)
                    .await
            }
            ParquetDataFrame::Spc(df) => {
                df.get_binned_metrics(read_path, bin, start_time, end_time, space, name, version)
                    .await
            }
        }
    }

    /// Get the underlying object store storage type.
    pub fn storage_type(&self) -> StorageType {
        match self {
            ParquetDataFrame::CustomMetric(df) => {
                df.object_store.storage_settings.storage_type.clone()
            }
            ParquetDataFrame::Psi(df) => df.object_store.storage_settings.storage_type.clone(),
            ParquetDataFrame::Spc(df) => df.object_store.storage_settings.storage_type.clone(),
        }
    }

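    /// Join the storage root and a relative path into the canonical prefix
    /// used for reads and writes (note the trailing slash).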
    pub fn resolve_path(&self, path: &str) -> String {
        format!("{}/{}/", self.storage_root(), path)
    }
}

#[cfg(test)]
mod tests {

    use super::*;
    use crate::parquet::custom::dataframe_to_custom_drift_metrics;
    use crate::parquet::psi::dataframe_to_psi_drift_features;
    use crate::parquet::spc::dataframe_to_spc_drift_features;
    use chrono::Utc;
    use object_store::path::Path;
    use rand::Rng;
    use scouter_settings::ObjectStorageSettings;
    use scouter_types::{
        CustomMetricServerRecord, PsiServerRecord, ServerRecord, ServerRecords, SpcServerRecord,
    };

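    /// Remove any local storage artifacts left over from previous test runs.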
    fn cleanup() {
        let storage_settings = ObjectStorageSettings::default();
        let current_dir = std::env::current_dir().unwrap();
        let storage_path = current_dir.join(storage_settings.storage_root());
        if storage_path.exists() {
            std::fs::remove_dir_all(storage_path).unwrap();
        }
    }

    #[tokio::test]
    async fn test_write_custom_dataframe_local() {
        cleanup();
        let storage_settings = ObjectStorageSettings::default();
        let df = ParquetDataFrame::new(&storage_settings, &RecordType::Custom).unwrap();
        let mut batch = Vec::new();
        let start_utc = Utc::now();
        let end_utc_for_test = start_utc + chrono::Duration::hours(3);

        // create records
        for i in 0..3 {
            for j in 0..50 {
                let record = ServerRecord::Custom(CustomMetricServerRecord {
                    created_at: Utc::now() + chrono::Duration::hours(i),
                    name: "test".to_string(),
                    space: "test".to_string(),
                    version: "1.0".to_string(),
                    metric: format!("metric{}", i),
                    value: j as f64,
                });

                batch.push(record);
            }
        }

        let records = ServerRecords::new(batch);
        let rpath = "custom";
        df.write_parquet(rpath, records.clone()).await.unwrap();

        // get canonical path
        let canonical_path = df.storage_root();
        let data_path = object_store::path::Path::from(canonical_path);

        // Check that the files exist
        let files = df.storage_client().list(Some(&data_path)).await.unwrap();
        assert_eq!(files.len(), 3);

        // attempt to read the files
        let new_df = ParquetDataFrame::new(&storage_settings, &RecordType::Custom).unwrap();

        let read_df = new_df
            .get_binned_metrics(
                rpath,
                &0.01,
                &start_utc,
                &end_utc_for_test,
                "test",
                "test",
                "1.0",
            )
            .await
            .unwrap();

        let binned_metrics = dataframe_to_custom_drift_metrics(read_df).await.unwrap();

        assert_eq!(binned_metrics.metrics.len(), 3);

        // delete the files
        for file in files.iter() {
            let path = Path::from(file.to_string());
            df.storage_client()
                .delete(&path)
                .await
                .expect("Failed to delete file");
        }

        // Check that the files are deleted
        let files = df.storage_client().list(Some(&data_path)).await.unwrap();
        assert_eq!(files.len(), 0);

        // cleanup
        cleanup();
    }

    #[tokio::test]
    async fn test_write_psi_dataframe_local() {
        cleanup();

        let storage_settings = ObjectStorageSettings::default();
        let df = ParquetDataFrame::new(&storage_settings, &RecordType::Psi).unwrap();
        let mut batch = Vec::new();
        let start_utc = Utc::now();
        let end_utc_for_test = start_utc + chrono::Duration::hours(3);

        for i in 0..3 {
            for j in 0..5 {
                let record = ServerRecord::Psi(PsiServerRecord {
                    created_at: Utc::now() + chrono::Duration::hours(i),
                    name: "test".to_string(),
                    space: "test".to_string(),
                    version: "1.0".to_string(),
                    feature: "feature1".to_string(),
                    bin_id: j as usize,
                    bin_count: rand::rng().random_range(0..100),
                });

                batch.push(record);
            }
        }

        for i in 0..3 {
            for j in 0..5 {
                let record = ServerRecord::Psi(PsiServerRecord {
                    created_at: Utc::now() + chrono::Duration::hours(i),
                    name: "test".to_string(),
                    space: "test".to_string(),
                    version: "1.0".to_string(),
                    feature: "feature2".to_string(),
                    bin_id: j as usize,
                    bin_count: rand::rng().random_range(0..100),
                });

                batch.push(record);
            }
        }

        let records = ServerRecords::new(batch);
        let rpath = "psi";
        df.write_parquet(rpath, records.clone()).await.unwrap();

        // get canonical path
        let canonical_path = df.storage_root();
        let data_path = object_store::path::Path::from(canonical_path);

        // Check that the files exist
        let files = df.storage_client().list(Some(&data_path)).await.unwrap();
        assert_eq!(files.len(), 3);

        // attempt to read the files
        let read_df = df
            .get_binned_metrics(
                rpath,
                &0.01,
                &start_utc,
                &end_utc_for_test,
                "test",
                "test",
                "1.0",
            )
            .await
            .unwrap();

        let psi_drift = dataframe_to_psi_drift_features(read_df).await.unwrap();
        assert_eq!(psi_drift.len(), 2);

        // delete the files
        for file in files.iter() {
            let path = Path::from(file.to_string());
            df.storage_client()
                .delete(&path)
                .await
                .expect("Failed to delete file");
        }

        // Check that the files are deleted
        let files = df.storage_client().list(Some(&data_path)).await.unwrap();
        assert_eq!(files.len(), 0);

        // cleanup
        cleanup();
    }

    #[tokio::test]
    async fn test_write_spc_dataframe_local() {
        cleanup();
        let storage_settings = ObjectStorageSettings::default();
        let df = ParquetDataFrame::new(&storage_settings, &RecordType::Spc).unwrap();
        let mut batch = Vec::new();
        let start_utc = Utc::now();
        let end_utc_for_test = start_utc + chrono::Duration::hours(3);

        for i in 0..5 {
            let record = ServerRecord::Spc(SpcServerRecord {
                created_at: Utc::now() + chrono::Duration::hours(i),
                name: "test".to_string(),
                space: "test".to_string(),
                version: "1.0".to_string(),
                feature: "feature1".to_string(),
                value: i as f64,
            });

            batch.push(record);
        }

        for i in 0..5 {
            let record = ServerRecord::Spc(SpcServerRecord {
                created_at: Utc::now() + chrono::Duration::hours(i),
                name: "test".to_string(),
                space: "test".to_string(),
                version: "1.0".to_string(),
                feature: "feature2".to_string(),
                value: i as f64,
            });

            batch.push(record);
        }

        let records = ServerRecords::new(batch);
        let rpath = "spc";
        df.write_parquet(rpath, records.clone()).await.unwrap();

        // get canonical path
        let canonical_path = df.storage_root();
        let data_path = object_store::path::Path::from(canonical_path);

        // Check that the files exist
        let files = df.storage_client().list(Some(&data_path)).await.unwrap();
        assert_eq!(files.len(), 5);

        // attempt to read the files
        let read_df = df
            .get_binned_metrics(
                rpath,
                &0.01,
                &start_utc,
                &end_utc_for_test,
                "test",
                "test",
                "1.0",
            )
            .await
            .unwrap();

        let _spc_drift = dataframe_to_spc_drift_features(read_df).await.unwrap();

        // delete the files
        for file in files.iter() {
            let path = Path::from(file.to_string());
            df.storage_client()
                .delete(&path)
                .await
                .expect("Failed to delete file");
        }

        // Check that the files are deleted
        let files = df.storage_client().list(Some(&data_path)).await.unwrap();
        assert_eq!(files.len(), 0);

        // cleanup
        cleanup();
    }
}