//! scouter_dataframe/parquet/dataframe.rs
//!
//! Dispatcher over the per-record-type parquet dataframe implementations.
1use crate::error::DataFrameError;
2use crate::parquet::custom::CustomMetricDataFrame;
3use crate::parquet::genai::{GenAIEvalDataFrame, GenAITaskDataFrame, GenAIWorkflowDataFrame};
4use crate::parquet::psi::PsiDataFrame;
5use crate::parquet::spc::SpcDataFrame;
6use crate::parquet::traits::ParquetFrame;
7use crate::storage::ObjectStore;
8use chrono::{DateTime, Utc};
9use datafusion::prelude::DataFrame;
10use scouter_settings::ObjectStorageSettings;
11use scouter_types::{RecordType, ServerRecords, StorageType};
12use tracing::instrument;
13
/// Dispatch enum over the per-record-type parquet dataframe implementations.
///
/// Each variant wraps the frame type responsible for writing and querying
/// that record type's parquet data (see the `write_parquet` /
/// `get_binned_metrics` delegation in the `impl` below).
pub enum ParquetDataFrame {
    CustomMetric(CustomMetricDataFrame),
    Psi(PsiDataFrame),
    Spc(SpcDataFrame),
    GenAITask(GenAITaskDataFrame),
    GenAIWorkflow(GenAIWorkflowDataFrame),
    GenAIEval(GenAIEvalDataFrame),
}
22
23impl ParquetDataFrame {
24    pub fn new(
25        storage_settings: &ObjectStorageSettings,
26        record_type: &RecordType,
27    ) -> Result<Self, DataFrameError> {
28        match record_type {
29            RecordType::Custom => Ok(ParquetDataFrame::CustomMetric(CustomMetricDataFrame::new(
30                storage_settings,
31            )?)),
32            RecordType::Psi => Ok(ParquetDataFrame::Psi(PsiDataFrame::new(storage_settings)?)),
33            RecordType::Spc => Ok(ParquetDataFrame::Spc(SpcDataFrame::new(storage_settings)?)),
34            RecordType::GenAITask => Ok(ParquetDataFrame::GenAITask(GenAITaskDataFrame::new(
35                storage_settings,
36            )?)),
37            RecordType::GenAIWorkflow => Ok(ParquetDataFrame::GenAIWorkflow(
38                GenAIWorkflowDataFrame::new(storage_settings)?,
39            )),
40            RecordType::GenAIEval => Ok(ParquetDataFrame::GenAIEval(GenAIEvalDataFrame::new(
41                storage_settings,
42            )?)),
43
44            _ => Err(DataFrameError::InvalidRecordTypeError(
45                record_type.to_string(),
46            )),
47        }
48    }
49
50    /// Write the records to a parquet file at the given path.
51    ///
52    /// # Arguments
53    ///
54    /// * `rpath` - The path to write the parquet file to. (This path should exclude root path)
55    /// * `records` - The records to write to the parquet file.
56    ///
57    #[instrument(skip_all, err)]
58    pub async fn write_parquet(
59        &self,
60        rpath: &str,
61        records: ServerRecords,
62    ) -> Result<(), DataFrameError> {
63        let rpath = &self.resolve_path(rpath);
64
65        match self {
66            ParquetDataFrame::CustomMetric(df) => df.write_parquet(rpath, records).await,
67            ParquetDataFrame::Psi(df) => df.write_parquet(rpath, records).await,
68            ParquetDataFrame::Spc(df) => df.write_parquet(rpath, records).await,
69            ParquetDataFrame::GenAITask(df) => df.write_parquet(rpath, records).await,
70            ParquetDataFrame::GenAIWorkflow(df) => df.write_parquet(rpath, records).await,
71            ParquetDataFrame::GenAIEval(df) => df.write_parquet(rpath, records).await,
72        }
73    }
74
75    pub fn storage_root(&self) -> String {
76        match self {
77            ParquetDataFrame::CustomMetric(df) => df.storage_root(),
78            ParquetDataFrame::Psi(df) => df.storage_root(),
79            ParquetDataFrame::Spc(df) => df.storage_root(),
80            ParquetDataFrame::GenAITask(df) => df.storage_root(),
81            ParquetDataFrame::GenAIWorkflow(df) => df.storage_root(),
82            ParquetDataFrame::GenAIEval(df) => df.storage_root(),
83        }
84    }
85
86    /// primarily used for dev
87    pub fn storage_client(&self) -> ObjectStore {
88        match self {
89            ParquetDataFrame::CustomMetric(df) => df.object_store.clone(),
90            ParquetDataFrame::Psi(df) => df.object_store.clone(),
91            ParquetDataFrame::Spc(df) => df.object_store.clone(),
92            ParquetDataFrame::GenAITask(df) => df.object_store.clone(),
93            ParquetDataFrame::GenAIWorkflow(df) => df.object_store.clone(),
94            ParquetDataFrame::GenAIEval(df) => df.object_store.clone(),
95        }
96    }
97
98    /// Get binned metrics from archived parquet files
99    ///
100    /// # Arguments
101    /// * path - The path to the parquet files (directory). This will be read as a table listing
102    /// * bin - The bin size
103    /// * start_time - The start time of the query
104    /// * end_time - The end time of the query
105    /// * space - The space to query
106    /// * name - The name to query
107    /// * version - The version to query
108    #[allow(clippy::too_many_arguments)]
109    pub async fn get_binned_metrics(
110        &self,
111        path: &str,
112        bin: &f64,
113        start_time: &DateTime<Utc>,
114        end_time: &DateTime<Utc>,
115        entity_id: &i32,
116    ) -> Result<DataFrame, DataFrameError> {
117        let read_path = &self.resolve_path(path);
118
119        match self {
120            ParquetDataFrame::CustomMetric(df) => {
121                df.get_binned_metrics(read_path, bin, start_time, end_time, entity_id)
122                    .await
123            }
124            ParquetDataFrame::Psi(df) => {
125                df.get_binned_metrics(read_path, bin, start_time, end_time, entity_id)
126                    .await
127            }
128            ParquetDataFrame::Spc(df) => {
129                df.get_binned_metrics(read_path, bin, start_time, end_time, entity_id)
130                    .await
131            }
132
133            ParquetDataFrame::GenAITask(df) => {
134                df.get_binned_metrics(read_path, bin, start_time, end_time, entity_id)
135                    .await
136            }
137            ParquetDataFrame::GenAIWorkflow(df) => {
138                df.get_binned_metrics(read_path, bin, start_time, end_time, entity_id)
139                    .await
140            }
141            ParquetDataFrame::GenAIEval(_) => Err(DataFrameError::UnsupportedOperation(
142                "GenAI drift does not support binned metrics".to_string(),
143            )),
144        }
145    }
146
147    /// Get the underyling object store storage type
148    pub fn storage_type(&self) -> StorageType {
149        match self {
150            ParquetDataFrame::CustomMetric(df) => {
151                df.object_store.storage_settings.storage_type.clone()
152            }
153            ParquetDataFrame::Psi(df) => df.object_store.storage_settings.storage_type.clone(),
154            ParquetDataFrame::Spc(df) => df.object_store.storage_settings.storage_type.clone(),
155            ParquetDataFrame::GenAITask(df) => {
156                df.object_store.storage_settings.storage_type.clone()
157            }
158            ParquetDataFrame::GenAIWorkflow(df) => {
159                df.object_store.storage_settings.storage_type.clone()
160            }
161            ParquetDataFrame::GenAIEval(df) => {
162                df.object_store.storage_settings.storage_type.clone()
163            }
164        }
165    }
166
167    pub fn resolve_path(&self, path: &str) -> String {
168        format!("{}/{}/", self.storage_root(), path)
169    }
170}
171
#[cfg(test)]
mod tests {

