use crate::error::DataFrameError;
use crate::parquet::custom::CustomMetricDataFrame;
use crate::parquet::genai::{GenAIEvalDataFrame, GenAITaskDataFrame, GenAIWorkflowDataFrame};
use crate::parquet::psi::PsiDataFrame;
use crate::parquet::spc::SpcDataFrame;
use crate::parquet::traits::ParquetFrame;
use crate::storage::ObjectStore;
use chrono::{DateTime, Utc};
use datafusion::prelude::DataFrame;
use scouter_settings::ObjectStorageSettings;
use scouter_types::{RecordType, ServerRecords, StorageType};
use tracing::instrument;

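/// Record-type-specific parquet dataframe, dispatching reads and writes to the
/// matching implementation (custom metrics, PSI, SPC, or GenAI records).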
pub enum ParquetDataFrame {
    CustomMetric(CustomMetricDataFrame),
    Psi(PsiDataFrame),
    Spc(SpcDataFrame),
    GenAITask(GenAITaskDataFrame),
    GenAIWorkflow(GenAIWorkflowDataFrame),
    GenAIEval(GenAIEvalDataFrame),
}

impl ParquetDataFrame {
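    /// Construct the dataframe variant for `record_type`.
    ///
    /// Returns `DataFrameError::InvalidRecordTypeError` for record types that
    /// have no parquet representation.
    ///
    /// # Example
    ///
    /// A minimal sketch mirroring the tests below (assumes default local
    /// storage settings and an in-scope `ServerRecords` batch named `records`):
    ///
    /// ```ignore
    /// let settings = ObjectStorageSettings::default();
    /// let df = ParquetDataFrame::new(&settings, &RecordType::Psi)?;
    /// df.write_parquet("psi", records).await?;
    /// ```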
    pub fn new(
        storage_settings: &ObjectStorageSettings,
        record_type: &RecordType,
    ) -> Result<Self, DataFrameError> {
        match record_type {
            RecordType::Custom => Ok(ParquetDataFrame::CustomMetric(CustomMetricDataFrame::new(
                storage_settings,
            )?)),
            RecordType::Psi => Ok(ParquetDataFrame::Psi(PsiDataFrame::new(storage_settings)?)),
            RecordType::Spc => Ok(ParquetDataFrame::Spc(SpcDataFrame::new(storage_settings)?)),
            RecordType::GenAITask => Ok(ParquetDataFrame::GenAITask(GenAITaskDataFrame::new(
                storage_settings,
            )?)),
            RecordType::GenAIWorkflow => Ok(ParquetDataFrame::GenAIWorkflow(
                GenAIWorkflowDataFrame::new(storage_settings)?,
            )),
            RecordType::GenAIEval => Ok(ParquetDataFrame::GenAIEval(GenAIEvalDataFrame::new(
                storage_settings,
            )?)),
            _ => Err(DataFrameError::InvalidRecordTypeError(
                record_type.to_string(),
            )),
        }
    }

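    /// Write `records` as parquet under `rpath`, resolved against the storage root.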
    #[instrument(skip_all, err)]
    pub async fn write_parquet(
        &self,
        rpath: &str,
        records: ServerRecords,
    ) -> Result<(), DataFrameError> {
        let rpath = &self.resolve_path(rpath);

        match self {
            ParquetDataFrame::CustomMetric(df) => df.write_parquet(rpath, records).await,
            ParquetDataFrame::Psi(df) => df.write_parquet(rpath, records).await,
            ParquetDataFrame::Spc(df) => df.write_parquet(rpath, records).await,
            ParquetDataFrame::GenAITask(df) => df.write_parquet(rpath, records).await,
            ParquetDataFrame::GenAIWorkflow(df) => df.write_parquet(rpath, records).await,
            ParquetDataFrame::GenAIEval(df) => df.write_parquet(rpath, records).await,
        }
    }

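    /// Root path within the object store where this record type's parquet files live.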
    pub fn storage_root(&self) -> String {
        match self {
            ParquetDataFrame::CustomMetric(df) => df.storage_root(),
            ParquetDataFrame::Psi(df) => df.storage_root(),
            ParquetDataFrame::Spc(df) => df.storage_root(),
            ParquetDataFrame::GenAITask(df) => df.storage_root(),
            ParquetDataFrame::GenAIWorkflow(df) => df.storage_root(),
            ParquetDataFrame::GenAIEval(df) => df.storage_root(),
        }
    }

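    /// Clone of the underlying object store client.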
    pub fn storage_client(&self) -> ObjectStore {
        match self {
            ParquetDataFrame::CustomMetric(df) => df.object_store.clone(),
            ParquetDataFrame::Psi(df) => df.object_store.clone(),
            ParquetDataFrame::Spc(df) => df.object_store.clone(),
            ParquetDataFrame::GenAITask(df) => df.object_store.clone(),
            ParquetDataFrame::GenAIWorkflow(df) => df.object_store.clone(),
            ParquetDataFrame::GenAIEval(df) => df.object_store.clone(),
        }
    }

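    /// Read records under `path` for `entity_id` between `start_time` and
    /// `end_time`, binned by `bin`. GenAI eval records have no binned
    /// representation and return `DataFrameError::UnsupportedOperation`.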
    #[allow(clippy::too_many_arguments)]
    pub async fn get_binned_metrics(
        &self,
        path: &str,
        bin: &f64,
        start_time: &DateTime<Utc>,
        end_time: &DateTime<Utc>,
        entity_id: &i32,
    ) -> Result<DataFrame, DataFrameError> {
        let read_path = &self.resolve_path(path);

        match self {
            ParquetDataFrame::CustomMetric(df) => {
                df.get_binned_metrics(read_path, bin, start_time, end_time, entity_id)
                    .await
            }
            ParquetDataFrame::Psi(df) => {
                df.get_binned_metrics(read_path, bin, start_time, end_time, entity_id)
                    .await
            }
            ParquetDataFrame::Spc(df) => {
                df.get_binned_metrics(read_path, bin, start_time, end_time, entity_id)
                    .await
            }
            ParquetDataFrame::GenAITask(df) => {
                df.get_binned_metrics(read_path, bin, start_time, end_time, entity_id)
                    .await
            }
            ParquetDataFrame::GenAIWorkflow(df) => {
                df.get_binned_metrics(read_path, bin, start_time, end_time, entity_id)
                    .await
            }
            ParquetDataFrame::GenAIEval(_) => Err(DataFrameError::UnsupportedOperation(
                "GenAI drift does not support binned metrics".to_string(),
            )),
        }
    }

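    /// Storage backend type configured on the underlying object store.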
    pub fn storage_type(&self) -> StorageType {
        match self {
            ParquetDataFrame::CustomMetric(df) => {
                df.object_store.storage_settings.storage_type.clone()
            }
            ParquetDataFrame::Psi(df) => df.object_store.storage_settings.storage_type.clone(),
            ParquetDataFrame::Spc(df) => df.object_store.storage_settings.storage_type.clone(),
            ParquetDataFrame::GenAITask(df) => {
                df.object_store.storage_settings.storage_type.clone()
            }
            ParquetDataFrame::GenAIWorkflow(df) => {
                df.object_store.storage_settings.storage_type.clone()
            }
            ParquetDataFrame::GenAIEval(df) => {
                df.object_store.storage_settings.storage_type.clone()
            }
        }
    }

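    /// Join `path` onto the storage root as a directory-style prefix
    /// (trailing slash included).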
    pub fn resolve_path(&self, path: &str) -> String {
        format!("{}/{}/", self.storage_root(), path)
    }
}

#[cfg(test)]
mod tests {

    use super::*;
    use crate::parquet::psi::dataframe_to_psi_drift_features;
    use crate::parquet::spc::dataframe_to_spc_drift_features;
    use crate::parquet::types::BinnedTableName;
    use crate::parquet::utils::BinnedMetricsExtractor;
    use chrono::Utc;
    use object_store::path::Path;
    use rand::Rng;
    use scouter_settings::ObjectStorageSettings;
    use scouter_types::{
        BoxedGenAIEvalRecord, GenAIEvalRecord, PsiRecord, ServerRecord, ServerRecords, SpcRecord,
        Status,
    };
    use scouter_types::{CustomMetricRecord, GenAIEvalTaskResult, GenAIEvalWorkflowResult};
    use serde_json::Map;
    use serde_json::Value;

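    /// Remove any local storage root left behind by a previous test run.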
    fn cleanup() {
        let storage_settings = ObjectStorageSettings::default();
        let current_dir = std::env::current_dir().unwrap();
        let storage_path = current_dir.join(storage_settings.storage_root());
        if storage_path.exists() {
            std::fs::remove_dir_all(storage_path).unwrap();
        }
    }

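    // Each test below follows the same flow: write a batch of records, check
    // the parquet files that land under the storage root, read them back where
    // the record type supports binned metrics, then delete the files and
    // confirm the store is empty.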
    #[tokio::test]
    async fn test_write_genai_event_record_dataframe_local() {
        cleanup();
        let storage_settings = ObjectStorageSettings::default();
        let df = ParquetDataFrame::new(&storage_settings, &RecordType::GenAIEval).unwrap();
        let mut batch = Vec::new();
        let entity_id = rand::rng().random_range(0..100);

        for i in 0..3 {
            for j in 0..50 {
                let record = GenAIEvalRecord {
                    created_at: Utc::now() + chrono::Duration::hours(i),
                    entity_id,
                    context: serde_json::Value::Object(Map::new()),
                    status: Status::Pending,
                    id: 0,
                    uid: format!("record_uid_{i}_{j}"),
                    entity_uid: format!("entity_uid_{entity_id}"),
                    ..Default::default()
                };

                let boxed_record = BoxedGenAIEvalRecord::new(record);
                batch.push(ServerRecord::GenAIEval(boxed_record));
            }
        }

        let records = ServerRecords::new(batch);
        let rpath = BinnedTableName::GenAIEval.to_string();
        df.write_parquet(&rpath, records.clone()).await.unwrap();

        let canonical_path = df.storage_root();
        let data_path = object_store::path::Path::from(canonical_path);

        let files = df.storage_client().list(Some(&data_path)).await.unwrap();
        assert_eq!(files.len(), 3);

        for file in files.iter() {
            let path = Path::from(file.to_string());
            df.storage_client()
                .delete(&path)
                .await
                .expect("Failed to delete file");
        }
        let files = df.storage_client().list(Some(&data_path)).await.unwrap();
        assert_eq!(files.len(), 0);

        cleanup();
    }

    #[tokio::test]
    async fn test_write_genai_task_dataframe_local() {
        cleanup();
        let storage_settings = ObjectStorageSettings::default();
        let df = ParquetDataFrame::new(&storage_settings, &RecordType::GenAITask).unwrap();
        let mut batch = Vec::new();
        let start_utc = Utc::now();
        let end_utc_for_test = start_utc + chrono::Duration::hours(3);
        let entity_id = rand::rng().random_range(0..100);

        for i in 0..3 {
            for j in 0..50 {
                let record = ServerRecord::GenAITaskRecord(GenAIEvalTaskResult {
                    record_uid: format!("record_uid_{i}_{j}"),
                    created_at: Utc::now() + chrono::Duration::hours(i),
                    start_time: Utc::now() + chrono::Duration::hours(i),
                    end_time: Utc::now()
                        + chrono::Duration::hours(i)
                        + chrono::Duration::minutes(5),
                    entity_id,
                    task_id: format!("task{i}"),
                    task_type: scouter_types::genai::EvaluationTaskType::Assertion,
                    passed: true,
                    value: j as f64,
                    field_path: Some(format!("field.path.{i}")),
                    operator: scouter_types::genai::ComparisonOperator::Contains,
                    expected: Value::Null,
                    actual: Value::Null,
                    message: "All good".to_string(),
                    entity_uid: format!("entity_uid_{entity_id}"),
                    condition: false,
                    stage: 0,
                });

                batch.push(record);
            }
        }

        let records = ServerRecords::new(batch);
        let rpath = BinnedTableName::GenAITask.to_string();
        df.write_parquet(&rpath, records.clone()).await.unwrap();

        let canonical_path = df.storage_root();
        let data_path = object_store::path::Path::from(canonical_path);

        let files = df.storage_client().list(Some(&data_path)).await.unwrap();
        assert_eq!(files.len(), 3);

        let new_df = ParquetDataFrame::new(&storage_settings, &RecordType::GenAITask).unwrap();

        let read_df = new_df
            .get_binned_metrics(&rpath, &0.01, &start_utc, &end_utc_for_test, &entity_id)
            .await
            .unwrap();

        let binned_metrics = BinnedMetricsExtractor::dataframe_to_binned_metrics(read_df)
            .await
            .unwrap();

        assert_eq!(binned_metrics.metrics.len(), 3);

        for file in files.iter() {
            let path = Path::from(file.to_string());
            df.storage_client()
                .delete(&path)
                .await
                .expect("Failed to delete file");
        }
        let files = df.storage_client().list(Some(&data_path)).await.unwrap();
        assert_eq!(files.len(), 0);

        cleanup();
    }

    #[tokio::test]
    async fn test_write_genai_workflow_dataframe_local() {
        cleanup();
        let storage_settings = ObjectStorageSettings::default();
        let df = ParquetDataFrame::new(&storage_settings, &RecordType::GenAIWorkflow).unwrap();
        let mut batch = Vec::new();
        let start_utc = Utc::now();
        let end_utc_for_test = start_utc + chrono::Duration::hours(3);
        let entity_id = rand::rng().random_range(0..100);

        for i in 0..3 {
            for j in 0..50 {
                let record = ServerRecord::GenAIWorkflowRecord(GenAIEvalWorkflowResult {
                    record_uid: format!("record_uid_{i}_{j}"),
                    created_at: Utc::now() + chrono::Duration::hours(i),
                    entity_id,
                    total_tasks: 10,
                    passed_tasks: 8,
                    failed_tasks: 2,
                    pass_rate: 0.8,
                    duration_ms: 1500,
                    entity_uid: format!("entity_uid_{entity_id}"),
                    execution_plan: scouter_types::genai::ExecutionPlan::default(),
                    id: j,
                });

                batch.push(record);
            }
        }

        let records = ServerRecords::new(batch);
        let rpath = BinnedTableName::GenAIWorkflow.to_string();
        df.write_parquet(&rpath, records.clone()).await.unwrap();

        let canonical_path = df.storage_root();
        let data_path = object_store::path::Path::from(canonical_path);

        let files = df.storage_client().list(Some(&data_path)).await.unwrap();
        assert_eq!(files.len(), 3);

        let new_df = ParquetDataFrame::new(&storage_settings, &RecordType::GenAIWorkflow).unwrap();

        let read_df = new_df
            .get_binned_metrics(&rpath, &0.01, &start_utc, &end_utc_for_test, &entity_id)
            .await
            .unwrap();

        let binned_metrics = BinnedMetricsExtractor::dataframe_to_binned_metrics(read_df)
            .await
            .unwrap();

        assert_eq!(binned_metrics.metrics.len(), 3);

        for file in files.iter() {
            let path = Path::from(file.to_string());
            df.storage_client()
                .delete(&path)
                .await
                .expect("Failed to delete file");
        }
        let files = df.storage_client().list(Some(&data_path)).await.unwrap();
        assert_eq!(files.len(), 0);

        cleanup();
    }

    #[tokio::test]
    async fn test_write_custom_dataframe_local() {
        cleanup();
        let storage_settings = ObjectStorageSettings::default();
        let df = ParquetDataFrame::new(&storage_settings, &RecordType::Custom).unwrap();
        let mut batch = Vec::new();
        let start_utc = Utc::now();
        let end_utc_for_test = start_utc + chrono::Duration::hours(3);
        let entity_id = rand::rng().random_range(0..100);
        for i in 0..3 {
            for j in 0..50 {
                let record = ServerRecord::Custom(CustomMetricRecord {
                    created_at: Utc::now() + chrono::Duration::hours(i),
                    metric: format!("metric{i}"),
                    value: j as f64,
                    entity_id,
                    uid: format!("entity_uid_{entity_id}"),
                });

                batch.push(record);
            }
        }

        let records = ServerRecords::new(batch);
        let rpath = "custom";
        df.write_parquet(rpath, records.clone()).await.unwrap();

        let canonical_path = df.storage_root();
        let data_path = object_store::path::Path::from(canonical_path);

        let files = df.storage_client().list(Some(&data_path)).await.unwrap();
        assert_eq!(files.len(), 3);

        let new_df = ParquetDataFrame::new(&storage_settings, &RecordType::Custom).unwrap();

        let read_df = new_df
            .get_binned_metrics(rpath, &0.01, &start_utc, &end_utc_for_test, &entity_id)
            .await
            .unwrap();

        let binned_metrics = BinnedMetricsExtractor::dataframe_to_binned_metrics(read_df)
            .await
            .unwrap();

        assert_eq!(binned_metrics.metrics.len(), 3);

        for file in files.iter() {
            let path = Path::from(file.to_string());
            df.storage_client()
                .delete(&path)
                .await
                .expect("Failed to delete file");
        }
        let files = df.storage_client().list(Some(&data_path)).await.unwrap();
        assert_eq!(files.len(), 0);

        cleanup();
    }

    #[tokio::test]
    async fn test_write_psi_dataframe_local() {
        cleanup();

        let storage_settings = ObjectStorageSettings::default();
        let df = ParquetDataFrame::new(&storage_settings, &RecordType::Psi).unwrap();
        let mut batch = Vec::new();
        let start_utc = Utc::now();
        let end_utc_for_test = start_utc + chrono::Duration::hours(3);
        let entity_id = rand::rng().random_range(0..100);
        for i in 0..3 {
            for j in 0..5 {
                let record = ServerRecord::Psi(PsiRecord {
                    created_at: Utc::now() + chrono::Duration::hours(i),
                    feature: "feature1".to_string(),
                    bin_id: j,
                    bin_count: rand::rng().random_range(0..100),
                    entity_id,
                    uid: format!("entity_uid_{entity_id}"),
                });

                batch.push(record);
            }
        }

        for i in 0..3 {
            for j in 0..5 {
                let record = ServerRecord::Psi(PsiRecord {
                    created_at: Utc::now() + chrono::Duration::hours(i),
                    feature: "feature2".to_string(),
                    bin_id: j,
                    bin_count: rand::rng().random_range(0..100),
                    entity_id,
                    uid: format!("entity_uid_{entity_id}"),
                });

                batch.push(record);
            }
        }

        let records = ServerRecords::new(batch);
        let rpath = "psi";
        df.write_parquet(rpath, records.clone()).await.unwrap();

        let canonical_path = df.storage_root();
        let data_path = object_store::path::Path::from(canonical_path);

        let files = df.storage_client().list(Some(&data_path)).await.unwrap();
        assert_eq!(files.len(), 3);

        let read_df = df
            .get_binned_metrics(rpath, &0.01, &start_utc, &end_utc_for_test, &entity_id)
            .await
            .unwrap();

        let psi_drift = dataframe_to_psi_drift_features(read_df).await.unwrap();
        assert_eq!(psi_drift.len(), 2);

        for file in files.iter() {
            let path = Path::from(file.to_string());
            df.storage_client()
                .delete(&path)
                .await
                .expect("Failed to delete file");
        }
        let files = df.storage_client().list(Some(&data_path)).await.unwrap();
        assert_eq!(files.len(), 0);

        cleanup();
    }

    #[tokio::test]
    async fn test_write_spc_dataframe_local() {
        cleanup();
        let storage_settings = ObjectStorageSettings::default();
        let df = ParquetDataFrame::new(&storage_settings, &RecordType::Spc).unwrap();
        let mut batch = Vec::new();
        let start_utc = Utc::now();
        let end_utc_for_test = start_utc + chrono::Duration::hours(3);
        let entity_id = rand::rng().random_range(0..100);
        for i in 0..5 {
            let record = ServerRecord::Spc(SpcRecord {
                created_at: Utc::now() + chrono::Duration::hours(i),
                feature: "feature1".to_string(),
                value: i as f64,
                entity_id,
                uid: format!("entity_uid_{entity_id}"),
            });

            batch.push(record);
        }

        for i in 0..5 {
            let record = ServerRecord::Spc(SpcRecord {
                created_at: Utc::now() + chrono::Duration::hours(i),
                feature: "feature2".to_string(),
                value: i as f64,
                entity_id,
                uid: format!("entity_uid_{entity_id}"),
            });

            batch.push(record);
        }

        let records = ServerRecords::new(batch);
        let rpath = "spc";
        df.write_parquet(rpath, records.clone()).await.unwrap();

        let canonical_path = df.storage_root();
        let data_path = object_store::path::Path::from(canonical_path);

        let files = df.storage_client().list(Some(&data_path)).await.unwrap();
        assert_eq!(files.len(), 5);

        let read_df = df
            .get_binned_metrics(rpath, &0.01, &start_utc, &end_utc_for_test, &entity_id)
            .await
            .unwrap();

        let _spc_drift = dataframe_to_spc_drift_features(read_df).await.unwrap();

        for file in files.iter() {
            let path = Path::from(file.to_string());
            df.storage_client()
                .delete(&path)
                .await
                .expect("Failed to delete file");
        }
        let files = df.storage_client().list(Some(&data_path)).await.unwrap();
        assert_eq!(files.len(), 0);

        cleanup();
    }
}