//! Parquet dataframe dispatch for drift record types.
//!
//! File: `scouter_dataframe/parquet/dataframe.rs`
1use crate::error::DataFrameError;
2use crate::parquet::custom::CustomMetricDataFrame;
3use crate::parquet::genai::{GenAIEvalDataFrame, GenAITaskDataFrame, GenAIWorkflowDataFrame};
4use crate::parquet::psi::PsiDataFrame;
5use crate::parquet::spc::SpcDataFrame;
6use crate::parquet::traits::ParquetFrame;
7use crate::storage::ObjectStore;
8use chrono::{DateTime, Utc};
9use datafusion::prelude::DataFrame;
10use scouter_settings::ObjectStorageSettings;
11use scouter_types::{RecordType, ServerRecords, StorageType};
12use tracing::instrument;
13
/// Record-type-specific parquet frame dispatcher.
///
/// Each variant wraps the concrete frame implementation for one
/// [`RecordType`], so callers can pick the right parquet reader/writer at
/// runtime via [`ParquetDataFrame::new`].
pub enum ParquetDataFrame {
    /// Custom metric drift records.
    CustomMetric(CustomMetricDataFrame),
    /// Population stability index (PSI) drift records.
    Psi(PsiDataFrame),
    /// Statistical process control (SPC) drift records.
    Spc(SpcDataFrame),
    /// GenAI evaluation task results.
    GenAITask(GenAITaskDataFrame),
    /// GenAI evaluation workflow results.
    GenAIWorkflow(GenAIWorkflowDataFrame),
    /// Raw GenAI evaluation records.
    GenAIEval(GenAIEvalDataFrame),
}
22
23impl ParquetDataFrame {
24    pub fn new(
25        storage_settings: &ObjectStorageSettings,
26        record_type: &RecordType,
27    ) -> Result<Self, DataFrameError> {
28        match record_type {
29            RecordType::Custom => Ok(ParquetDataFrame::CustomMetric(CustomMetricDataFrame::new(
30                storage_settings,
31            )?)),
32            RecordType::Psi => Ok(ParquetDataFrame::Psi(PsiDataFrame::new(storage_settings)?)),
33            RecordType::Spc => Ok(ParquetDataFrame::Spc(SpcDataFrame::new(storage_settings)?)),
34            RecordType::GenAITask => Ok(ParquetDataFrame::GenAITask(GenAITaskDataFrame::new(
35                storage_settings,
36            )?)),
37            RecordType::GenAIWorkflow => Ok(ParquetDataFrame::GenAIWorkflow(
38                GenAIWorkflowDataFrame::new(storage_settings)?,
39            )),
40            RecordType::GenAIEval => Ok(ParquetDataFrame::GenAIEval(GenAIEvalDataFrame::new(
41                storage_settings,
42            )?)),
43
44            _ => Err(DataFrameError::InvalidRecordTypeError(
45                record_type.to_string(),
46            )),
47        }
48    }
49
50    /// Write the records to a parquet file at the given path.
51    ///
52    /// # Arguments
53    ///
54    /// * `rpath` - The path to write the parquet file to. (This path should exclude root path)
55    /// * `records` - The records to write to the parquet file.
56    ///
57    #[instrument(skip_all, err)]
58    pub async fn write_parquet(
59        &self,
60        rpath: &str,
61        records: ServerRecords,
62    ) -> Result<(), DataFrameError> {
63        let rpath = &self.resolve_path(rpath);
64
65        match self {
66            ParquetDataFrame::CustomMetric(df) => df.write_parquet(rpath, records).await,
67            ParquetDataFrame::Psi(df) => df.write_parquet(rpath, records).await,
68            ParquetDataFrame::Spc(df) => df.write_parquet(rpath, records).await,
69            ParquetDataFrame::GenAITask(df) => df.write_parquet(rpath, records).await,
70            ParquetDataFrame::GenAIWorkflow(df) => df.write_parquet(rpath, records).await,
71            ParquetDataFrame::GenAIEval(df) => df.write_parquet(rpath, records).await,
72        }
73    }
74
75    pub fn storage_root(&self) -> String {
76        match self {
77            ParquetDataFrame::CustomMetric(df) => df.storage_root(),
78            ParquetDataFrame::Psi(df) => df.storage_root(),
79            ParquetDataFrame::Spc(df) => df.storage_root(),
80            ParquetDataFrame::GenAITask(df) => df.storage_root(),
81            ParquetDataFrame::GenAIWorkflow(df) => df.storage_root(),
82            ParquetDataFrame::GenAIEval(df) => df.storage_root(),
83        }
84    }
85
86    /// primarily used for dev
87    pub fn storage_client(&self) -> ObjectStore {
88        match self {
89            ParquetDataFrame::CustomMetric(df) => df.object_store.clone(),
90            ParquetDataFrame::Psi(df) => df.object_store.clone(),
91            ParquetDataFrame::Spc(df) => df.object_store.clone(),
92            ParquetDataFrame::GenAITask(df) => df.object_store.clone(),
93            ParquetDataFrame::GenAIWorkflow(df) => df.object_store.clone(),
94            ParquetDataFrame::GenAIEval(df) => df.object_store.clone(),
95        }
96    }
97
98    /// Get binned metrics from archived parquet files
99    ///
100    /// # Arguments
101    /// * path - The path to the parquet files (directory). This will be read as a table listing
102    /// * bin - The bin size
103    /// * start_time - The start time of the query
104    /// * end_time - The end time of the query
105    /// * space - The space to query
106    /// * name - The name to query
107    /// * version - The version to query
108    #[allow(clippy::too_many_arguments)]
109    pub async fn get_binned_metrics(
110        &self,
111        path: &str,
112        bin: &f64,
113        start_time: &DateTime<Utc>,
114        end_time: &DateTime<Utc>,
115        entity_id: &i32,
116    ) -> Result<DataFrame, DataFrameError> {
117        let read_path = &self.resolve_path(path);
118
119        match self {
120            ParquetDataFrame::CustomMetric(df) => {
121                df.get_binned_metrics(read_path, bin, start_time, end_time, entity_id)
122                    .await
123            }
124            ParquetDataFrame::Psi(df) => {
125                df.get_binned_metrics(read_path, bin, start_time, end_time, entity_id)
126                    .await
127            }
128            ParquetDataFrame::Spc(df) => {
129                df.get_binned_metrics(read_path, bin, start_time, end_time, entity_id)
130                    .await
131            }
132
133            ParquetDataFrame::GenAITask(df) => {
134                df.get_binned_metrics(read_path, bin, start_time, end_time, entity_id)
135                    .await
136            }
137            ParquetDataFrame::GenAIWorkflow(df) => {
138                df.get_binned_metrics(read_path, bin, start_time, end_time, entity_id)
139                    .await
140            }
141            ParquetDataFrame::GenAIEval(_) => Err(DataFrameError::UnsupportedOperation(
142                "GenAI drift does not support binned metrics".to_string(),
143            )),
144        }
145    }
146
147    /// Get the underyling object store storage type
148    pub fn storage_type(&self) -> StorageType {
149        match self {
150            ParquetDataFrame::CustomMetric(df) => {
151                df.object_store.storage_settings.storage_type.clone()
152            }
153            ParquetDataFrame::Psi(df) => df.object_store.storage_settings.storage_type.clone(),
154            ParquetDataFrame::Spc(df) => df.object_store.storage_settings.storage_type.clone(),
155            ParquetDataFrame::GenAITask(df) => {
156                df.object_store.storage_settings.storage_type.clone()
157            }
158            ParquetDataFrame::GenAIWorkflow(df) => {
159                df.object_store.storage_settings.storage_type.clone()
160            }
161            ParquetDataFrame::GenAIEval(df) => {
162                df.object_store.storage_settings.storage_type.clone()
163            }
164        }
165    }
166
167    pub fn resolve_path(&self, path: &str) -> String {
168        format!("{}/{}/", self.storage_root(), path)
169    }
170}
171
#[cfg(test)]
mod tests {