    use super::*;
    use crate::parquet::psi::dataframe_to_psi_drift_features;
    use crate::parquet::spc::dataframe_to_spc_drift_features;
    use crate::parquet::types::BinnedTableName;
    use crate::parquet::utils::BinnedMetricsExtractor;
    use chrono::Utc;
    use object_store::path::Path;
    use rand::Rng;
    use scouter_settings::ObjectStorageSettings;
    use scouter_types::Assertion;
    use scouter_types::{
        BoxedGenAIEvalRecord, GenAIEvalRecord, PsiRecord, ServerRecord, ServerRecords, SpcRecord,
        Status,
    };
    use scouter_types::{CustomMetricRecord, GenAIEvalTaskResult, GenAIEvalWorkflowResult};
    use serde_json::Map;
    use serde_json::Value;

    /// Remove the local storage root directory so every test starts from an
    /// empty object store. Called at the start and end of each test.
    fn cleanup() {
        let storage_settings = ObjectStorageSettings::default();
        let current_dir = std::env::current_dir().unwrap();
        let storage_path = current_dir.join(storage_settings.storage_root());
        if storage_path.exists() {
            std::fs::remove_dir_all(storage_path).unwrap();
        }
    }

    /// Round-trips GenAI eval records: write 3 hourly batches of 50 records,
    /// assert file count, then delete and verify removal.
    /// NOTE(review): the `files.len() == 3` expectation presumably reflects one
    /// parquet file per distinct `created_at` hour — confirm against the writer.
    #[tokio::test]
    async fn test_write_genai_event_record_dataframe_local() {
        cleanup();
        let storage_settings = ObjectStorageSettings::default();
        let df = ParquetDataFrame::new(&storage_settings, &RecordType::GenAIEval).unwrap();
        let mut batch = Vec::new();
        let entity_id = rand::rng().random_range(0..100);

        // create records
        for i in 0..3 {
            for j in 0..50 {
                let record = GenAIEvalRecord {
                    created_at: Utc::now() + chrono::Duration::hours(i),
                    entity_id,
                    context: serde_json::Value::Object(Map::new()),
                    status: Status::Pending,
                    id: 0,
                    uid: format!("record_uid_{i}_{j}"),
                    entity_uid: format!("entity_uid_{entity_id}"),
                    ..Default::default()
                };

                let boxed_record = BoxedGenAIEvalRecord::new(record);
                batch.push(ServerRecord::GenAIEval(boxed_record));
            }
        }

        let records = ServerRecords::new(batch);
        let rpath = BinnedTableName::GenAIEval.to_string();
        df.write_parquet(&rpath, records.clone()).await.unwrap();

        // get canonical path
        let canonical_path = df.storage_root();
        let data_path = object_store::path::Path::from(canonical_path);

        // Check if the file exists
        let files = df.storage_client().list(Some(&data_path)).await.unwrap();
        assert_eq!(files.len(), 3);

        //// delete the file
        for file in files.iter() {
            let path = Path::from(file.to_string());
            df.storage_client()
                .delete(&path)
                .await
                .expect("Failed to delete file");
        }
        //
        //// Check if the file is deleted
        let files = df.storage_client().list(Some(&data_path)).await.unwrap();
        assert_eq!(files.len(), 0);

        // cleanup
        cleanup();
    }

