scouter_dataframe/parquet/dataframe.rs

use crate::error::DataFrameError;
use crate::parquet::custom::CustomMetricDataFrame;
use crate::parquet::llm::{LLMDriftDataFrame, LLMMetricDataFrame};
use crate::parquet::psi::PsiDataFrame;
use crate::parquet::spc::SpcDataFrame;
use crate::parquet::traits::ParquetFrame;
use crate::storage::ObjectStore;
use chrono::{DateTime, Utc};
use datafusion::prelude::DataFrame;
use scouter_settings::ObjectStorageSettings;
use scouter_types::{RecordType, ServerRecords, StorageType};
use tracing::instrument;

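/// Dispatches parquet reads and writes to the drift-type-specific frame
/// for each supported `RecordType`.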
pub enum ParquetDataFrame {
    CustomMetric(CustomMetricDataFrame),
    Psi(PsiDataFrame),
    Spc(SpcDataFrame),
    LLMMetric(LLMMetricDataFrame),
    LLMDrift(LLMDriftDataFrame),
}

impl ParquetDataFrame {
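    /// Create the parquet frame implementation for the given `record_type`.
    ///
    /// Returns `DataFrameError::InvalidRecordTypeError` for record types
    /// that have no parquet representation.
    ///
    /// # Example (sketch, assuming default local storage settings)
    ///
    /// ```ignore
    /// let settings = ObjectStorageSettings::default();
    /// let frame = ParquetDataFrame::new(&settings, &RecordType::Psi)?;
    /// ```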
    pub fn new(
        storage_settings: &ObjectStorageSettings,
        record_type: &RecordType,
    ) -> Result<Self, DataFrameError> {
        match record_type {
            RecordType::Custom => Ok(ParquetDataFrame::CustomMetric(CustomMetricDataFrame::new(
                storage_settings,
            )?)),
            RecordType::Psi => Ok(ParquetDataFrame::Psi(PsiDataFrame::new(storage_settings)?)),
            RecordType::Spc => Ok(ParquetDataFrame::Spc(SpcDataFrame::new(storage_settings)?)),
            RecordType::LLMMetric => Ok(ParquetDataFrame::LLMMetric(LLMMetricDataFrame::new(
                storage_settings,
            )?)),
            RecordType::LLMDrift => Ok(ParquetDataFrame::LLMDrift(LLMDriftDataFrame::new(
                storage_settings,
            )?)),
            _ => Err(DataFrameError::InvalidRecordTypeError(
                record_type.to_string(),
            )),
        }
    }

    /// Write the records to a parquet file at the given path.
    ///
    /// # Arguments
    ///
    /// * `rpath` - The path to write the parquet file to, relative to the
    ///   storage root (`resolve_path` prepends the root).
    /// * `records` - The records to write to the parquet file.
    ///
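    /// # Example (sketch; `settings` and `records` are assumed to be in scope)
    ///
    /// ```ignore
    /// let frame = ParquetDataFrame::new(&settings, &RecordType::Spc)?;
    /// // relative path; resolve_path prepends the storage root
    /// frame.write_parquet("spc", records).await?;
    /// ```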
    #[instrument(skip_all, err)]
    pub async fn write_parquet(
        &self,
        rpath: &str,
        records: ServerRecords,
    ) -> Result<(), DataFrameError> {
        let rpath = &self.resolve_path(rpath);

        match self {
            ParquetDataFrame::CustomMetric(df) => df.write_parquet(rpath, records).await,
            ParquetDataFrame::Psi(df) => df.write_parquet(rpath, records).await,
            ParquetDataFrame::Spc(df) => df.write_parquet(rpath, records).await,
            ParquetDataFrame::LLMMetric(df) => df.write_parquet(rpath, records).await,
            ParquetDataFrame::LLMDrift(df) => df.write_parquet(rpath, records).await,
        }
    }

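    /// Return the root path of the underlying object store.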
    pub fn storage_root(&self) -> String {
        match self {
            ParquetDataFrame::CustomMetric(df) => df.storage_root(),
            ParquetDataFrame::Psi(df) => df.storage_root(),
            ParquetDataFrame::Spc(df) => df.storage_root(),
            ParquetDataFrame::LLMMetric(df) => df.storage_root(),
            ParquetDataFrame::LLMDrift(df) => df.storage_root(),
        }
    }

    /// Get the underlying object store client (primarily used for dev and tests).
    pub fn storage_client(&self) -> ObjectStore {
        match self {
            ParquetDataFrame::CustomMetric(df) => df.object_store.clone(),
            ParquetDataFrame::Psi(df) => df.object_store.clone(),
            ParquetDataFrame::Spc(df) => df.object_store.clone(),
            ParquetDataFrame::LLMMetric(df) => df.object_store.clone(),
            ParquetDataFrame::LLMDrift(df) => df.object_store.clone(),
        }
    }

    /// Get binned metrics from archived parquet files.
    ///
    /// # Arguments
    ///
    /// * `path` - The path to the parquet files (directory); read as a table listing.
    /// * `bin` - The bin size.
    /// * `start_time` - The start time of the query.
    /// * `end_time` - The end time of the query.
    /// * `space` - The space to query.
    /// * `name` - The name to query.
    /// * `version` - The version to query.
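    ///
    /// # Example (sketch; `frame`, `start`, and `end` are assumed to be in scope)
    ///
    /// ```ignore
    /// let binned = frame
    ///     .get_binned_metrics("custom", &0.01, &start, &end, "test", "test", "1.0")
    ///     .await?;
    /// ```
    ///
    /// Note: the `LLMDrift` variant does not support binned metrics and
    /// returns `DataFrameError::UnsupportedOperation`.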
    #[allow(clippy::too_many_arguments)]
    pub async fn get_binned_metrics(
        &self,
        path: &str,
        bin: &f64,
        start_time: &DateTime<Utc>,
        end_time: &DateTime<Utc>,
        space: &str,
        name: &str,
        version: &str,
    ) -> Result<DataFrame, DataFrameError> {
        let read_path = &self.resolve_path(path);

        match self {
            ParquetDataFrame::CustomMetric(df) => {
                df.get_binned_metrics(read_path, bin, start_time, end_time, space, name, version)
                    .await
            }
            ParquetDataFrame::Psi(df) => {
                df.get_binned_metrics(read_path, bin, start_time, end_time, space, name, version)
                    .await
            }
            ParquetDataFrame::Spc(df) => {
                df.get_binned_metrics(read_path, bin, start_time, end_time, space, name, version)
                    .await
            }
            ParquetDataFrame::LLMMetric(df) => {
                df.get_binned_metrics(read_path, bin, start_time, end_time, space, name, version)
                    .await
            }
            ParquetDataFrame::LLMDrift(_) => Err(DataFrameError::UnsupportedOperation(
                "LLMDrift does not support binned metrics".to_string(),
            )),
        }
    }

    /// Get the underlying object store storage type.
    pub fn storage_type(&self) -> StorageType {
        match self {
            ParquetDataFrame::CustomMetric(df) => {
                df.object_store.storage_settings.storage_type.clone()
            }
            ParquetDataFrame::Psi(df) => df.object_store.storage_settings.storage_type.clone(),
            ParquetDataFrame::Spc(df) => df.object_store.storage_settings.storage_type.clone(),
            ParquetDataFrame::LLMMetric(df) => {
                df.object_store.storage_settings.storage_type.clone()
            }
            ParquetDataFrame::LLMDrift(df) => df.object_store.storage_settings.storage_type.clone(),
        }
    }

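    /// Join a relative path onto the storage root, keeping a trailing slash
    /// so readers treat the result as a directory (table listing), e.g.
    /// `resolve_path("spc")` yields `"<storage_root>/spc/"`.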
    pub fn resolve_path(&self, path: &str) -> String {
        format!("{}/{}/", self.storage_root(), path)
    }
}

#[cfg(test)]
mod tests {

    use super::*;
    use crate::parquet::psi::dataframe_to_psi_drift_features;
    use crate::parquet::spc::dataframe_to_spc_drift_features;
    use crate::parquet::utils::BinnedMetricsExtractor;
    use chrono::Utc;
    use object_store::path::Path;
    use potato_head::create_score_prompt;
    use rand::Rng;
    use scouter_settings::ObjectStorageSettings;
    use scouter_types::{
        BoxedLLMDriftServerRecord, CustomMetricServerRecord, LLMDriftServerRecord, LLMMetricRecord,
        PsiServerRecord, ServerRecord, ServerRecords, SpcServerRecord, Status,
    };
    use serde_json::Map;
    use serde_json::Value;

    fn cleanup() {
        let storage_settings = ObjectStorageSettings::default();
        let current_dir = std::env::current_dir().unwrap();
        let storage_path = current_dir.join(storage_settings.storage_root());
        if storage_path.exists() {
            std::fs::remove_dir_all(storage_path).unwrap();
        }
    }

    #[tokio::test]
    async fn test_write_llm_drift_record_dataframe_local() {
        cleanup();
        let storage_settings = ObjectStorageSettings::default();
        let df = ParquetDataFrame::new(&storage_settings, &RecordType::LLMDrift).unwrap();
        let mut batch = Vec::new();

        let prompt = create_score_prompt(None);

        // create records
        for i in 0..3 {
            for _ in 0..50 {
                let record = LLMDriftServerRecord {
                    created_at: Utc::now() + chrono::Duration::hours(i),
                    space: "test".to_string(),
                    name: "test".to_string(),
                    version: "1.0".to_string(),
                    prompt: Some(prompt.model_dump_value()),
                    context: serde_json::Value::Object(Map::new()),
                    score: Value::Null,
                    status: Status::Pending,
                    id: 0,
                    uid: "test-uid".to_string(),
                    updated_at: None,
                    processing_started_at: None,
                    processing_ended_at: None,
                    processing_duration: None,
                };

                let boxed_record = BoxedLLMDriftServerRecord::new(record);
                batch.push(ServerRecord::LLMDrift(boxed_record));
            }
        }

        let records = ServerRecords::new(batch);
        let rpath = "llm_drift";
        df.write_parquet(rpath, records.clone()).await.unwrap();

        // get canonical path
        let canonical_path = df.storage_root();
        let data_path = object_store::path::Path::from(canonical_path);

        // check that the files exist
        let files = df.storage_client().list(Some(&data_path)).await.unwrap();
        assert_eq!(files.len(), 3);

        // delete the files
        for file in files.iter() {
            let path = Path::from(file.to_string());
            df.storage_client()
                .delete(&path)
                .await
                .expect("Failed to delete file");
        }

        // check that the files were deleted
        let files = df.storage_client().list(Some(&data_path)).await.unwrap();
        assert_eq!(files.len(), 0);

        // cleanup
        cleanup();
    }

    #[tokio::test]
    async fn test_write_llm_drift_metric_dataframe_local() {
        cleanup();
        let storage_settings = ObjectStorageSettings::default();
        let df = ParquetDataFrame::new(&storage_settings, &RecordType::LLMMetric).unwrap();
        let mut batch = Vec::new();
        let start_utc = Utc::now();
        let end_utc_for_test = start_utc + chrono::Duration::hours(3);

        // create records
        for i in 0..3 {
            for j in 0..50 {
                let record = ServerRecord::LLMMetric(LLMMetricRecord {
                    record_uid: format!("record_uid_{i}_{j}"),
                    created_at: Utc::now() + chrono::Duration::hours(i),
                    name: "test".to_string(),
                    space: "test".to_string(),
                    version: "1.0".to_string(),
                    metric: format!("metric{i}"),
                    value: j as f64,
                });

                batch.push(record);
            }
        }

        let records = ServerRecords::new(batch);
        let rpath = "llm_metric";
        df.write_parquet(rpath, records.clone()).await.unwrap();

        // get canonical path
        let canonical_path = df.storage_root();
        let data_path = object_store::path::Path::from(canonical_path);

        // check that the files exist
        let files = df.storage_client().list(Some(&data_path)).await.unwrap();
        assert_eq!(files.len(), 3);

        // attempt to read the files
        let new_df = ParquetDataFrame::new(&storage_settings, &RecordType::LLMMetric).unwrap();

        let read_df = new_df
            .get_binned_metrics(
                rpath,
                &0.01,
                &start_utc,
                &end_utc_for_test,
                "test",
                "test",
                "1.0",
            )
            .await
            .unwrap();

        let binned_metrics = BinnedMetricsExtractor::dataframe_to_binned_metrics(read_df)
            .await
            .unwrap();

        assert_eq!(binned_metrics.metrics.len(), 3);

        // delete the files
        for file in files.iter() {
            let path = Path::from(file.to_string());
            df.storage_client()
                .delete(&path)
                .await
                .expect("Failed to delete file");
        }

        // check that the files were deleted
        let files = df.storage_client().list(Some(&data_path)).await.unwrap();
        assert_eq!(files.len(), 0);

        // cleanup
        cleanup();
    }

    #[tokio::test]
    async fn test_write_custom_dataframe_local() {
        cleanup();
        let storage_settings = ObjectStorageSettings::default();
        let df = ParquetDataFrame::new(&storage_settings, &RecordType::Custom).unwrap();
        let mut batch = Vec::new();
        let start_utc = Utc::now();
        let end_utc_for_test = start_utc + chrono::Duration::hours(3);

        // create records
        for i in 0..3 {
            for j in 0..50 {
                let record = ServerRecord::Custom(CustomMetricServerRecord {
                    created_at: Utc::now() + chrono::Duration::hours(i),
                    name: "test".to_string(),
                    space: "test".to_string(),
                    version: "1.0".to_string(),
                    metric: format!("metric{i}"),
                    value: j as f64,
                });

                batch.push(record);
            }
        }

        let records = ServerRecords::new(batch);
        let rpath = "custom";
        df.write_parquet(rpath, records.clone()).await.unwrap();

        // get canonical path
        let canonical_path = df.storage_root();
        let data_path = object_store::path::Path::from(canonical_path);

        // check that the files exist
        let files = df.storage_client().list(Some(&data_path)).await.unwrap();
        assert_eq!(files.len(), 3);

        // attempt to read the files
        let new_df = ParquetDataFrame::new(&storage_settings, &RecordType::Custom).unwrap();

        let read_df = new_df
            .get_binned_metrics(
                rpath,
                &0.01,
                &start_utc,
                &end_utc_for_test,
                "test",
                "test",
                "1.0",
            )
            .await
            .unwrap();

        let binned_metrics = BinnedMetricsExtractor::dataframe_to_binned_metrics(read_df)
            .await
            .unwrap();

        assert_eq!(binned_metrics.metrics.len(), 3);

        // delete the files
        for file in files.iter() {
            let path = Path::from(file.to_string());
            df.storage_client()
                .delete(&path)
                .await
                .expect("Failed to delete file");
        }

        // check that the files were deleted
        let files = df.storage_client().list(Some(&data_path)).await.unwrap();
        assert_eq!(files.len(), 0);

        // cleanup
        cleanup();
    }

    #[tokio::test]
    async fn test_write_psi_dataframe_local() {
        cleanup();

        let storage_settings = ObjectStorageSettings::default();
        let df = ParquetDataFrame::new(&storage_settings, &RecordType::Psi).unwrap();
        let mut batch = Vec::new();
        let start_utc = Utc::now();
        let end_utc_for_test = start_utc + chrono::Duration::hours(3);

        for i in 0..3 {
            for j in 0..5 {
                let record = ServerRecord::Psi(PsiServerRecord {
                    created_at: Utc::now() + chrono::Duration::hours(i),
                    name: "test".to_string(),
                    space: "test".to_string(),
                    version: "1.0".to_string(),
                    feature: "feature1".to_string(),
                    bin_id: j as usize,
                    bin_count: rand::rng().random_range(0..100),
                });

                batch.push(record);
            }
        }

        for i in 0..3 {
            for j in 0..5 {
                let record = ServerRecord::Psi(PsiServerRecord {
                    created_at: Utc::now() + chrono::Duration::hours(i),
                    name: "test".to_string(),
                    space: "test".to_string(),
                    version: "1.0".to_string(),
                    feature: "feature2".to_string(),
                    bin_id: j as usize,
                    bin_count: rand::rng().random_range(0..100),
                });

                batch.push(record);
            }
        }

        let records = ServerRecords::new(batch);
        let rpath = "psi";
        df.write_parquet(rpath, records.clone()).await.unwrap();

        // get canonical path
        let canonical_path = df.storage_root();
        let data_path = object_store::path::Path::from(canonical_path);

        // check that the files exist
        let files = df.storage_client().list(Some(&data_path)).await.unwrap();
        assert_eq!(files.len(), 3);

        // attempt to read the files
        let read_df = df
            .get_binned_metrics(
                rpath,
                &0.01,
                &start_utc,
                &end_utc_for_test,
                "test",
                "test",
                "1.0",
            )
            .await
            .unwrap();

        let psi_drift = dataframe_to_psi_drift_features(read_df).await.unwrap();
        assert_eq!(psi_drift.len(), 2);

        // delete the files
        for file in files.iter() {
            let path = Path::from(file.to_string());
            df.storage_client()
                .delete(&path)
                .await
                .expect("Failed to delete file");
        }

        // check that the files were deleted
        let files = df.storage_client().list(Some(&data_path)).await.unwrap();
        assert_eq!(files.len(), 0);

        // cleanup
        cleanup();
    }

    #[tokio::test]
    async fn test_write_spc_dataframe_local() {
        cleanup();
        let storage_settings = ObjectStorageSettings::default();
        let df = ParquetDataFrame::new(&storage_settings, &RecordType::Spc).unwrap();
        let mut batch = Vec::new();
        let start_utc = Utc::now();
        let end_utc_for_test = start_utc + chrono::Duration::hours(3);

        for i in 0..5 {
            let record = ServerRecord::Spc(SpcServerRecord {
                created_at: Utc::now() + chrono::Duration::hours(i),
                name: "test".to_string(),
                space: "test".to_string(),
                version: "1.0".to_string(),
                feature: "feature1".to_string(),
                value: i as f64,
            });

            batch.push(record);
        }

        for i in 0..5 {
            let record = ServerRecord::Spc(SpcServerRecord {
                created_at: Utc::now() + chrono::Duration::hours(i),
                name: "test".to_string(),
                space: "test".to_string(),
                version: "1.0".to_string(),
                feature: "feature2".to_string(),
                value: i as f64,
            });

            batch.push(record);
        }

        let records = ServerRecords::new(batch);
        let rpath = "spc";
        df.write_parquet(rpath, records.clone()).await.unwrap();

        // get canonical path
        let canonical_path = df.storage_root();
        let data_path = object_store::path::Path::from(canonical_path);

        // check that the files exist
        let files = df.storage_client().list(Some(&data_path)).await.unwrap();
        assert_eq!(files.len(), 5);

        // attempt to read the files
        let read_df = df
            .get_binned_metrics(
                rpath,
                &0.01,
                &start_utc,
                &end_utc_for_test,
                "test",
                "test",
                "1.0",
            )
            .await
            .unwrap();

        let _spc_drift = dataframe_to_spc_drift_features(read_df).await.unwrap();

        // delete the files
        for file in files.iter() {
            let path = Path::from(file.to_string());
            df.storage_client()
                .delete(&path)
                .await
                .expect("Failed to delete file");
        }

        // check that the files were deleted
        let files = df.storage_client().list(Some(&data_path)).await.unwrap();
        assert_eq!(files.len(), 0);

        // cleanup
        cleanup();
    }
}