scouter_dataframe/parquet/dataframe.rs

use crate::error::DataFrameError;
use crate::parquet::custom::CustomMetricDataFrame;
use crate::parquet::genai::{GenAIEvalDataFrame, GenAITaskDataFrame, GenAIWorkflowDataFrame};
use crate::parquet::psi::PsiDataFrame;
use crate::parquet::spc::SpcDataFrame;
use crate::parquet::traits::ParquetFrame;
use crate::storage::ObjectStore;
use chrono::{DateTime, Utc};
use datafusion::prelude::DataFrame;
use scouter_settings::ObjectStorageSettings;
use scouter_types::{RecordType, ServerRecords, StorageType};
use tracing::instrument;

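/// Parquet dataframe dispatcher: wraps the record-type-specific dataframe
/// implementations and routes reads and writes to the matching variant.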
pub enum ParquetDataFrame {
    CustomMetric(CustomMetricDataFrame),
    Psi(PsiDataFrame),
    Spc(SpcDataFrame),
    GenAITask(GenAITaskDataFrame),
    GenAIWorkflow(GenAIWorkflowDataFrame),
    GenAIEval(GenAIEvalDataFrame),
}

impl ParquetDataFrame {
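    /// Create a dataframe wrapper for the given record type.
    ///
    /// Returns `DataFrameError::InvalidRecordTypeError` for record types that
    /// have no parquet dataframe implementation.
    ///
    /// A minimal usage sketch (imports and `records` assumed; not compiled as a doctest):
    ///
    /// ```ignore
    /// let settings = ObjectStorageSettings::default();
    /// let df = ParquetDataFrame::new(&settings, &RecordType::Psi)?;
    /// df.write_parquet("psi", records).await?;
    /// ```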
    pub fn new(
        storage_settings: &ObjectStorageSettings,
        record_type: &RecordType,
    ) -> Result<Self, DataFrameError> {
        match record_type {
            RecordType::Custom => Ok(ParquetDataFrame::CustomMetric(CustomMetricDataFrame::new(
                storage_settings,
            )?)),
            RecordType::Psi => Ok(ParquetDataFrame::Psi(PsiDataFrame::new(storage_settings)?)),
            RecordType::Spc => Ok(ParquetDataFrame::Spc(SpcDataFrame::new(storage_settings)?)),
            RecordType::GenAITask => Ok(ParquetDataFrame::GenAITask(GenAITaskDataFrame::new(
                storage_settings,
            )?)),
            RecordType::GenAIWorkflow => Ok(ParquetDataFrame::GenAIWorkflow(
                GenAIWorkflowDataFrame::new(storage_settings)?,
            )),
            RecordType::GenAIEval => Ok(ParquetDataFrame::GenAIEval(GenAIEvalDataFrame::new(
                storage_settings,
            )?)),
            _ => Err(DataFrameError::InvalidRecordTypeError(
                record_type.to_string(),
            )),
        }
    }

    /// Write the records to a parquet file at the given path.
    ///
    /// # Arguments
    ///
    /// * `rpath` - The path to write the parquet file to, relative to the
    ///   storage root (the root is prepended via `resolve_path`).
    /// * `records` - The records to write to the parquet file.
    #[instrument(skip_all, err)]
    pub async fn write_parquet(
        &self,
        rpath: &str,
        records: ServerRecords,
    ) -> Result<(), DataFrameError> {
        let rpath = &self.resolve_path(rpath);

        match self {
            ParquetDataFrame::CustomMetric(df) => df.write_parquet(rpath, records).await,
            ParquetDataFrame::Psi(df) => df.write_parquet(rpath, records).await,
            ParquetDataFrame::Spc(df) => df.write_parquet(rpath, records).await,
            ParquetDataFrame::GenAITask(df) => df.write_parquet(rpath, records).await,
            ParquetDataFrame::GenAIWorkflow(df) => df.write_parquet(rpath, records).await,
            ParquetDataFrame::GenAIEval(df) => df.write_parquet(rpath, records).await,
        }
    }

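    /// Returns the storage root of the underlying object store.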
    pub fn storage_root(&self) -> String {
        match self {
            ParquetDataFrame::CustomMetric(df) => df.storage_root(),
            ParquetDataFrame::Psi(df) => df.storage_root(),
            ParquetDataFrame::Spc(df) => df.storage_root(),
            ParquetDataFrame::GenAITask(df) => df.storage_root(),
            ParquetDataFrame::GenAIWorkflow(df) => df.storage_root(),
            ParquetDataFrame::GenAIEval(df) => df.storage_root(),
        }
    }

    /// Returns a clone of the underlying object store client; primarily used for dev and tests.
    pub fn storage_client(&self) -> ObjectStore {
        match self {
            ParquetDataFrame::CustomMetric(df) => df.object_store.clone(),
            ParquetDataFrame::Psi(df) => df.object_store.clone(),
            ParquetDataFrame::Spc(df) => df.object_store.clone(),
            ParquetDataFrame::GenAITask(df) => df.object_store.clone(),
            ParquetDataFrame::GenAIWorkflow(df) => df.object_store.clone(),
            ParquetDataFrame::GenAIEval(df) => df.object_store.clone(),
        }
    }

    /// Get binned metrics from archived parquet files.
    ///
    /// # Arguments
    /// * `path` - The path to the parquet files (directory). This will be read as a table listing
    /// * `bin` - The bin size
    /// * `start_time` - The start time of the query
    /// * `end_time` - The end time of the query
    /// * `entity_id` - The entity id to query
    #[allow(clippy::too_many_arguments)]
    pub async fn get_binned_metrics(
        &self,
        path: &str,
        bin: &f64,
        start_time: &DateTime<Utc>,
        end_time: &DateTime<Utc>,
        entity_id: &i32,
    ) -> Result<DataFrame, DataFrameError> {
        let read_path = &self.resolve_path(path);

        match self {
            ParquetDataFrame::CustomMetric(df) => {
                df.get_binned_metrics(read_path, bin, start_time, end_time, entity_id)
                    .await
            }
            ParquetDataFrame::Psi(df) => {
                df.get_binned_metrics(read_path, bin, start_time, end_time, entity_id)
                    .await
            }
            ParquetDataFrame::Spc(df) => {
                df.get_binned_metrics(read_path, bin, start_time, end_time, entity_id)
                    .await
            }
            ParquetDataFrame::GenAITask(df) => {
                df.get_binned_metrics(read_path, bin, start_time, end_time, entity_id)
                    .await
            }
            ParquetDataFrame::GenAIWorkflow(df) => {
                df.get_binned_metrics(read_path, bin, start_time, end_time, entity_id)
                    .await
            }
            ParquetDataFrame::GenAIEval(_) => Err(DataFrameError::UnsupportedOperation(
                "GenAI drift does not support binned metrics".to_string(),
            )),
        }
    }

    /// Get the underlying object store storage type.
    pub fn storage_type(&self) -> StorageType {
        match self {
            ParquetDataFrame::CustomMetric(df) => {
                df.object_store.storage_settings.storage_type.clone()
            }
            ParquetDataFrame::Psi(df) => df.object_store.storage_settings.storage_type.clone(),
            ParquetDataFrame::Spc(df) => df.object_store.storage_settings.storage_type.clone(),
            ParquetDataFrame::GenAITask(df) => {
                df.object_store.storage_settings.storage_type.clone()
            }
            ParquetDataFrame::GenAIWorkflow(df) => {
                df.object_store.storage_settings.storage_type.clone()
            }
            ParquetDataFrame::GenAIEval(df) => {
                df.object_store.storage_settings.storage_type.clone()
            }
        }
    }

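    /// Joins the storage root and a relative path into the canonical location
    /// used for reads and writes, e.g. `resolve_path("psi")` returns
    /// `"{storage_root}/psi/"`.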
    pub fn resolve_path(&self, path: &str) -> String {
        format!("{}/{}/", self.storage_root(), path)
    }
}

#[cfg(test)]
mod tests {
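    // These tests write to the local storage root from `ObjectStorageSettings::default()`;
    // `cleanup()` removes that directory before and after each test.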

    use super::*;
    use crate::parquet::psi::dataframe_to_psi_drift_features;
    use crate::parquet::spc::dataframe_to_spc_drift_features;
    use crate::parquet::types::BinnedTableName;
    use crate::parquet::utils::BinnedMetricsExtractor;
    use chrono::Utc;
    use object_store::path::Path;
    use rand::Rng;
    use scouter_settings::ObjectStorageSettings;
    use scouter_types::{
        BoxedGenAIEvalRecord, GenAIEvalRecord, PsiRecord, ServerRecord, ServerRecords, SpcRecord,
        Status,
    };
    use scouter_types::{CustomMetricRecord, GenAIEvalTaskResult, GenAIEvalWorkflowResult};
    use serde_json::{Map, Value};
    fn cleanup() {
        let storage_settings = ObjectStorageSettings::default();
        let current_dir = std::env::current_dir().unwrap();
        let storage_path = current_dir.join(storage_settings.storage_root());
        if storage_path.exists() {
            std::fs::remove_dir_all(storage_path).unwrap();
        }
    }

    #[tokio::test]
    async fn test_write_genai_event_record_dataframe_local() {
        cleanup();
        let storage_settings = ObjectStorageSettings::default();
        let df = ParquetDataFrame::new(&storage_settings, &RecordType::GenAIEval).unwrap();
        let mut batch = Vec::new();
        let entity_id = rand::rng().random_range(0..100);

        // create records
        for i in 0..3 {
            for j in 0..50 {
                let record = GenAIEvalRecord {
                    created_at: Utc::now() + chrono::Duration::hours(i),
                    entity_id,
                    context: serde_json::Value::Object(Map::new()),
                    status: Status::Pending,
                    id: 0,
                    uid: format!("record_uid_{i}_{j}"),
                    entity_uid: format!("entity_uid_{entity_id}"),
                    ..Default::default()
                };

                let boxed_record = BoxedGenAIEvalRecord::new(record);
                batch.push(ServerRecord::GenAIEval(boxed_record));
            }
        }

        let records = ServerRecords::new(batch);
        let rpath = BinnedTableName::GenAIEval.to_string();
        df.write_parquet(&rpath, records.clone()).await.unwrap();

        // get canonical path
        let canonical_path = df.storage_root();
        let data_path = Path::from(canonical_path);

        // Check if the file exists
        let files = df.storage_client().list(Some(&data_path)).await.unwrap();
        assert_eq!(files.len(), 3);

        // delete the file
        for file in files.iter() {
            let path = Path::from(file.to_string());
            df.storage_client()
                .delete(&path)
                .await
                .expect("Failed to delete file");
        }

        // Check if the file is deleted
        let files = df.storage_client().list(Some(&data_path)).await.unwrap();
        assert_eq!(files.len(), 0);

        // cleanup
        cleanup();
    }

    #[tokio::test]
    async fn test_write_genai_task_dataframe_local() {
        cleanup();
        let storage_settings = ObjectStorageSettings::default();
        let df = ParquetDataFrame::new(&storage_settings, &RecordType::GenAITask).unwrap();
        let mut batch = Vec::new();
        let start_utc = Utc::now();
        let end_utc_for_test = start_utc + chrono::Duration::hours(3);
        let entity_id = rand::rng().random_range(0..100);

        // create records
        for i in 0..3 {
            for j in 0..50 {
                let record = ServerRecord::GenAITaskRecord(GenAIEvalTaskResult {
                    record_uid: format!("record_uid_{i}_{j}"),
                    created_at: Utc::now() + chrono::Duration::hours(i),
                    start_time: Utc::now() + chrono::Duration::hours(i),
                    end_time: Utc::now()
                        + chrono::Duration::hours(i)
                        + chrono::Duration::minutes(5),
                    entity_id,
                    task_id: format!("task{i}"),
                    task_type: scouter_types::genai::EvaluationTaskType::Assertion,
                    passed: true,
                    value: j as f64,
                    field_path: Some(format!("field.path.{i}")),
                    operator: scouter_types::genai::ComparisonOperator::Contains,
                    expected: Value::Null,
                    actual: Value::Null,
                    message: "All good".to_string(),
                    entity_uid: format!("entity_uid_{entity_id}"),
                    condition: false,
                    stage: 0,
                });

                batch.push(record);
            }
        }

        let records = ServerRecords::new(batch);
        let rpath = BinnedTableName::GenAITask.to_string();
        df.write_parquet(&rpath, records.clone()).await.unwrap();

        // get canonical path
        let canonical_path = df.storage_root();
        let data_path = Path::from(canonical_path);

        // Check if the file exists
        let files = df.storage_client().list(Some(&data_path)).await.unwrap();
        assert_eq!(files.len(), 3);

        // attempt to read the file
        let new_df = ParquetDataFrame::new(&storage_settings, &RecordType::GenAITask).unwrap();

        let read_df = new_df
            .get_binned_metrics(&rpath, &0.01, &start_utc, &end_utc_for_test, &entity_id)
            .await
            .unwrap();

        // read_df.show().await.unwrap();

        let binned_metrics = BinnedMetricsExtractor::dataframe_to_binned_metrics(read_df)
            .await
            .unwrap();

        assert_eq!(binned_metrics.metrics.len(), 3);

        // delete the file
        for file in files.iter() {
            let path = Path::from(file.to_string());
            df.storage_client()
                .delete(&path)
                .await
                .expect("Failed to delete file");
        }

        // Check if the file is deleted
        let files = df.storage_client().list(Some(&data_path)).await.unwrap();
        assert_eq!(files.len(), 0);

        // cleanup
        cleanup();
    }

    #[tokio::test]
    async fn test_write_genai_workflow_dataframe_local() {
        cleanup();
        let storage_settings = ObjectStorageSettings::default();
        let df = ParquetDataFrame::new(&storage_settings, &RecordType::GenAIWorkflow).unwrap();
        let mut batch = Vec::new();
        let start_utc = Utc::now();
        let end_utc_for_test = start_utc + chrono::Duration::hours(3);
        let entity_id = rand::rng().random_range(0..100);

        // create records
        for i in 0..3 {
            for j in 0..50 {
                let record = ServerRecord::GenAIWorkflowRecord(GenAIEvalWorkflowResult {
                    record_uid: format!("record_uid_{i}_{j}"),
                    created_at: Utc::now() + chrono::Duration::hours(i),
                    entity_id,
                    total_tasks: 10,
                    passed_tasks: 8,
                    failed_tasks: 2,
                    pass_rate: 0.8,
                    duration_ms: 1500,
                    entity_uid: format!("entity_uid_{entity_id}"),
                    execution_plan: scouter_types::genai::ExecutionPlan::default(),
                    id: j,
                });

                batch.push(record);
            }
        }

        let records = ServerRecords::new(batch);
        let rpath = BinnedTableName::GenAIWorkflow.to_string();
        df.write_parquet(&rpath, records.clone()).await.unwrap();

        // get canonical path
        let canonical_path = df.storage_root();
        let data_path = Path::from(canonical_path);

        // Check if the file exists
        let files = df.storage_client().list(Some(&data_path)).await.unwrap();
        assert_eq!(files.len(), 3);

        // attempt to read the file
        let new_df = ParquetDataFrame::new(&storage_settings, &RecordType::GenAIWorkflow).unwrap();

        let read_df = new_df
            .get_binned_metrics(&rpath, &0.01, &start_utc, &end_utc_for_test, &entity_id)
            .await
            .unwrap();

        // read_df.show().await.unwrap();

        let binned_metrics = BinnedMetricsExtractor::dataframe_to_binned_metrics(read_df)
            .await
            .unwrap();

        assert_eq!(binned_metrics.metrics.len(), 3);

        // delete the file
        for file in files.iter() {
            let path = Path::from(file.to_string());
            df.storage_client()
                .delete(&path)
                .await
                .expect("Failed to delete file");
        }

        // Check if the file is deleted
        let files = df.storage_client().list(Some(&data_path)).await.unwrap();
        assert_eq!(files.len(), 0);

        // cleanup
        cleanup();
    }

    #[tokio::test]
    async fn test_write_custom_dataframe_local() {
        cleanup();
        let storage_settings = ObjectStorageSettings::default();
        let df = ParquetDataFrame::new(&storage_settings, &RecordType::Custom).unwrap();
        let mut batch = Vec::new();
        let start_utc = Utc::now();
        let end_utc_for_test = start_utc + chrono::Duration::hours(3);
        let entity_id = rand::rng().random_range(0..100);

        // create records
        for i in 0..3 {
            for j in 0..50 {
                let record = ServerRecord::Custom(CustomMetricRecord {
                    created_at: Utc::now() + chrono::Duration::hours(i),
                    metric: format!("metric{i}"),
                    value: j as f64,
                    entity_id,
                    uid: format!("entity_uid_{entity_id}"),
                });

                batch.push(record);
            }
        }

        let records = ServerRecords::new(batch);
        let rpath = "custom";
        df.write_parquet(rpath, records.clone()).await.unwrap();

        // get canonical path
        let canonical_path = df.storage_root();
        let data_path = Path::from(canonical_path);

        // Check if the file exists
        let files = df.storage_client().list(Some(&data_path)).await.unwrap();
        assert_eq!(files.len(), 3);

        // attempt to read the file
        let new_df = ParquetDataFrame::new(&storage_settings, &RecordType::Custom).unwrap();

        let read_df = new_df
            .get_binned_metrics(rpath, &0.01, &start_utc, &end_utc_for_test, &entity_id)
            .await
            .unwrap();

        // read_df.show().await.unwrap();

        let binned_metrics = BinnedMetricsExtractor::dataframe_to_binned_metrics(read_df)
            .await
            .unwrap();

        assert_eq!(binned_metrics.metrics.len(), 3);

        // delete the file
        for file in files.iter() {
            let path = Path::from(file.to_string());
            df.storage_client()
                .delete(&path)
                .await
                .expect("Failed to delete file");
        }

        // Check if the file is deleted
        let files = df.storage_client().list(Some(&data_path)).await.unwrap();
        assert_eq!(files.len(), 0);

        // cleanup
        cleanup();
    }

    #[tokio::test]
    async fn test_write_psi_dataframe_local() {
        cleanup();

        let storage_settings = ObjectStorageSettings::default();
        let df = ParquetDataFrame::new(&storage_settings, &RecordType::Psi).unwrap();
        let mut batch = Vec::new();
        let start_utc = Utc::now();
        let end_utc_for_test = start_utc + chrono::Duration::hours(3);
        let entity_id = rand::rng().random_range(0..100);
        for i in 0..3 {
            for j in 0..5 {
                let record = ServerRecord::Psi(PsiRecord {
                    created_at: Utc::now() + chrono::Duration::hours(i),
                    feature: "feature1".to_string(),
                    bin_id: j,
                    bin_count: rand::rng().random_range(0..100),
                    entity_id,
                    uid: format!("entity_uid_{entity_id}"),
                });

                batch.push(record);
            }
        }

        for i in 0..3 {
            for j in 0..5 {
                let record = ServerRecord::Psi(PsiRecord {
                    created_at: Utc::now() + chrono::Duration::hours(i),
                    feature: "feature2".to_string(),
                    bin_id: j,
                    bin_count: rand::rng().random_range(0..100),
                    entity_id,
                    uid: format!("entity_uid_{entity_id}"),
                });

                batch.push(record);
            }
        }

        let records = ServerRecords::new(batch);
        let rpath = "psi";
        df.write_parquet(rpath, records.clone()).await.unwrap();

        // get canonical path
        let canonical_path = df.storage_root();
        let data_path = Path::from(canonical_path);

        // Check if the file exists
        let files = df.storage_client().list(Some(&data_path)).await.unwrap();
        assert_eq!(files.len(), 3);

        // attempt to read the file
        let read_df = df
            .get_binned_metrics(rpath, &0.01, &start_utc, &end_utc_for_test, &entity_id)
            .await
            .unwrap();

        let psi_drift = dataframe_to_psi_drift_features(read_df).await.unwrap();
        assert_eq!(psi_drift.len(), 2);

        // delete the file
        for file in files.iter() {
            let path = Path::from(file.to_string());
            df.storage_client()
                .delete(&path)
                .await
                .expect("Failed to delete file");
        }

        // Check if the file is deleted
        let files = df.storage_client().list(Some(&data_path)).await.unwrap();
        assert_eq!(files.len(), 0);

        // cleanup
        cleanup();
    }

    #[tokio::test]
    async fn test_write_spc_dataframe_local() {
        cleanup();
        let storage_settings = ObjectStorageSettings::default();
        let df = ParquetDataFrame::new(&storage_settings, &RecordType::Spc).unwrap();
        let mut batch = Vec::new();
        let start_utc = Utc::now();
        let end_utc_for_test = start_utc + chrono::Duration::hours(3);
        let entity_id = rand::rng().random_range(0..100);
        for i in 0..5 {
            let record = ServerRecord::Spc(SpcRecord {
                created_at: Utc::now() + chrono::Duration::hours(i),
                feature: "feature1".to_string(),
                value: i as f64,
                entity_id,
                uid: format!("entity_uid_{entity_id}"),
            });

            batch.push(record);
        }

        for i in 0..5 {
            let record = ServerRecord::Spc(SpcRecord {
                created_at: Utc::now() + chrono::Duration::hours(i),
                feature: "feature2".to_string(),
                value: i as f64,
                entity_id,
                uid: format!("entity_uid_{entity_id}"),
            });

            batch.push(record);
        }

        let records = ServerRecords::new(batch);
        let rpath = "spc";
        df.write_parquet(rpath, records.clone()).await.unwrap();

        // get canonical path
        let canonical_path = df.storage_root();
        let data_path = Path::from(canonical_path);

        // Check if the file exists
        let files = df.storage_client().list(Some(&data_path)).await.unwrap();
        assert_eq!(files.len(), 5);

        // attempt to read the file
        let read_df = df
            .get_binned_metrics(rpath, &0.01, &start_utc, &end_utc_for_test, &entity_id)
            .await
            .unwrap();

        let _spc_drift = dataframe_to_spc_drift_features(read_df).await.unwrap();

        // delete the file
        for file in files.iter() {
            let path = Path::from(file.to_string());
            df.storage_client()
                .delete(&path)
                .await
                .expect("Failed to delete file");
        }

        // Check if the file is deleted
        let files = df.storage_client().list(Some(&data_path)).await.unwrap();
        assert_eq!(files.len(), 0);

        // cleanup
        cleanup();
    }
}