use crate::error::DataFrameError;
use crate::parquet::custom::CustomMetricDataFrame;
use crate::parquet::genai::{GenAIEvalDataFrame, GenAITaskDataFrame, GenAIWorkflowDataFrame};
use crate::parquet::psi::PsiDataFrame;
use crate::parquet::spc::SpcDataFrame;
use crate::parquet::traits::ParquetFrame;
use crate::storage::ObjectStore;
use chrono::{DateTime, Utc};
use datafusion::prelude::DataFrame;
use scouter_settings::ObjectStorageSettings;
use scouter_types::{RecordType, ServerRecords, StorageType};
use tracing::instrument;

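/// Dispatches Parquet reads and writes to the dataframe implementation that
/// matches a given record type.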
pub enum ParquetDataFrame {
    CustomMetric(CustomMetricDataFrame),
    Psi(PsiDataFrame),
    Spc(SpcDataFrame),
    GenAITask(GenAITaskDataFrame),
    GenAIWorkflow(GenAIWorkflowDataFrame),
    GenAIEval(GenAIEvalDataFrame),
}

impl ParquetDataFrame {
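    /// Builds the variant matching `record_type`; record types without a
    /// Parquet representation return `DataFrameError::InvalidRecordTypeError`.
    ///
    /// A minimal usage sketch (the path and `records` batch are illustrative):
    ///
    /// ```ignore
    /// let settings = ObjectStorageSettings::default();
    /// let df = ParquetDataFrame::new(&settings, &RecordType::Spc)?;
    /// df.write_parquet("spc", records).await?;
    /// ```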
    pub fn new(
        storage_settings: &ObjectStorageSettings,
        record_type: &RecordType,
    ) -> Result<Self, DataFrameError> {
        match record_type {
            RecordType::Custom => Ok(ParquetDataFrame::CustomMetric(CustomMetricDataFrame::new(
                storage_settings,
            )?)),
            RecordType::Psi => Ok(ParquetDataFrame::Psi(PsiDataFrame::new(storage_settings)?)),
            RecordType::Spc => Ok(ParquetDataFrame::Spc(SpcDataFrame::new(storage_settings)?)),
            RecordType::GenAITask => Ok(ParquetDataFrame::GenAITask(GenAITaskDataFrame::new(
                storage_settings,
            )?)),
            RecordType::GenAIWorkflow => Ok(ParquetDataFrame::GenAIWorkflow(
                GenAIWorkflowDataFrame::new(storage_settings)?,
            )),
            RecordType::GenAIEval => Ok(ParquetDataFrame::GenAIEval(GenAIEvalDataFrame::new(
                storage_settings,
            )?)),
            _ => Err(DataFrameError::InvalidRecordTypeError(
                record_type.to_string(),
            )),
        }
    }

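    /// Resolves `rpath` against the storage root and writes `records` to
    /// Parquet via the variant's implementation.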
    #[instrument(skip_all, err)]
    pub async fn write_parquet(
        &self,
        rpath: &str,
        records: ServerRecords,
    ) -> Result<(), DataFrameError> {
        let rpath = &self.resolve_path(rpath);

        match self {
            ParquetDataFrame::CustomMetric(df) => df.write_parquet(rpath, records).await,
            ParquetDataFrame::Psi(df) => df.write_parquet(rpath, records).await,
            ParquetDataFrame::Spc(df) => df.write_parquet(rpath, records).await,
            ParquetDataFrame::GenAITask(df) => df.write_parquet(rpath, records).await,
            ParquetDataFrame::GenAIWorkflow(df) => df.write_parquet(rpath, records).await,
            ParquetDataFrame::GenAIEval(df) => df.write_parquet(rpath, records).await,
        }
    }

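    /// Returns the storage root of the underlying dataframe.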
    pub fn storage_root(&self) -> String {
        match self {
            ParquetDataFrame::CustomMetric(df) => df.storage_root(),
            ParquetDataFrame::Psi(df) => df.storage_root(),
            ParquetDataFrame::Spc(df) => df.storage_root(),
            ParquetDataFrame::GenAITask(df) => df.storage_root(),
            ParquetDataFrame::GenAIWorkflow(df) => df.storage_root(),
            ParquetDataFrame::GenAIEval(df) => df.storage_root(),
        }
    }

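    /// Returns a clone of the underlying object store client.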
    pub fn storage_client(&self) -> ObjectStore {
        match self {
            ParquetDataFrame::CustomMetric(df) => df.object_store.clone(),
            ParquetDataFrame::Psi(df) => df.object_store.clone(),
            ParquetDataFrame::Spc(df) => df.object_store.clone(),
            ParquetDataFrame::GenAITask(df) => df.object_store.clone(),
            ParquetDataFrame::GenAIWorkflow(df) => df.object_store.clone(),
            ParquetDataFrame::GenAIEval(df) => df.object_store.clone(),
        }
    }

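    /// Reads records for `entity_id` between `start_time` and `end_time`,
    /// binned by `bin`. Binned metrics are not supported for `GenAIEval`
    /// records.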
    #[allow(clippy::too_many_arguments)]
    pub async fn get_binned_metrics(
        &self,
        path: &str,
        bin: &f64,
        start_time: &DateTime<Utc>,
        end_time: &DateTime<Utc>,
        entity_id: &i32,
    ) -> Result<DataFrame, DataFrameError> {
        let read_path = &self.resolve_path(path);

        match self {
            ParquetDataFrame::CustomMetric(df) => {
                df.get_binned_metrics(read_path, bin, start_time, end_time, entity_id)
                    .await
            }
            ParquetDataFrame::Psi(df) => {
                df.get_binned_metrics(read_path, bin, start_time, end_time, entity_id)
                    .await
            }
            ParquetDataFrame::Spc(df) => {
                df.get_binned_metrics(read_path, bin, start_time, end_time, entity_id)
                    .await
            }
            ParquetDataFrame::GenAITask(df) => {
                df.get_binned_metrics(read_path, bin, start_time, end_time, entity_id)
                    .await
            }
            ParquetDataFrame::GenAIWorkflow(df) => {
                df.get_binned_metrics(read_path, bin, start_time, end_time, entity_id)
                    .await
            }
            ParquetDataFrame::GenAIEval(_) => Err(DataFrameError::UnsupportedOperation(
                "GenAIEval records do not support binned metrics".to_string(),
            )),
        }
    }

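    /// Returns the storage type configured on the underlying object store.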
    pub fn storage_type(&self) -> StorageType {
        match self {
            ParquetDataFrame::CustomMetric(df) => {
                df.object_store.storage_settings.storage_type.clone()
            }
            ParquetDataFrame::Psi(df) => df.object_store.storage_settings.storage_type.clone(),
            ParquetDataFrame::Spc(df) => df.object_store.storage_settings.storage_type.clone(),
            ParquetDataFrame::GenAITask(df) => {
                df.object_store.storage_settings.storage_type.clone()
            }
            ParquetDataFrame::GenAIWorkflow(df) => {
                df.object_store.storage_settings.storage_type.clone()
            }
            ParquetDataFrame::GenAIEval(df) => {
                df.object_store.storage_settings.storage_type.clone()
            }
        }
    }

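    /// Joins `path` onto the storage root, keeping a trailing slash so the
    /// result is treated as a directory prefix.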
    pub fn resolve_path(&self, path: &str) -> String {
        format!("{}/{}/", self.storage_root(), path)
    }
}

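// The tests below run against the local storage root from
// `ObjectStorageSettings::default()`; `cleanup` removes that directory before
// and after each test so runs do not interfere with one another.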
#[cfg(test)]
mod tests {

    use super::*;
    use crate::parquet::psi::dataframe_to_psi_drift_features;
    use crate::parquet::spc::dataframe_to_spc_drift_features;
    use crate::parquet::types::BinnedTableName;
    use crate::parquet::utils::BinnedMetricsExtractor;
    use chrono::Utc;
    use object_store::path::Path;
    use rand::Rng;
    use scouter_settings::ObjectStorageSettings;
    use scouter_types::Assertion;
    use scouter_types::{
        BoxedGenAIEvalRecord, GenAIEvalRecord, PsiRecord, ServerRecord, ServerRecords, SpcRecord,
        Status,
    };
    use scouter_types::{CustomMetricRecord, GenAIEvalTaskResult, GenAIEvalWorkflowResult};
    use serde_json::Map;
    use serde_json::Value;

    fn cleanup() {
        let storage_settings = ObjectStorageSettings::default();
        let current_dir = std::env::current_dir().unwrap();
        let storage_path = current_dir.join(storage_settings.storage_root());
        if storage_path.exists() {
            std::fs::remove_dir_all(storage_path).unwrap();
        }
    }

    #[tokio::test]
    async fn test_write_genai_event_record_dataframe_local() {
        cleanup();
        let storage_settings = ObjectStorageSettings::default();
        let df = ParquetDataFrame::new(&storage_settings, &RecordType::GenAIEval).unwrap();
        let mut batch = Vec::new();
        let entity_id = rand::rng().random_range(0..100);

        for i in 0..3 {
            for j in 0..50 {
                let record = GenAIEvalRecord {
                    created_at: Utc::now() + chrono::Duration::hours(i),
                    entity_id,
                    context: serde_json::Value::Object(Map::new()),
                    status: Status::Pending,
                    id: 0,
                    uid: format!("record_uid_{i}_{j}"),
                    entity_uid: format!("entity_uid_{entity_id}"),
                    ..Default::default()
                };

                let boxed_record = BoxedGenAIEvalRecord::new(record);
                batch.push(ServerRecord::GenAIEval(boxed_record));
            }
        }

        let records = ServerRecords::new(batch);
        let rpath = BinnedTableName::GenAIEval.to_string();
        df.write_parquet(&rpath, records.clone()).await.unwrap();

        let canonical_path = df.storage_root();
        let data_path = Path::from(canonical_path);

        let files = df.storage_client().list(Some(&data_path)).await.unwrap();
        assert_eq!(files.len(), 3);

        for file in files.iter() {
            let path = Path::from(file.to_string());
            df.storage_client()
                .delete(&path)
                .await
                .expect("Failed to delete file");
        }
        let files = df.storage_client().list(Some(&data_path)).await.unwrap();
        assert_eq!(files.len(), 0);

        cleanup();
    }

    #[tokio::test]
    async fn test_write_genai_task_dataframe_local() {
        cleanup();
        let storage_settings = ObjectStorageSettings::default();
        let df = ParquetDataFrame::new(&storage_settings, &RecordType::GenAITask).unwrap();
        let mut batch = Vec::new();
        let start_utc = Utc::now();
        let end_utc_for_test = start_utc + chrono::Duration::hours(3);
        let entity_id = rand::rng().random_range(0..100);

        for i in 0..3 {
            for j in 0..50 {
                let record = ServerRecord::GenAITaskRecord(GenAIEvalTaskResult {
                    record_uid: format!("record_uid_{i}_{j}"),
                    created_at: Utc::now() + chrono::Duration::hours(i),
                    start_time: Utc::now() + chrono::Duration::hours(i),
                    end_time: Utc::now()
                        + chrono::Duration::hours(i)
                        + chrono::Duration::minutes(5),
                    entity_id,
                    task_id: format!("task{i}"),
                    task_type: scouter_types::genai::EvaluationTaskType::Assertion,
                    passed: true,
                    value: j as f64,
                    assertion: Assertion::FieldPath(Some(format!("field.path.{i}"))),
                    operator: scouter_types::genai::ComparisonOperator::Contains,
                    expected: Value::Null,
                    actual: Value::Null,
                    message: "All good".to_string(),
                    entity_uid: format!("entity_uid_{entity_id}"),
                    condition: false,
                    stage: 0,
                });

                batch.push(record);
            }
        }

        let records = ServerRecords::new(batch);
        let rpath = BinnedTableName::GenAITask.to_string();
        df.write_parquet(&rpath, records.clone()).await.unwrap();

        let canonical_path = df.storage_root();
        let data_path = Path::from(canonical_path);

        let files = df.storage_client().list(Some(&data_path)).await.unwrap();
        assert_eq!(files.len(), 3);

        let new_df = ParquetDataFrame::new(&storage_settings, &RecordType::GenAITask).unwrap();

        let read_df = new_df
            .get_binned_metrics(&rpath, &0.01, &start_utc, &end_utc_for_test, &entity_id)
            .await
            .unwrap();

        let binned_metrics = BinnedMetricsExtractor::dataframe_to_binned_metrics(read_df)
            .await
            .unwrap();

        assert_eq!(binned_metrics.metrics.len(), 3);

        for file in files.iter() {
            let path = Path::from(file.to_string());
            df.storage_client()
                .delete(&path)
                .await
                .expect("Failed to delete file");
        }
        let files = df.storage_client().list(Some(&data_path)).await.unwrap();
        assert_eq!(files.len(), 0);

        cleanup();
    }

    #[tokio::test]
    async fn test_write_genai_workflow_dataframe_local() {
        cleanup();
        let storage_settings = ObjectStorageSettings::default();
        let df = ParquetDataFrame::new(&storage_settings, &RecordType::GenAIWorkflow).unwrap();
        let mut batch = Vec::new();
        let start_utc = Utc::now();
        let end_utc_for_test = start_utc + chrono::Duration::hours(3);
        let entity_id = rand::rng().random_range(0..100);

        for i in 0..3 {
            for j in 0..50 {
                let record = ServerRecord::GenAIWorkflowRecord(GenAIEvalWorkflowResult {
                    record_uid: format!("record_uid_{i}_{j}"),
                    created_at: Utc::now() + chrono::Duration::hours(i),
                    entity_id,
                    total_tasks: 10,
                    passed_tasks: 8,
                    failed_tasks: 2,
                    pass_rate: 0.8,
                    duration_ms: 1500,
                    entity_uid: format!("entity_uid_{entity_id}"),
                    execution_plan: scouter_types::genai::ExecutionPlan::default(),
                    id: j,
                });

                batch.push(record);
            }
        }

        let records = ServerRecords::new(batch);
        let rpath = BinnedTableName::GenAIWorkflow.to_string();
        df.write_parquet(&rpath, records.clone()).await.unwrap();

        let canonical_path = df.storage_root();
        let data_path = Path::from(canonical_path);

        let files = df.storage_client().list(Some(&data_path)).await.unwrap();
        assert_eq!(files.len(), 3);

        let new_df = ParquetDataFrame::new(&storage_settings, &RecordType::GenAIWorkflow).unwrap();

        let read_df = new_df
            .get_binned_metrics(&rpath, &0.01, &start_utc, &end_utc_for_test, &entity_id)
            .await
            .unwrap();

        let binned_metrics = BinnedMetricsExtractor::dataframe_to_binned_metrics(read_df)
            .await
            .unwrap();

        assert_eq!(binned_metrics.metrics.len(), 3);

        for file in files.iter() {
            let path = Path::from(file.to_string());
            df.storage_client()
                .delete(&path)
                .await
                .expect("Failed to delete file");
        }
        let files = df.storage_client().list(Some(&data_path)).await.unwrap();
        assert_eq!(files.len(), 0);

        cleanup();
    }

    #[tokio::test]
    async fn test_write_custom_dataframe_local() {
        cleanup();
        let storage_settings = ObjectStorageSettings::default();
        let df = ParquetDataFrame::new(&storage_settings, &RecordType::Custom).unwrap();
        let mut batch = Vec::new();
        let start_utc = Utc::now();
        let end_utc_for_test = start_utc + chrono::Duration::hours(3);
        let entity_id = rand::rng().random_range(0..100);
        for i in 0..3 {
            for j in 0..50 {
                let record = ServerRecord::Custom(CustomMetricRecord {
                    created_at: Utc::now() + chrono::Duration::hours(i),
                    metric: format!("metric{i}"),
                    value: j as f64,
                    entity_id,
                    uid: format!("entity_uid_{entity_id}"),
                });

                batch.push(record);
            }
        }

        let records = ServerRecords::new(batch);
        let rpath = "custom";
        df.write_parquet(rpath, records.clone()).await.unwrap();

        let canonical_path = df.storage_root();
        let data_path = Path::from(canonical_path);

        let files = df.storage_client().list(Some(&data_path)).await.unwrap();
        assert_eq!(files.len(), 3);

        let new_df = ParquetDataFrame::new(&storage_settings, &RecordType::Custom).unwrap();

        let read_df = new_df
            .get_binned_metrics(rpath, &0.01, &start_utc, &end_utc_for_test, &entity_id)
            .await
            .unwrap();

        let binned_metrics = BinnedMetricsExtractor::dataframe_to_binned_metrics(read_df)
            .await
            .unwrap();

        assert_eq!(binned_metrics.metrics.len(), 3);

        for file in files.iter() {
            let path = Path::from(file.to_string());
            df.storage_client()
                .delete(&path)
                .await
                .expect("Failed to delete file");
        }
        let files = df.storage_client().list(Some(&data_path)).await.unwrap();
        assert_eq!(files.len(), 0);

        cleanup();
    }

    #[tokio::test]
    async fn test_write_psi_dataframe_local() {
        cleanup();

        let storage_settings = ObjectStorageSettings::default();
        let df = ParquetDataFrame::new(&storage_settings, &RecordType::Psi).unwrap();
        let mut batch = Vec::new();
        let start_utc = Utc::now();
        let end_utc_for_test = start_utc + chrono::Duration::hours(3);
        let entity_id = rand::rng().random_range(0..100);
        for i in 0..3 {
            for j in 0..5 {
                let record = ServerRecord::Psi(PsiRecord {
                    created_at: Utc::now() + chrono::Duration::hours(i),
                    feature: "feature1".to_string(),
                    bin_id: j,
                    bin_count: rand::rng().random_range(0..100),
                    entity_id,
                    uid: format!("entity_uid_{entity_id}"),
                });

                batch.push(record);
            }
        }

        for i in 0..3 {
            for j in 0..5 {
                let record = ServerRecord::Psi(PsiRecord {
                    created_at: Utc::now() + chrono::Duration::hours(i),
                    feature: "feature2".to_string(),
                    bin_id: j,
                    bin_count: rand::rng().random_range(0..100),
                    entity_id,
                    uid: format!("entity_uid_{entity_id}"),
                });

                batch.push(record);
            }
        }

        let records = ServerRecords::new(batch);
        let rpath = "psi";
        df.write_parquet(rpath, records.clone()).await.unwrap();

        let canonical_path = df.storage_root();
        let data_path = Path::from(canonical_path);

        let files = df.storage_client().list(Some(&data_path)).await.unwrap();
        assert_eq!(files.len(), 3);

        let read_df = df
            .get_binned_metrics(rpath, &0.01, &start_utc, &end_utc_for_test, &entity_id)
            .await
            .unwrap();

        let psi_drift = dataframe_to_psi_drift_features(read_df).await.unwrap();
        assert_eq!(psi_drift.len(), 2);

        for file in files.iter() {
            let path = Path::from(file.to_string());
            df.storage_client()
                .delete(&path)
                .await
                .expect("Failed to delete file");
        }
        let files = df.storage_client().list(Some(&data_path)).await.unwrap();
        assert_eq!(files.len(), 0);

        cleanup();
    }

    #[tokio::test]
    async fn test_write_spc_dataframe_local() {
        cleanup();
        let storage_settings = ObjectStorageSettings::default();
        let df = ParquetDataFrame::new(&storage_settings, &RecordType::Spc).unwrap();
        let mut batch = Vec::new();
        let start_utc = Utc::now();
        let end_utc_for_test = start_utc + chrono::Duration::hours(3);
        let entity_id = rand::rng().random_range(0..100);
        for i in 0..5 {
            let record = ServerRecord::Spc(SpcRecord {
                created_at: Utc::now() + chrono::Duration::hours(i),
                feature: "feature1".to_string(),
                value: i as f64,
                entity_id,
                uid: format!("entity_uid_{entity_id}"),
            });

            batch.push(record);
        }

        for i in 0..5 {
            let record = ServerRecord::Spc(SpcRecord {
                created_at: Utc::now() + chrono::Duration::hours(i),
                feature: "feature2".to_string(),
                value: i as f64,
                entity_id,
                uid: format!("entity_uid_{entity_id}"),
            });

            batch.push(record);
        }

        let records = ServerRecords::new(batch);
        let rpath = "spc";
        df.write_parquet(rpath, records.clone()).await.unwrap();

        let canonical_path = df.storage_root();
        let data_path = Path::from(canonical_path);

        let files = df.storage_client().list(Some(&data_path)).await.unwrap();
        assert_eq!(files.len(), 5);

        let read_df = df
            .get_binned_metrics(rpath, &0.01, &start_utc, &end_utc_for_test, &entity_id)
            .await
            .unwrap();

        let _spc_drift = dataframe_to_spc_drift_features(read_df).await.unwrap();

        for file in files.iter() {
            let path = Path::from(file.to_string());
            df.storage_client()
                .delete(&path)
                .await
                .expect("Failed to delete file");
        }
        let files = df.storage_client().list(Some(&data_path)).await.unwrap();
        assert_eq!(files.len(), 0);

        cleanup();
    }
}