    use super::*;
    use crate::parquet::psi::dataframe_to_psi_drift_features;
    use crate::parquet::spc::dataframe_to_spc_drift_features;
    use crate::parquet::types::BinnedTableName;
    use crate::parquet::utils::BinnedMetricsExtractor;
    use chrono::Utc;
    use object_store::path::Path;
    use rand::Rng;
    use scouter_settings::ObjectStorageSettings;
    use scouter_types::Assertion;
    use scouter_types::{
        BoxedGenAIEvalRecord, GenAIEvalRecord, PsiRecord, ServerRecord, ServerRecords, SpcRecord,
        Status,
    };
    use scouter_types::{CustomMetricRecord, GenAIEvalTaskResult, GenAIEvalWorkflowResult};
    use serde_json::Map;
    use serde_json::Value;

    /// Remove any leftover local storage root from a previous test run so
    /// each test starts from an empty directory.
    fn cleanup() {
        let storage_settings = ObjectStorageSettings::default();
        let current_dir = std::env::current_dir().unwrap();
        let storage_path = current_dir.join(storage_settings.storage_root());
        if storage_path.exists() {
            std::fs::remove_dir_all(storage_path).unwrap();
        }
    }

    /// Delete every object under `data_path` and assert the listing is empty
    /// afterwards. Shared tail of all write/read round-trip tests.
    async fn delete_all_and_verify(df: &ParquetDataFrame, data_path: &Path) {
        let files = df.storage_client().list(Some(data_path)).await.unwrap();
        for file in files.iter() {
            let path = Path::from(file.to_string());
            df.storage_client()
                .delete(&path)
                .await
                .expect("Failed to delete file");
        }

        // Check the files are deleted
        let files = df.storage_client().list(Some(data_path)).await.unwrap();
        assert_eq!(files.len(), 0);
    }