    /// Round-trips GenAI task records: write, read back via
    /// `get_binned_metrics`, and assert 3 distinct task metrics are binned.
    #[tokio::test]
    async fn test_write_genai_task_dataframe_local() {
        cleanup();
        let storage_settings = ObjectStorageSettings::default();
        let df = ParquetDataFrame::new(&storage_settings, &RecordType::GenAITask).unwrap();
        let mut batch = Vec::new();
        let start_utc = Utc::now();
        // Query window must span the 3 hourly batches created below.
        let end_utc_for_test = start_utc + chrono::Duration::hours(3);
        let entity_id = rand::rng().random_range(0..100);

        // create records
        for i in 0..3 {
            for j in 0..50 {
                let record = ServerRecord::GenAITaskRecord(GenAIEvalTaskResult {
                    record_uid: format!("record_uid_{i}_{j}"),
                    created_at: Utc::now() + chrono::Duration::hours(i),
                    start_time: Utc::now() + chrono::Duration::hours(i),
                    end_time: Utc::now()
                        + chrono::Duration::hours(i)
                        + chrono::Duration::minutes(5),
                    entity_id,
                    task_id: format!("task{i}"),
                    task_type: scouter_types::genai::EvaluationTaskType::Assertion,
                    passed: true,
                    value: j as f64,
                    assertion: Assertion::FieldPath(Some(format!("field.path.{i}"))),
                    operator: scouter_types::genai::ComparisonOperator::Contains,
                    expected: Value::Null,
                    actual: Value::Null,
                    message: "All good".to_string(),
                    entity_uid: format!("entity_uid_{entity_id}"),
                    condition: false,
                    stage: 0,
                });

                batch.push(record);
            }
        }

        let records = ServerRecords::new(batch);
        let rpath = BinnedTableName::GenAITask.to_string();
        df.write_parquet(&rpath, records.clone()).await.unwrap();

        // get canonical path
        let canonical_path = df.storage_root();
        let data_path = object_store::path::Path::from(canonical_path);

        // Check if the file exists
        let files = df.storage_client().list(Some(&data_path)).await.unwrap();
        assert_eq!(files.len(), 3);

        // attempt to read the file
        let new_df = ParquetDataFrame::new(&storage_settings, &RecordType::GenAITask).unwrap();

        let read_df = new_df
            .get_binned_metrics(&rpath, &0.01, &start_utc, &end_utc_for_test, &entity_id)
            .await
            .unwrap();

        //read_df.show().await.unwrap();

        let binned_metrics = BinnedMetricsExtractor::dataframe_to_binned_metrics(read_df)
            .await
            .unwrap();

        // One metric per distinct task_id (task0..task2).
        assert_eq!(binned_metrics.metrics.len(), 3);

        //// delete the file
        for file in files.iter() {
            let path = Path::from(file.to_string());
            df.storage_client()
                .delete(&path)
                .await
                .expect("Failed to delete file");
        }
        //
        //// Check if the file is deleted
        let files = df.storage_client().list(Some(&data_path)).await.unwrap();
        assert_eq!(files.len(), 0);

        // cleanup
        cleanup();
    }

