use crate::error::DataFrameError;
use crate::parquet::custom::CustomMetricDataFrame;
use crate::parquet::llm::{LLMDriftDataFrame, LLMMetricDataFrame};
use crate::parquet::psi::PsiDataFrame;
use crate::parquet::spc::SpcDataFrame;
use crate::parquet::traits::ParquetFrame;
use crate::storage::ObjectStore;
use chrono::{DateTime, Utc};
use datafusion::prelude::DataFrame;
use scouter_settings::ObjectStorageSettings;
use scouter_types::{RecordType, ServerRecords, StorageType};
use tracing::instrument;

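/// Record-type-specific parquet frame.
///
/// Each variant wraps the frame used to read and write parquet data for one
/// `RecordType`, and methods on this enum dispatch to the wrapped frame.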
pub enum ParquetDataFrame {
    CustomMetric(CustomMetricDataFrame),
    Psi(PsiDataFrame),
    Spc(SpcDataFrame),
    LLMMetric(LLMMetricDataFrame),
    LLMDrift(LLMDriftDataFrame),
}

impl ParquetDataFrame {
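    /// Creates the frame variant matching `record_type`.
    ///
    /// Returns `DataFrameError::InvalidRecordTypeError` for record types that
    /// have no parquet frame implementation.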
    pub fn new(
        storage_settings: &ObjectStorageSettings,
        record_type: &RecordType,
    ) -> Result<Self, DataFrameError> {
        match record_type {
            RecordType::Custom => Ok(ParquetDataFrame::CustomMetric(CustomMetricDataFrame::new(
                storage_settings,
            )?)),
            RecordType::Psi => Ok(ParquetDataFrame::Psi(PsiDataFrame::new(storage_settings)?)),
            RecordType::Spc => Ok(ParquetDataFrame::Spc(SpcDataFrame::new(storage_settings)?)),
            RecordType::LLMMetric => Ok(ParquetDataFrame::LLMMetric(LLMMetricDataFrame::new(
                storage_settings,
            )?)),
            RecordType::LLMDrift => Ok(ParquetDataFrame::LLMDrift(LLMDriftDataFrame::new(
                storage_settings,
            )?)),
            _ => Err(DataFrameError::InvalidRecordTypeError(
                record_type.to_string(),
            )),
        }
    }

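    /// Writes `records` to parquet under `rpath`, which is first resolved
    /// against the storage root via [`Self::resolve_path`].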
    #[instrument(skip_all, err)]
    pub async fn write_parquet(
        &self,
        rpath: &str,
        records: ServerRecords,
    ) -> Result<(), DataFrameError> {
        let rpath = &self.resolve_path(rpath);

        match self {
            ParquetDataFrame::CustomMetric(df) => df.write_parquet(rpath, records).await,
            ParquetDataFrame::Psi(df) => df.write_parquet(rpath, records).await,
            ParquetDataFrame::Spc(df) => df.write_parquet(rpath, records).await,
            ParquetDataFrame::LLMMetric(df) => df.write_parquet(rpath, records).await,
            ParquetDataFrame::LLMDrift(df) => df.write_parquet(rpath, records).await,
        }
    }

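    /// Returns the storage root of the underlying frame.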
    pub fn storage_root(&self) -> String {
        match self {
            ParquetDataFrame::CustomMetric(df) => df.storage_root(),
            ParquetDataFrame::Psi(df) => df.storage_root(),
            ParquetDataFrame::Spc(df) => df.storage_root(),
            ParquetDataFrame::LLMMetric(df) => df.storage_root(),
            ParquetDataFrame::LLMDrift(df) => df.storage_root(),
        }
    }

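    /// Returns a clone of the underlying `ObjectStore` client.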
    pub fn storage_client(&self) -> ObjectStore {
        match self {
            ParquetDataFrame::CustomMetric(df) => df.object_store.clone(),
            ParquetDataFrame::Psi(df) => df.object_store.clone(),
            ParquetDataFrame::Spc(df) => df.object_store.clone(),
            ParquetDataFrame::LLMMetric(df) => df.object_store.clone(),
            ParquetDataFrame::LLMDrift(df) => df.object_store.clone(),
        }
    }

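    /// Reads parquet data under `path` (resolved against the storage root) and
    /// returns a `DataFrame` of metrics binned over `[start_time, end_time]`
    /// for the given `space`/`name`/`version`.
    ///
    /// # Errors
    /// `LLMDrift` frames do not support binned metrics and return
    /// `DataFrameError::UnsupportedOperation`.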
    #[allow(clippy::too_many_arguments)]
    pub async fn get_binned_metrics(
        &self,
        path: &str,
        bin: &f64,
        start_time: &DateTime<Utc>,
        end_time: &DateTime<Utc>,
        space: &str,
        name: &str,
        version: &str,
    ) -> Result<DataFrame, DataFrameError> {
        let read_path = &self.resolve_path(path);

        match self {
            ParquetDataFrame::CustomMetric(df) => {
                df.get_binned_metrics(read_path, bin, start_time, end_time, space, name, version)
                    .await
            }
            ParquetDataFrame::Psi(df) => {
                df.get_binned_metrics(read_path, bin, start_time, end_time, space, name, version)
                    .await
            }
            ParquetDataFrame::Spc(df) => {
                df.get_binned_metrics(read_path, bin, start_time, end_time, space, name, version)
                    .await
            }
            ParquetDataFrame::LLMMetric(df) => {
                df.get_binned_metrics(read_path, bin, start_time, end_time, space, name, version)
                    .await
            }
            ParquetDataFrame::LLMDrift(_) => Err(DataFrameError::UnsupportedOperation(
                "LLMDrift does not support binned metrics".to_string(),
            )),
        }
    }

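    /// Returns the `StorageType` configured on the underlying object store.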
    pub fn storage_type(&self) -> StorageType {
        match self {
            ParquetDataFrame::CustomMetric(df) => {
                df.object_store.storage_settings.storage_type.clone()
            }
            ParquetDataFrame::Psi(df) => df.object_store.storage_settings.storage_type.clone(),
            ParquetDataFrame::Spc(df) => df.object_store.storage_settings.storage_type.clone(),
            ParquetDataFrame::LLMMetric(df) => {
                df.object_store.storage_settings.storage_type.clone()
            }
            ParquetDataFrame::LLMDrift(df) => {
                df.object_store.storage_settings.storage_type.clone()
            }
        }
    }

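    /// Joins `path` onto the storage root and appends a trailing slash.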
    pub fn resolve_path(&self, path: &str) -> String {
        format!("{}/{}/", self.storage_root(), path)
    }
}

#[cfg(test)]
mod tests {

    use super::*;
    use crate::parquet::psi::dataframe_to_psi_drift_features;
    use crate::parquet::spc::dataframe_to_spc_drift_features;
    use crate::parquet::utils::BinnedMetricsExtractor;
    use chrono::Utc;
    use object_store::path::Path;
    use potato_head::create_score_prompt;
    use rand::Rng;
    use scouter_settings::ObjectStorageSettings;
    use scouter_types::{
        BoxedLLMDriftServerRecord, CustomMetricServerRecord, LLMDriftServerRecord, LLMMetricRecord,
        PsiServerRecord, ServerRecord, ServerRecords, SpcServerRecord, Status,
    };
    use serde_json::Map;
    use serde_json::Value;

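    /// Removes any local storage root left behind by a previous test run.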
    fn cleanup() {
        let storage_settings = ObjectStorageSettings::default();
        let current_dir = std::env::current_dir().unwrap();
        let storage_path = current_dir.join(storage_settings.storage_root());
        if storage_path.exists() {
            std::fs::remove_dir_all(storage_path).unwrap();
        }
    }

    #[tokio::test]
    async fn test_write_llm_drift_record_dataframe_local() {
        cleanup();
        let storage_settings = ObjectStorageSettings::default();
        let df = ParquetDataFrame::new(&storage_settings, &RecordType::LLMDrift).unwrap();
        let mut batch = Vec::new();

        let prompt = create_score_prompt(None);

        for i in 0..3 {
            for _ in 0..50 {
                let record = LLMDriftServerRecord {
                    created_at: Utc::now() + chrono::Duration::hours(i),
                    space: "test".to_string(),
                    name: "test".to_string(),
                    version: "1.0".to_string(),
                    prompt: Some(prompt.model_dump_value()),
                    context: serde_json::Value::Object(Map::new()),
                    score: Value::Null,
                    status: Status::Pending,
                    id: 0,
                    uid: "test-uid".to_string(),
                    updated_at: None,
                    processing_started_at: None,
                    processing_ended_at: None,
                    processing_duration: None,
                };

                let boxed_record = BoxedLLMDriftServerRecord::new(record);
                batch.push(ServerRecord::LLMDrift(boxed_record));
            }
        }

        let records = ServerRecords::new(batch);
        let rpath = "llm_drift";
        df.write_parquet(rpath, records.clone()).await.unwrap();

        let canonical_path = df.storage_root();
        let data_path = Path::from(canonical_path);

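        // Records were written at three distinct hourly timestamps, so three
        // parquet files are expected (assuming the writer partitions by hour).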
        let files = df.storage_client().list(Some(&data_path)).await.unwrap();
        assert_eq!(files.len(), 3);

        for file in files.iter() {
            let path = Path::from(file.to_string());
            df.storage_client()
                .delete(&path)
                .await
                .expect("Failed to delete file");
        }
        let files = df.storage_client().list(Some(&data_path)).await.unwrap();
        assert_eq!(files.len(), 0);

        cleanup();
    }

    #[tokio::test]
    async fn test_write_llm_drift_metric_dataframe_local() {
        cleanup();
        let storage_settings = ObjectStorageSettings::default();
        let df = ParquetDataFrame::new(&storage_settings, &RecordType::LLMMetric).unwrap();
        let mut batch = Vec::new();
        let start_utc = Utc::now();
        let end_utc_for_test = start_utc + chrono::Duration::hours(3);

        for i in 0..3 {
            for j in 0..50 {
                let record = ServerRecord::LLMMetric(LLMMetricRecord {
                    record_uid: format!("record_uid_{i}_{j}"),
                    created_at: Utc::now() + chrono::Duration::hours(i),
                    name: "test".to_string(),
                    space: "test".to_string(),
                    version: "1.0".to_string(),
                    metric: format!("metric{i}"),
                    value: j as f64,
                });

                batch.push(record);
            }
        }

        let records = ServerRecords::new(batch);
        let rpath = "llm_metric";
        df.write_parquet(rpath, records.clone()).await.unwrap();

        let canonical_path = df.storage_root();
        let data_path = Path::from(canonical_path);

        let files = df.storage_client().list(Some(&data_path)).await.unwrap();
        assert_eq!(files.len(), 3);

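        // Re-open a fresh frame and bin the metrics over the full test window.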
        let new_df = ParquetDataFrame::new(&storage_settings, &RecordType::LLMMetric).unwrap();

        let read_df = new_df
            .get_binned_metrics(
                rpath,
                &0.01,
                &start_utc,
                &end_utc_for_test,
                "test",
                "test",
                "1.0",
            )
            .await
            .unwrap();

        let binned_metrics = BinnedMetricsExtractor::dataframe_to_binned_metrics(read_df)
            .await
            .unwrap();

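        // One binned series per distinct metric name (metric0..metric2).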
        assert_eq!(binned_metrics.metrics.len(), 3);

        for file in files.iter() {
            let path = Path::from(file.to_string());
            df.storage_client()
                .delete(&path)
                .await
                .expect("Failed to delete file");
        }
        let files = df.storage_client().list(Some(&data_path)).await.unwrap();
        assert_eq!(files.len(), 0);

        cleanup();
    }

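    // Same write/bin/delete flow as the LLM-metric test above, but for custom
    // metric records.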
    #[tokio::test]
    async fn test_write_custom_dataframe_local() {
        cleanup();
        let storage_settings = ObjectStorageSettings::default();
        let df = ParquetDataFrame::new(&storage_settings, &RecordType::Custom).unwrap();
        let mut batch = Vec::new();
        let start_utc = Utc::now();
        let end_utc_for_test = start_utc + chrono::Duration::hours(3);

        for i in 0..3 {
            for j in 0..50 {
                let record = ServerRecord::Custom(CustomMetricServerRecord {
                    created_at: Utc::now() + chrono::Duration::hours(i),
                    name: "test".to_string(),
                    space: "test".to_string(),
                    version: "1.0".to_string(),
                    metric: format!("metric{i}"),
                    value: j as f64,
                });

                batch.push(record);
            }
        }

        let records = ServerRecords::new(batch);
        let rpath = "custom";
        df.write_parquet(rpath, records.clone()).await.unwrap();

        let canonical_path = df.storage_root();
        let data_path = Path::from(canonical_path);

        let files = df.storage_client().list(Some(&data_path)).await.unwrap();
        assert_eq!(files.len(), 3);

        let new_df = ParquetDataFrame::new(&storage_settings, &RecordType::Custom).unwrap();

        let read_df = new_df
            .get_binned_metrics(
                rpath,
                &0.01,
                &start_utc,
                &end_utc_for_test,
                "test",
                "test",
                "1.0",
            )
            .await
            .unwrap();

        let binned_metrics = BinnedMetricsExtractor::dataframe_to_binned_metrics(read_df)
            .await
            .unwrap();

        assert_eq!(binned_metrics.metrics.len(), 3);

        for file in files.iter() {
            let path = Path::from(file.to_string());
            df.storage_client()
                .delete(&path)
                .await
                .expect("Failed to delete file");
        }
        let files = df.storage_client().list(Some(&data_path)).await.unwrap();
        assert_eq!(files.len(), 0);

        cleanup();
    }

    #[tokio::test]
    async fn test_write_psi_dataframe_local() {
        cleanup();

        let storage_settings = ObjectStorageSettings::default();
        let df = ParquetDataFrame::new(&storage_settings, &RecordType::Psi).unwrap();
        let mut batch = Vec::new();
        let start_utc = Utc::now();
        let end_utc_for_test = start_utc + chrono::Duration::hours(3);

        for i in 0..3 {
            for j in 0..5 {
                let record = ServerRecord::Psi(PsiServerRecord {
                    created_at: Utc::now() + chrono::Duration::hours(i),
                    name: "test".to_string(),
                    space: "test".to_string(),
                    version: "1.0".to_string(),
                    feature: "feature1".to_string(),
                    bin_id: j as usize,
                    bin_count: rand::rng().random_range(0..100),
                });

                batch.push(record);
            }
        }

        for i in 0..3 {
            for j in 0..5 {
                let record = ServerRecord::Psi(PsiServerRecord {
                    created_at: Utc::now() + chrono::Duration::hours(i),
                    name: "test".to_string(),
                    space: "test".to_string(),
                    version: "1.0".to_string(),
                    feature: "feature2".to_string(),
                    bin_id: j as usize,
                    bin_count: rand::rng().random_range(0..100),
                });

                batch.push(record);
            }
        }

        let records = ServerRecords::new(batch);
        let rpath = "psi";
        df.write_parquet(rpath, records.clone()).await.unwrap();

        let canonical_path = df.storage_root();
        let data_path = Path::from(canonical_path);

        let files = df.storage_client().list(Some(&data_path)).await.unwrap();
        assert_eq!(files.len(), 3);

        let read_df = df
            .get_binned_metrics(
                rpath,
                &0.01,
                &start_utc,
                &end_utc_for_test,
                "test",
                "test",
                "1.0",
            )
            .await
            .unwrap();

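        // Two features were written (feature1 and feature2), so two drift
        // series are expected.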
        let psi_drift = dataframe_to_psi_drift_features(read_df).await.unwrap();
        assert_eq!(psi_drift.len(), 2);

        for file in files.iter() {
            let path = Path::from(file.to_string());
            df.storage_client()
                .delete(&path)
                .await
                .expect("Failed to delete file");
        }
        let files = df.storage_client().list(Some(&data_path)).await.unwrap();
        assert_eq!(files.len(), 0);

        cleanup();
    }

    #[tokio::test]
    async fn test_write_spc_dataframe_local() {
        cleanup();
        let storage_settings = ObjectStorageSettings::default();
        let df = ParquetDataFrame::new(&storage_settings, &RecordType::Spc).unwrap();
        let mut batch = Vec::new();
        let start_utc = Utc::now();
        let end_utc_for_test = start_utc + chrono::Duration::hours(3);

        for i in 0..5 {
            let record = ServerRecord::Spc(SpcServerRecord {
                created_at: Utc::now() + chrono::Duration::hours(i),
                name: "test".to_string(),
                space: "test".to_string(),
                version: "1.0".to_string(),
                feature: "feature1".to_string(),
                value: i as f64,
            });

            batch.push(record);
        }

        for i in 0..5 {
            let record = ServerRecord::Spc(SpcServerRecord {
                created_at: Utc::now() + chrono::Duration::hours(i),
                name: "test".to_string(),
                space: "test".to_string(),
                version: "1.0".to_string(),
                feature: "feature2".to_string(),
                value: i as f64,
            });

            batch.push(record);
        }

        let records = ServerRecords::new(batch);
        let rpath = "spc";
        df.write_parquet(rpath, records.clone()).await.unwrap();

        let canonical_path = df.storage_root();
        let data_path = Path::from(canonical_path);

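        // Each of the five records per feature carries a distinct hourly
        // timestamp, so five files are expected (again assuming hourly
        // partitioning).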
        let files = df.storage_client().list(Some(&data_path)).await.unwrap();
        assert_eq!(files.len(), 5);

        let read_df = df
            .get_binned_metrics(
                rpath,
                &0.01,
                &start_utc,
                &end_utc_for_test,
                "test",
                "test",
                "1.0",
            )
            .await
            .unwrap();

        let _spc_drift = dataframe_to_spc_drift_features(read_df).await.unwrap();

        for file in files.iter() {
            let path = Path::from(file.to_string());
            df.storage_client()
                .delete(&path)
                .await
                .expect("Failed to delete file");
        }
        let files = df.storage_client().list(Some(&data_path)).await.unwrap();
        assert_eq!(files.len(), 0);

        cleanup();
    }
}