    #[tokio::test]
    async fn test_write_genai_event_record_dataframe_local() {
        cleanup();
        let storage_settings = ObjectStorageSettings::default();
        let df = ParquetDataFrame::new(&storage_settings, &RecordType::GenAIEval).unwrap();
        let mut batch = Vec::new();
        let entity_id = rand::rng().random_range(0..100);

        // create records: 3 hourly buckets x 50 records each
        for i in 0..3 {
            for j in 0..50 {
                let record = GenAIEvalRecord {
                    created_at: Utc::now() + chrono::Duration::hours(i),
                    entity_id,
                    context: serde_json::Value::Object(Map::new()),
                    status: Status::Pending,
                    id: 0,
                    uid: format!("record_uid_{i}_{j}"),
                    entity_uid: format!("entity_uid_{entity_id}"),
                    ..Default::default()
                };

                let boxed_record = BoxedGenAIEvalRecord::new(record);
                batch.push(ServerRecord::GenAIEval(boxed_record));
            }
        }

        let records = ServerRecords::new(batch);
        let rpath = BinnedTableName::GenAIEval.to_string();
        df.write_parquet(&rpath, records.clone()).await.unwrap();

        // get canonical path
        let canonical_path = df.storage_root();
        let data_path = Path::from(canonical_path);

        // Check the files exist (one per hourly bucket)
        let files = df.storage_client().list(Some(&data_path)).await.unwrap();
        assert_eq!(files.len(), 3);

        delete_all_and_verify(&df, &data_path).await;

        // cleanup
        cleanup();
    }

    #[tokio::test]
    async fn test_write_genai_task_dataframe_local() {
        cleanup();
        let storage_settings = ObjectStorageSettings::default();
        let df = ParquetDataFrame::new(&storage_settings, &RecordType::GenAITask).unwrap();
        let mut batch = Vec::new();
        let start_utc = Utc::now();
        let end_utc_for_test = start_utc + chrono::Duration::hours(3);
        let entity_id = rand::rng().random_range(0..100);

        // create records: 3 hourly buckets x 50 records each
        for i in 0..3 {
            for j in 0..50 {
                let record = ServerRecord::GenAITaskRecord(GenAIEvalTaskResult {
                    record_uid: format!("record_uid_{i}_{j}"),
                    created_at: Utc::now() + chrono::Duration::hours(i),
                    start_time: Utc::now() + chrono::Duration::hours(i),
                    end_time: Utc::now()
                        + chrono::Duration::hours(i)
                        + chrono::Duration::minutes(5),
                    entity_id,
                    task_id: format!("task{i}"),
                    task_type: scouter_types::genai::EvaluationTaskType::Assertion,
                    passed: true,
                    value: j as f64,
                    assertion: Assertion::FieldPath(Some(format!("field.path.{i}"))),
                    operator: scouter_types::genai::ComparisonOperator::Contains,
                    expected: Value::Null,
                    actual: Value::Null,
                    message: "All good".to_string(),
                    entity_uid: format!("entity_uid_{entity_id}"),
                    condition: false,
                    stage: 0,
                });

                batch.push(record);
            }
        }

        let records = ServerRecords::new(batch);
        let rpath = BinnedTableName::GenAITask.to_string();
        df.write_parquet(&rpath, records.clone()).await.unwrap();

        // get canonical path
        let canonical_path = df.storage_root();
        let data_path = Path::from(canonical_path);

        // Check the files exist (one per hourly bucket)
        let files = df.storage_client().list(Some(&data_path)).await.unwrap();
        assert_eq!(files.len(), 3);

        // attempt to read the files back with a fresh frame
        let new_df = ParquetDataFrame::new(&storage_settings, &RecordType::GenAITask).unwrap();

        let read_df = new_df
            .get_binned_metrics(&rpath, &0.01, &start_utc, &end_utc_for_test, &entity_id)
            .await
            .unwrap();

        let binned_metrics = BinnedMetricsExtractor::dataframe_to_binned_metrics(read_df)
            .await
            .unwrap();

        assert_eq!(binned_metrics.metrics.len(), 3);

        delete_all_and_verify(&df, &data_path).await;

        // cleanup
        cleanup();
    }

