use crate::error::DataFrameError;
use crate::parquet::custom::CustomMetricDataFrame;
use crate::parquet::genai::{GenAIEvalDataFrame, GenAITaskDataFrame, GenAIWorkflowDataFrame};
use crate::parquet::psi::PsiDataFrame;
use crate::parquet::spc::SpcDataFrame;
use crate::parquet::traits::ParquetFrame;
use crate::storage::ObjectStore;
use chrono::{DateTime, Utc};
use datafusion::prelude::DataFrame;
use scouter_settings::ObjectStorageSettings;
use scouter_types::{RecordType, ServerRecords, StorageType};
use tracing::instrument;

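/// Dispatches Parquet reads and writes to the dataframe implementation that
/// matches a given `RecordType`, so callers can persist and query drift
/// records through a single type.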
pub enum ParquetDataFrame {
    CustomMetric(CustomMetricDataFrame),
    Psi(PsiDataFrame),
    Spc(SpcDataFrame),
    GenAITask(GenAITaskDataFrame),
    GenAIWorkflow(GenAIWorkflowDataFrame),
    GenAIEval(GenAIEvalDataFrame),
}

impl ParquetDataFrame {
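    /// Builds the variant matching `record_type`, returning
    /// `DataFrameError::InvalidRecordTypeError` for record types that have no
    /// Parquet representation.
    ///
    /// A minimal usage sketch (mirroring the tests below, assuming default
    /// local storage settings):
    ///
    /// ```ignore
    /// let settings = ObjectStorageSettings::default();
    /// let frame = ParquetDataFrame::new(&settings, &RecordType::Psi)?;
    /// ```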
    pub fn new(
        storage_settings: &ObjectStorageSettings,
        record_type: &RecordType,
    ) -> Result<Self, DataFrameError> {
        match record_type {
            RecordType::Custom => Ok(ParquetDataFrame::CustomMetric(CustomMetricDataFrame::new(
                storage_settings,
            )?)),
            RecordType::Psi => Ok(ParquetDataFrame::Psi(PsiDataFrame::new(storage_settings)?)),
            RecordType::Spc => Ok(ParquetDataFrame::Spc(SpcDataFrame::new(storage_settings)?)),
            RecordType::GenAITask => Ok(ParquetDataFrame::GenAITask(GenAITaskDataFrame::new(
                storage_settings,
            )?)),
            RecordType::GenAIWorkflow => Ok(ParquetDataFrame::GenAIWorkflow(
                GenAIWorkflowDataFrame::new(storage_settings)?,
            )),
            RecordType::GenAIEval => Ok(ParquetDataFrame::GenAIEval(GenAIEvalDataFrame::new(
                storage_settings,
            )?)),

            _ => Err(DataFrameError::InvalidRecordTypeError(
                record_type.to_string(),
            )),
        }
    }

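    /// Resolves `rpath` against the storage root and writes `records` as
    /// Parquet via the underlying frame implementation.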
    #[instrument(skip_all, err)]
    pub async fn write_parquet(
        &self,
        rpath: &str,
        records: ServerRecords,
    ) -> Result<(), DataFrameError> {
        let rpath = &self.resolve_path(rpath);

        match self {
            ParquetDataFrame::CustomMetric(df) => df.write_parquet(rpath, records).await,
            ParquetDataFrame::Psi(df) => df.write_parquet(rpath, records).await,
            ParquetDataFrame::Spc(df) => df.write_parquet(rpath, records).await,
            ParquetDataFrame::GenAITask(df) => df.write_parquet(rpath, records).await,
            ParquetDataFrame::GenAIWorkflow(df) => df.write_parquet(rpath, records).await,
            ParquetDataFrame::GenAIEval(df) => df.write_parquet(rpath, records).await,
        }
    }

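    /// Returns the root prefix under which this frame stores its Parquet files.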
    pub fn storage_root(&self) -> String {
        match self {
            ParquetDataFrame::CustomMetric(df) => df.storage_root(),
            ParquetDataFrame::Psi(df) => df.storage_root(),
            ParquetDataFrame::Spc(df) => df.storage_root(),
            ParquetDataFrame::GenAITask(df) => df.storage_root(),
            ParquetDataFrame::GenAIWorkflow(df) => df.storage_root(),
            ParquetDataFrame::GenAIEval(df) => df.storage_root(),
        }
    }

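    /// Returns a clone of the underlying object store client, useful for
    /// direct list/delete operations (as in the tests below).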
    pub fn storage_client(&self) -> ObjectStore {
        match self {
            ParquetDataFrame::CustomMetric(df) => df.object_store.clone(),
            ParquetDataFrame::Psi(df) => df.object_store.clone(),
            ParquetDataFrame::Spc(df) => df.object_store.clone(),
            ParquetDataFrame::GenAITask(df) => df.object_store.clone(),
            ParquetDataFrame::GenAIWorkflow(df) => df.object_store.clone(),
            ParquetDataFrame::GenAIEval(df) => df.object_store.clone(),
        }
    }

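    /// Reads records for `entity_id` between `start_time` and `end_time` and
    /// aggregates them into time bins of width `bin`. Returns
    /// `UnsupportedOperation` for `GenAIEval` frames, which hold raw
    /// evaluation records rather than binnable metrics.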
    #[allow(clippy::too_many_arguments)]
    pub async fn get_binned_metrics(
        &self,
        path: &str,
        bin: &f64,
        start_time: &DateTime<Utc>,
        end_time: &DateTime<Utc>,
        entity_id: &i32,
    ) -> Result<DataFrame, DataFrameError> {
        let read_path = &self.resolve_path(path);

        match self {
            ParquetDataFrame::CustomMetric(df) => {
                df.get_binned_metrics(read_path, bin, start_time, end_time, entity_id)
                    .await
            }
            ParquetDataFrame::Psi(df) => {
                df.get_binned_metrics(read_path, bin, start_time, end_time, entity_id)
                    .await
            }
            ParquetDataFrame::Spc(df) => {
                df.get_binned_metrics(read_path, bin, start_time, end_time, entity_id)
                    .await
            }
            ParquetDataFrame::GenAITask(df) => {
                df.get_binned_metrics(read_path, bin, start_time, end_time, entity_id)
                    .await
            }
            ParquetDataFrame::GenAIWorkflow(df) => {
                df.get_binned_metrics(read_path, bin, start_time, end_time, entity_id)
                    .await
            }
            ParquetDataFrame::GenAIEval(_) => Err(DataFrameError::UnsupportedOperation(
                "GenAIEval records do not support binned metrics".to_string(),
            )),
        }
    }

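    /// Returns the configured storage backend type (e.g. local or cloud
    /// object storage).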
    pub fn storage_type(&self) -> StorageType {
        match self {
            ParquetDataFrame::CustomMetric(df) => {
                df.object_store.storage_settings.storage_type.clone()
            }
            ParquetDataFrame::Psi(df) => df.object_store.storage_settings.storage_type.clone(),
            ParquetDataFrame::Spc(df) => df.object_store.storage_settings.storage_type.clone(),
            ParquetDataFrame::GenAITask(df) => {
                df.object_store.storage_settings.storage_type.clone()
            }
            ParquetDataFrame::GenAIWorkflow(df) => {
                df.object_store.storage_settings.storage_type.clone()
            }
            ParquetDataFrame::GenAIEval(df) => {
                df.object_store.storage_settings.storage_type.clone()
            }
        }
    }

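    /// Joins `path` onto the storage root, keeping a trailing slash so the
    /// result is treated as a directory prefix.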
    pub fn resolve_path(&self, path: &str) -> String {
        format!("{}/{}/", self.storage_root(), path)
    }
}

#[cfg(test)]
mod tests {

    use super::*;
    use crate::parquet::psi::dataframe_to_psi_drift_features;
    use crate::parquet::spc::dataframe_to_spc_drift_features;
    use crate::parquet::types::BinnedTableName;
    use crate::parquet::utils::BinnedMetricsExtractor;
    use chrono::Utc;
    use object_store::path::Path;
    use rand::Rng;
    use scouter_settings::ObjectStorageSettings;
    use scouter_types::Assertion;
    use scouter_types::{
        BoxedEvalRecord, EvalRecord, PsiRecord, ServerRecord, ServerRecords, SpcRecord, Status,
    };
    use scouter_types::{CustomMetricRecord, EvalTaskResult, GenAIEvalWorkflowResult};
    use serde_json::Map;
    use serde_json::Value;

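    // Removes any Parquet output left over from a previous run so each test
    // starts from an empty local storage root.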
    fn cleanup() {
        let storage_settings = ObjectStorageSettings::default();
        let current_dir = std::env::current_dir().unwrap();
        let storage_path = current_dir.join(storage_settings.storage_root());
        if storage_path.exists() {
            std::fs::remove_dir_all(storage_path).unwrap();
        }
    }

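    // Writes GenAI eval records spread across three hours, asserts that three
    // Parquet files land in object storage, then deletes them.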
    #[tokio::test]
    async fn test_write_genai_event_record_dataframe_local() {
        cleanup();
        let storage_settings = ObjectStorageSettings::default();
        let df = ParquetDataFrame::new(&storage_settings, &RecordType::GenAIEval).unwrap();
        let mut batch = Vec::new();
        let entity_id = rand::rng().random_range(0..100);

        for i in 0..3 {
            for j in 0..50 {
                let record = EvalRecord {
                    created_at: Utc::now() + chrono::Duration::hours(i),
                    entity_id,
                    context: serde_json::Value::Object(Map::new()),
                    status: Status::Pending,
                    id: 0,
                    uid: format!("record_uid_{i}_{j}"),
                    entity_uid: format!("entity_uid_{entity_id}"),
                    ..Default::default()
                };

                let boxed_record = BoxedEvalRecord::new(record);
                batch.push(ServerRecord::GenAIEval(boxed_record));
            }
        }

        let records = ServerRecords::new(batch);
        let rpath = BinnedTableName::GenAIEval.to_string();
        df.write_parquet(&rpath, records.clone()).await.unwrap();

        let canonical_path = df.storage_root();
        let data_path = Path::from(canonical_path);

        let files = df.storage_client().list(Some(&data_path)).await.unwrap();
        assert_eq!(files.len(), 3);

        for file in files.iter() {
            let path = Path::from(file.to_string());
            df.storage_client()
                .delete(&path)
                .await
                .expect("Failed to delete file");
        }
        let files = df.storage_client().list(Some(&data_path)).await.unwrap();
        assert_eq!(files.len(), 0);

        cleanup();
    }

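    // Round-trips GenAI task records: writes three hours of task results, reads
    // them back as binned metrics, and expects one series per task id.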
    #[tokio::test]
    async fn test_write_genai_task_dataframe_local() {
        cleanup();
        let storage_settings = ObjectStorageSettings::default();
        let df = ParquetDataFrame::new(&storage_settings, &RecordType::GenAITask).unwrap();
        let mut batch = Vec::new();
        let start_utc = Utc::now();
        let end_utc_for_test = start_utc + chrono::Duration::hours(3);
        let entity_id = rand::rng().random_range(0..100);

        for i in 0..3 {
            for j in 0..50 {
                let record = ServerRecord::GenAITaskRecord(EvalTaskResult {
                    record_uid: format!("record_uid_{i}_{j}"),
                    created_at: Utc::now() + chrono::Duration::hours(i),
                    start_time: Utc::now() + chrono::Duration::hours(i),
                    end_time: Utc::now()
                        + chrono::Duration::hours(i)
                        + chrono::Duration::minutes(5),
                    entity_id,
                    task_id: format!("task{i}"),
                    task_type: scouter_types::genai::EvaluationTaskType::Assertion,
                    passed: true,
                    value: j as f64,
                    assertion: Assertion::FieldPath(Some(format!("field.path.{i}"))),
                    operator: scouter_types::genai::ComparisonOperator::Contains,
                    expected: Value::Null,
                    actual: Value::Null,
                    message: "All good".to_string(),
                    entity_uid: format!("entity_uid_{entity_id}"),
                    condition: false,
                    stage: 0,
                });

                batch.push(record);
            }
        }

        let records = ServerRecords::new(batch);
        let rpath = BinnedTableName::GenAITask.to_string();
        df.write_parquet(&rpath, records.clone()).await.unwrap();

        let canonical_path = df.storage_root();
        let data_path = Path::from(canonical_path);

        let files = df.storage_client().list(Some(&data_path)).await.unwrap();
        assert_eq!(files.len(), 3);

        let new_df = ParquetDataFrame::new(&storage_settings, &RecordType::GenAITask).unwrap();

        let read_df = new_df
            .get_binned_metrics(&rpath, &0.01, &start_utc, &end_utc_for_test, &entity_id)
            .await
            .unwrap();

        let binned_metrics = BinnedMetricsExtractor::dataframe_to_binned_metrics(read_df)
            .await
            .unwrap();

        assert_eq!(binned_metrics.metrics.len(), 3);

        for file in files.iter() {
            let path = Path::from(file.to_string());
            df.storage_client()
                .delete(&path)
                .await
                .expect("Failed to delete file");
        }
        let files = df.storage_client().list(Some(&data_path)).await.unwrap();
        assert_eq!(files.len(), 0);

        cleanup();
    }

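    // Round-trips GenAI workflow records and expects them to aggregate into a
    // single "workflow" metric series with three binned entries.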
    #[tokio::test]
    async fn test_write_genai_workflow_dataframe_local() {
        cleanup();
        let storage_settings = ObjectStorageSettings::default();
        let df = ParquetDataFrame::new(&storage_settings, &RecordType::GenAIWorkflow).unwrap();
        let mut batch = Vec::new();
        let start_utc = Utc::now();
        let end_utc_for_test = start_utc + chrono::Duration::hours(3);
        let entity_id = rand::rng().random_range(0..100);

        for i in 0..3 {
            for j in 0..50 {
                let record = ServerRecord::GenAIWorkflowRecord(GenAIEvalWorkflowResult {
                    record_uid: format!("record_uid_{i}_{j}"),
                    created_at: Utc::now() + chrono::Duration::hours(i),
                    entity_id,
                    total_tasks: 10,
                    passed_tasks: 8,
                    failed_tasks: 2,
                    pass_rate: 0.8,
                    duration_ms: 1500,
                    entity_uid: format!("entity_uid_{entity_id}"),
                    execution_plan: scouter_types::genai::ExecutionPlan::default(),
                    id: j,
                });

                batch.push(record);
            }
        }

        let records = ServerRecords::new(batch);
        let rpath = BinnedTableName::GenAIWorkflow.to_string();
        df.write_parquet(&rpath, records.clone()).await.unwrap();

        let canonical_path = df.storage_root();
        let data_path = Path::from(canonical_path);

        let files = df.storage_client().list(Some(&data_path)).await.unwrap();
        assert_eq!(files.len(), 3);

        let new_df = ParquetDataFrame::new(&storage_settings, &RecordType::GenAIWorkflow).unwrap();

        let read_df = new_df
            .get_binned_metrics(&rpath, &0.01, &start_utc, &end_utc_for_test, &entity_id)
            .await
            .unwrap();

        let binned_metrics = BinnedMetricsExtractor::dataframe_to_binned_metrics(read_df)
            .await
            .unwrap();

        assert_eq!(binned_metrics.metrics.len(), 1);

        let workflow_metrics = &binned_metrics.metrics["workflow"];
        assert_eq!(workflow_metrics.created_at.len(), 3);
        assert_eq!(workflow_metrics.stats.len(), 3);

        for file in files.iter() {
            let path = Path::from(file.to_string());
            df.storage_client()
                .delete(&path)
                .await
                .expect("Failed to delete file");
        }
        let files = df.storage_client().list(Some(&data_path)).await.unwrap();
        assert_eq!(files.len(), 0);

        cleanup();
    }

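    // Round-trips custom metric records and expects one binned series per
    // metric name.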
    #[tokio::test]
    async fn test_write_custom_dataframe_local() {
        cleanup();
        let storage_settings = ObjectStorageSettings::default();
        let df = ParquetDataFrame::new(&storage_settings, &RecordType::Custom).unwrap();
        let mut batch = Vec::new();
        let start_utc = Utc::now();
        let end_utc_for_test = start_utc + chrono::Duration::hours(3);
        let entity_id = rand::rng().random_range(0..100);
        for i in 0..3 {
            for j in 0..50 {
                let record = ServerRecord::Custom(CustomMetricRecord {
                    created_at: Utc::now() + chrono::Duration::hours(i),
                    metric: format!("metric{i}"),
                    value: j as f64,
                    entity_id,
                    uid: format!("entity_uid_{entity_id}"),
                });

                batch.push(record);
            }
        }

        let records = ServerRecords::new(batch);
        let rpath = "custom";
        df.write_parquet(rpath, records.clone()).await.unwrap();

        let canonical_path = df.storage_root();
        let data_path = Path::from(canonical_path);

        let files = df.storage_client().list(Some(&data_path)).await.unwrap();
        assert_eq!(files.len(), 3);

        let new_df = ParquetDataFrame::new(&storage_settings, &RecordType::Custom).unwrap();

        let read_df = new_df
            .get_binned_metrics(rpath, &0.01, &start_utc, &end_utc_for_test, &entity_id)
            .await
            .unwrap();

        let binned_metrics = BinnedMetricsExtractor::dataframe_to_binned_metrics(read_df)
            .await
            .unwrap();

        assert_eq!(binned_metrics.metrics.len(), 3);

        for file in files.iter() {
            let path = Path::from(file.to_string());
            df.storage_client()
                .delete(&path)
                .await
                .expect("Failed to delete file");
        }
        let files = df.storage_client().list(Some(&data_path)).await.unwrap();
        assert_eq!(files.len(), 0);

        cleanup();
    }

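    // Round-trips PSI bin-count records for two features and expects both to
    // come back as drift features.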
    #[tokio::test]
    async fn test_write_psi_dataframe_local() {
        cleanup();

        let storage_settings = ObjectStorageSettings::default();
        let df = ParquetDataFrame::new(&storage_settings, &RecordType::Psi).unwrap();
        let mut batch = Vec::new();
        let start_utc = Utc::now();
        let end_utc_for_test = start_utc + chrono::Duration::hours(3);
        let entity_id = rand::rng().random_range(0..100);
        for i in 0..3 {
            for j in 0..5 {
                let record = ServerRecord::Psi(PsiRecord {
                    created_at: Utc::now() + chrono::Duration::hours(i),
                    feature: "feature1".to_string(),
                    bin_id: j,
                    bin_count: rand::rng().random_range(0..100),
                    entity_id,
                    uid: format!("entity_uid_{entity_id}"),
                });

                batch.push(record);
            }
        }

        for i in 0..3 {
            for j in 0..5 {
                let record = ServerRecord::Psi(PsiRecord {
                    created_at: Utc::now() + chrono::Duration::hours(i),
                    feature: "feature2".to_string(),
                    bin_id: j,
                    bin_count: rand::rng().random_range(0..100),
                    entity_id,
                    uid: format!("entity_uid_{entity_id}"),
                });

                batch.push(record);
            }
        }

        let records = ServerRecords::new(batch);
        let rpath = "psi";
        df.write_parquet(rpath, records.clone()).await.unwrap();

        let canonical_path = df.storage_root();
        let data_path = Path::from(canonical_path);

        let files = df.storage_client().list(Some(&data_path)).await.unwrap();
        assert_eq!(files.len(), 3);

        let read_df = df
            .get_binned_metrics(rpath, &0.01, &start_utc, &end_utc_for_test, &entity_id)
            .await
            .unwrap();

        let psi_drift = dataframe_to_psi_drift_features(read_df).await.unwrap();
        assert_eq!(psi_drift.len(), 2);

        for file in files.iter() {
            let path = Path::from(file.to_string());
            df.storage_client()
                .delete(&path)
                .await
                .expect("Failed to delete file");
        }
        let files = df.storage_client().list(Some(&data_path)).await.unwrap();
        assert_eq!(files.len(), 0);

        cleanup();
    }

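    // Writes SPC records for two features spread across five hours (five
    // Parquet files) and exercises the SPC drift-feature read path.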
    #[tokio::test]
    async fn test_write_spc_dataframe_local() {
        cleanup();
        let storage_settings = ObjectStorageSettings::default();
        let df = ParquetDataFrame::new(&storage_settings, &RecordType::Spc).unwrap();
        let mut batch = Vec::new();
        let start_utc = Utc::now();
        let end_utc_for_test = start_utc + chrono::Duration::hours(3);
        let entity_id = rand::rng().random_range(0..100);
        for i in 0..5 {
            let record = ServerRecord::Spc(SpcRecord {
                created_at: Utc::now() + chrono::Duration::hours(i),
                feature: "feature1".to_string(),
                value: i as f64,
                entity_id,
                uid: format!("entity_uid_{entity_id}"),
            });

            batch.push(record);
        }

        for i in 0..5 {
            let record = ServerRecord::Spc(SpcRecord {
                created_at: Utc::now() + chrono::Duration::hours(i),
                feature: "feature2".to_string(),
                value: i as f64,
                entity_id,
                uid: format!("entity_uid_{entity_id}"),
            });

            batch.push(record);
        }

        let records = ServerRecords::new(batch);
        let rpath = "spc";
        df.write_parquet(rpath, records.clone()).await.unwrap();

        let canonical_path = df.storage_root();
        let data_path = Path::from(canonical_path);

        let files = df.storage_client().list(Some(&data_path)).await.unwrap();
        assert_eq!(files.len(), 5);

        let read_df = df
            .get_binned_metrics(rpath, &0.01, &start_utc, &end_utc_for_test, &entity_id)
            .await
            .unwrap();

        let _spc_drift = dataframe_to_spc_drift_features(read_df).await.unwrap();

        for file in files.iter() {
            let path = Path::from(file.to_string());
            df.storage_client()
                .delete(&path)
                .await
                .expect("Failed to delete file");
        }
        let files = df.storage_client().list(Some(&data_path)).await.unwrap();
        assert_eq!(files.len(), 0);

        cleanup();
    }
}