    /// Round-trips GenAI workflow records and asserts they bin into a single
    /// "workflow" metric with one entry per hourly batch.
    #[tokio::test]
    async fn test_write_genai_workflow_dataframe_local() {
        cleanup();
        let storage_settings = ObjectStorageSettings::default();
        let df = ParquetDataFrame::new(&storage_settings, &RecordType::GenAIWorkflow).unwrap();
        let mut batch = Vec::new();
        let start_utc = Utc::now();
        let end_utc_for_test = start_utc + chrono::Duration::hours(3);
        let entity_id = rand::rng().random_range(0..100);

        // create records
        for i in 0..3 {
            for j in 0..50 {
                let record = ServerRecord::GenAIWorkflowRecord(GenAIEvalWorkflowResult {
                    record_uid: format!("record_uid_{i}_{j}"),
                    created_at: Utc::now() + chrono::Duration::hours(i),
                    entity_id,
                    total_tasks: 10,
                    passed_tasks: 8,
                    failed_tasks: 2,
                    pass_rate: 0.8,
                    duration_ms: 1500,
                    entity_uid: format!("entity_uid_{entity_id}"),
                    execution_plan: scouter_types::genai::ExecutionPlan::default(),
                    id: j,
                });

                batch.push(record);
            }
        }

        let records = ServerRecords::new(batch);
        let rpath = BinnedTableName::GenAIWorkflow.to_string();
        df.write_parquet(&rpath, records.clone()).await.unwrap();

        // get canonical path
        let canonical_path = df.storage_root();
        let data_path = object_store::path::Path::from(canonical_path);

        // Check if the file exists
        let files = df.storage_client().list(Some(&data_path)).await.unwrap();
        assert_eq!(files.len(), 3);

        // attempt to read the file
        let new_df = ParquetDataFrame::new(&storage_settings, &RecordType::GenAIWorkflow).unwrap();

        let read_df = new_df
            .get_binned_metrics(&rpath, &0.01, &start_utc, &end_utc_for_test, &entity_id)
            .await
            .unwrap();

        let binned_metrics = BinnedMetricsExtractor::dataframe_to_binned_metrics(read_df)
            .await
            .unwrap();

        // should be workflow metric
        assert_eq!(binned_metrics.metrics.len(), 1);

        // 3 hourly batches -> 3 binned entries for the single workflow metric.
        let workflow_metrics = &binned_metrics.metrics["workflow"];
        assert_eq!(workflow_metrics.created_at.len(), 3);
        assert_eq!(workflow_metrics.stats.len(), 3);

        //// delete the file
        for file in files.iter() {
            let path = Path::from(file.to_string());
            df.storage_client()
                .delete(&path)
                .await
                .expect("Failed to delete file");
        }
        //
        //// Check if the file is deleted
        let files = df.storage_client().list(Some(&data_path)).await.unwrap();
        assert_eq!(files.len(), 0);

        // cleanup
        cleanup();
    }