    #[tokio::test]
    async fn test_write_genai_workflow_dataframe_local() {
        cleanup();
        let storage_settings = ObjectStorageSettings::default();
        let df = ParquetDataFrame::new(&storage_settings, &RecordType::GenAIWorkflow).unwrap();
        let mut batch = Vec::new();
        let start_utc = Utc::now();
        let end_utc_for_test = start_utc + chrono::Duration::hours(3);
        let entity_id = rand::rng().random_range(0..100);

        // create records: 3 hourly buckets x 50 records each
        for i in 0..3 {
            for j in 0..50 {
                let record = ServerRecord::GenAIWorkflowRecord(GenAIEvalWorkflowResult {
                    record_uid: format!("record_uid_{i}_{j}"),
                    created_at: Utc::now() + chrono::Duration::hours(i),
                    entity_id,
                    total_tasks: 10,
                    passed_tasks: 8,
                    failed_tasks: 2,
                    pass_rate: 0.8,
                    duration_ms: 1500,
                    entity_uid: format!("entity_uid_{entity_id}"),
                    execution_plan: scouter_types::genai::ExecutionPlan::default(),
                    id: j,
                });

                batch.push(record);
            }
        }

        let records = ServerRecords::new(batch);
        let rpath = BinnedTableName::GenAIWorkflow.to_string();
        df.write_parquet(&rpath, records.clone()).await.unwrap();

        // get canonical path
        let canonical_path = df.storage_root();
        let data_path = Path::from(canonical_path);

        // Check the files exist (one per hourly bucket)
        let files = df.storage_client().list(Some(&data_path)).await.unwrap();
        assert_eq!(files.len(), 3);

        // attempt to read the files back with a fresh frame
        let new_df = ParquetDataFrame::new(&storage_settings, &RecordType::GenAIWorkflow).unwrap();

        let read_df = new_df
            .get_binned_metrics(&rpath, &0.01, &start_utc, &end_utc_for_test, &entity_id)
            .await
            .unwrap();

        let binned_metrics = BinnedMetricsExtractor::dataframe_to_binned_metrics(read_df)
            .await
            .unwrap();

        assert_eq!(binned_metrics.metrics.len(), 3);

        delete_all_and_verify(&df, &data_path).await;

        // cleanup
        cleanup();
    }

    #[tokio::test]
    async fn test_write_custom_dataframe_local() {
        cleanup();
        let storage_settings = ObjectStorageSettings::default();
        let df = ParquetDataFrame::new(&storage_settings, &RecordType::Custom).unwrap();
        let mut batch = Vec::new();
        let start_utc = Utc::now();
        let end_utc_for_test = start_utc + chrono::Duration::hours(3);
        let entity_id = rand::rng().random_range(0..100);

        // create records: 3 metrics x 50 values each
        for i in 0..3 {
            for j in 0..50 {
                let record = ServerRecord::Custom(CustomMetricRecord {
                    created_at: Utc::now() + chrono::Duration::hours(i),
                    metric: format!("metric{i}"),
                    value: j as f64,
                    entity_id,
                    uid: format!("entity_uid_{entity_id}"),
                });

                batch.push(record);
            }
        }

        let records = ServerRecords::new(batch);
        let rpath = "custom";
        df.write_parquet(rpath, records.clone()).await.unwrap();

        // get canonical path
        let canonical_path = df.storage_root();
        let data_path = Path::from(canonical_path);

        // Check the files exist (one per hourly bucket)
        let files = df.storage_client().list(Some(&data_path)).await.unwrap();
        assert_eq!(files.len(), 3);

        // attempt to read the files back with a fresh frame
        let new_df = ParquetDataFrame::new(&storage_settings, &RecordType::Custom).unwrap();

        let read_df = new_df
            .get_binned_metrics(rpath, &0.01, &start_utc, &end_utc_for_test, &entity_id)
            .await
            .unwrap();

        let binned_metrics = BinnedMetricsExtractor::dataframe_to_binned_metrics(read_df)
            .await
            .unwrap();

        assert_eq!(binned_metrics.metrics.len(), 3);

        delete_all_and_verify(&df, &data_path).await;

        // cleanup
        cleanup();
    }

    #[tokio::test]
    async fn test_write_psi_dataframe_local() {
        cleanup();

        let storage_settings = ObjectStorageSettings::default();
        let df = ParquetDataFrame::new(&storage_settings, &RecordType::Psi).unwrap();
        let mut batch = Vec::new();
        let start_utc = Utc::now();
        let end_utc_for_test = start_utc + chrono::Duration::hours(3);
        let entity_id = rand::rng().random_range(0..100);

        // feature1: 3 hourly buckets x 5 bins
        for i in 0..3 {
            for j in 0..5 {
                let record = ServerRecord::Psi(PsiRecord {
                    created_at: Utc::now() + chrono::Duration::hours(i),
                    feature: "feature1".to_string(),
                    bin_id: j,
                    bin_count: rand::rng().random_range(0..100),
                    entity_id,
                    uid: format!("entity_uid_{entity_id}"),
                });

                batch.push(record);
            }
        }

        // feature2: 3 hourly buckets x 5 bins
        for i in 0..3 {
            for j in 0..5 {
                let record = ServerRecord::Psi(PsiRecord {
                    created_at: Utc::now() + chrono::Duration::hours(i),
                    feature: "feature2".to_string(),
                    bin_id: j,
                    bin_count: rand::rng().random_range(0..100),
                    entity_id,
                    uid: format!("entity_uid_{entity_id}"),
                });

                batch.push(record);
            }
        }

        let records = ServerRecords::new(batch);
        let rpath = "psi";
        df.write_parquet(rpath, records.clone()).await.unwrap();

        // get canonical path
        let canonical_path = df.storage_root();
        let data_path = Path::from(canonical_path);

        // Check the files exist (one per hourly bucket)
        let files = df.storage_client().list(Some(&data_path)).await.unwrap();
        assert_eq!(files.len(), 3);

        // attempt to read the files back
        let read_df = df
            .get_binned_metrics(rpath, &0.01, &start_utc, &end_utc_for_test, &entity_id)
            .await
            .unwrap();

        let psi_drift = dataframe_to_psi_drift_features(read_df).await.unwrap();
        assert_eq!(psi_drift.len(), 2);

        delete_all_and_verify(&df, &data_path).await;

        // cleanup
        cleanup();
    }

    #[tokio::test]
    async fn test_write_spc_dataframe_local() {
        cleanup();
        let storage_settings = ObjectStorageSettings::default();
        let df = ParquetDataFrame::new(&storage_settings, &RecordType::Spc).unwrap();
        let mut batch = Vec::new();
        let start_utc = Utc::now();
        let end_utc_for_test = start_utc + chrono::Duration::hours(3);
        let entity_id = rand::rng().random_range(0..100);

        // feature1: 5 hourly values
        for i in 0..5 {
            let record = ServerRecord::Spc(SpcRecord {
                created_at: Utc::now() + chrono::Duration::hours(i),
                feature: "feature1".to_string(),
                value: i as f64,
                entity_id,
                uid: format!("entity_uid_{entity_id}"),
            });

            batch.push(record);
        }

        // feature2: 5 hourly values
        for i in 0..5 {
            let record = ServerRecord::Spc(SpcRecord {
                created_at: Utc::now() + chrono::Duration::hours(i),
                feature: "feature2".to_string(),
                value: i as f64,
                entity_id,
                uid: format!("entity_uid_{entity_id}"),
            });

            batch.push(record);
        }

        let records = ServerRecords::new(batch);
        let rpath = "spc";
        df.write_parquet(rpath, records.clone()).await.unwrap();

        // get canonical path
        let canonical_path = df.storage_root();
        let data_path = Path::from(canonical_path);

        // Check the files exist (one per hourly bucket)
        let files = df.storage_client().list(Some(&data_path)).await.unwrap();
        assert_eq!(files.len(), 5);

        // attempt to read the files back
        let read_df = df
            .get_binned_metrics(rpath, &0.01, &start_utc, &end_utc_for_test, &entity_id)
            .await
            .unwrap();

        let _spc_drift = dataframe_to_spc_drift_features(read_df).await.unwrap();

        delete_all_and_verify(&df, &data_path).await;

        // cleanup
        cleanup();
    }
}