    /// Round-trips custom metric records (metric0..metric2) and asserts the
    /// binned read returns one metric per distinct metric name.
    #[tokio::test]
    async fn test_write_custom_dataframe_local() {
        cleanup();
        let storage_settings = ObjectStorageSettings::default();
        let df = ParquetDataFrame::new(&storage_settings, &RecordType::Custom).unwrap();
        let mut batch = Vec::new();
        let start_utc = Utc::now();
        let end_utc_for_test = start_utc + chrono::Duration::hours(3);
        let entity_id = rand::rng().random_range(0..100);
        // create records
        for i in 0..3 {
            for j in 0..50 {
                let record = ServerRecord::Custom(CustomMetricRecord {
                    created_at: Utc::now() + chrono::Duration::hours(i),
                    metric: format!("metric{i}"),
                    value: j as f64,
                    entity_id,
                    uid: format!("entity_uid_{entity_id}"),
                });

                batch.push(record);
            }
        }

        let records = ServerRecords::new(batch);
        let rpath = "custom";
        df.write_parquet(rpath, records.clone()).await.unwrap();

        // get canonical path
        let canonical_path = df.storage_root();
        let data_path = object_store::path::Path::from(canonical_path);

        // Check if the file exists
        let files = df.storage_client().list(Some(&data_path)).await.unwrap();
        assert_eq!(files.len(), 3);

        // attempt to read the file
        let new_df = ParquetDataFrame::new(&storage_settings, &RecordType::Custom).unwrap();

        let read_df = new_df
            .get_binned_metrics(rpath, &0.01, &start_utc, &end_utc_for_test, &entity_id)
            .await
            .unwrap();

        //read_df.show().await.unwrap();

        let binned_metrics = BinnedMetricsExtractor::dataframe_to_binned_metrics(read_df)
            .await
            .unwrap();

        assert_eq!(binned_metrics.metrics.len(), 3);

        //// delete the file
        for file in files.iter() {
            let path = Path::from(file.to_string());
            df.storage_client()
                .delete(&path)
                .await
                .expect("Failed to delete file");
        }
        //
        //// Check if the file is deleted
        let files = df.storage_client().list(Some(&data_path)).await.unwrap();
        assert_eq!(files.len(), 0);

        // cleanup
        cleanup();
    }

    /// Round-trips PSI records for two features and asserts the binned read
    /// yields drift data for both features.
    #[tokio::test]
    async fn test_write_psi_dataframe_local() {
        cleanup();

        let storage_settings = ObjectStorageSettings::default();
        let df = ParquetDataFrame::new(&storage_settings, &RecordType::Psi).unwrap();
        let mut batch = Vec::new();
        let start_utc = Utc::now();
        let end_utc_for_test = start_utc + chrono::Duration::hours(3);
        let entity_id = rand::rng().random_range(0..100);
        for i in 0..3 {
            for j in 0..5 {
                let record = ServerRecord::Psi(PsiRecord {
                    created_at: Utc::now() + chrono::Duration::hours(i),
                    feature: "feature1".to_string(),
                    bin_id: j,
                    bin_count: rand::rng().random_range(0..100),
                    entity_id,
                    uid: format!("entity_uid_{entity_id}"),
                });

                batch.push(record);
            }
        }

        for i in 0..3 {
            for j in 0..5 {
                let record = ServerRecord::Psi(PsiRecord {
                    created_at: Utc::now() + chrono::Duration::hours(i),
                    feature: "feature2".to_string(),
                    bin_id: j,
                    bin_count: rand::rng().random_range(0..100),
                    entity_id,
                    uid: format!("entity_uid_{entity_id}"),
                });

                batch.push(record);
            }
        }

        let records = ServerRecords::new(batch);
        let rpath = "psi";
        df.write_parquet(rpath, records.clone()).await.unwrap();

        // get canonical path
        let canonical_path = df.storage_root();
        let data_path = object_store::path::Path::from(canonical_path);

        // Check if the file exists
        let files = df.storage_client().list(Some(&data_path)).await.unwrap();
        assert_eq!(files.len(), 3);

        // attempt to read the file
        let read_df = df
            .get_binned_metrics(rpath, &0.01, &start_utc, &end_utc_for_test, &entity_id)
            .await
            .unwrap();

        // Two features were written -> two drift feature entries.
        let psi_drift = dataframe_to_psi_drift_features(read_df).await.unwrap();
        assert_eq!(psi_drift.len(), 2);

        //// delete the file
        for file in files.iter() {
            let path = Path::from(file.to_string());
            df.storage_client()
                .delete(&path)
                .await
                .expect("Failed to delete file");
        }
        //
        //// Check if the file is deleted
        let files = df.storage_client().list(Some(&data_path)).await.unwrap();
        assert_eq!(files.len(), 0);

        // cleanup
        cleanup();
    }

    /// Round-trips SPC records for two features; 5 distinct hours are written,
    /// matching the `files.len() == 5` expectation below.
    #[tokio::test]
    async fn test_write_spc_dataframe_local() {
        cleanup();
        let storage_settings = ObjectStorageSettings::default();
        let df = ParquetDataFrame::new(&storage_settings, &RecordType::Spc).unwrap();
        let mut batch = Vec::new();
        let start_utc = Utc::now();
        let end_utc_for_test = start_utc + chrono::Duration::hours(3);
        let entity_id = rand::rng().random_range(0..100);
        for i in 0..5 {
            let record = ServerRecord::Spc(SpcRecord {
                created_at: Utc::now() + chrono::Duration::hours(i),
                feature: "feature1".to_string(),
                value: i as f64,
                entity_id,
                uid: format!("entity_uid_{entity_id}"),
            });

            batch.push(record);
        }

        for i in 0..5 {
            let record = ServerRecord::Spc(SpcRecord {
                created_at: Utc::now() + chrono::Duration::hours(i),
                feature: "feature2".to_string(),
                value: i as f64,
                entity_id,
                uid: format!("entity_uid_{entity_id}"),
            });

            batch.push(record);
        }

        let records = ServerRecords::new(batch);
        let rpath = "spc";
        df.write_parquet(rpath, records.clone()).await.unwrap();

        // get canonical path
        let canonical_path = df.storage_root();
        let data_path = object_store::path::Path::from(canonical_path);

        // Check if the file exists
        let files = df.storage_client().list(Some(&data_path)).await.unwrap();
        assert_eq!(files.len(), 5);

        // attempt to read the file
        let read_df = df
            .get_binned_metrics(rpath, &0.01, &start_utc, &end_utc_for_test, &entity_id)
            .await
            .unwrap();

        // Only checks the conversion succeeds; no shape assertions here.
        let _spc_drift = dataframe_to_spc_drift_features(read_df).await.unwrap();

        //// delete the file
        for file in files.iter() {
            let path = Path::from(file.to_string());
            df.storage_client()
                .delete(&path)
                .await
                .expect("Failed to delete file");
        }
        //
        //// Check if the file is deleted
        let files = df.storage_client().list(Some(&data_path)).await.unwrap();
        assert_eq!(files.len(), 0);

        // cleanup
        cleanup();
    }
}