use crate::error::DataFrameError;
use crate::parquet::custom::CustomMetricDataFrame;
use crate::parquet::psi::PsiDataFrame;
use crate::parquet::spc::SpcDataFrame;
use crate::parquet::traits::ParquetFrame;
use crate::storage::ObjectStore;
use chrono::{DateTime, Utc};
use datafusion::prelude::DataFrame;
use scouter_settings::ObjectStorageSettings;
use scouter_types::{RecordType, ServerRecords, StorageType};
use tracing::instrument;
/// Dispatch wrapper over the per-record-type parquet dataframe
/// implementations. Construct the matching variant for a `RecordType`
/// via `ParquetDataFrame::new`; all methods delegate to the wrapped frame.
pub enum ParquetDataFrame {
    /// Frame for `RecordType::Custom` (user-defined metric records).
    CustomMetric(CustomMetricDataFrame),
    /// Frame for `RecordType::Psi` (population-stability-index records).
    Psi(PsiDataFrame),
    /// Frame for `RecordType::Spc` (statistical-process-control records).
    Spc(SpcDataFrame),
}
impl ParquetDataFrame {
pub fn new(
storage_settings: &ObjectStorageSettings,
record_type: &RecordType,
) -> Result<Self, DataFrameError> {
match record_type {
RecordType::Custom => Ok(ParquetDataFrame::CustomMetric(CustomMetricDataFrame::new(
storage_settings,
)?)),
RecordType::Psi => Ok(ParquetDataFrame::Psi(PsiDataFrame::new(storage_settings)?)),
RecordType::Spc => Ok(ParquetDataFrame::Spc(SpcDataFrame::new(storage_settings)?)),
_ => Err(DataFrameError::InvalidRecordTypeError),
}
}
#[instrument(skip_all, err)]
pub async fn write_parquet(
&self,
rpath: &str,
records: ServerRecords,
) -> Result<(), DataFrameError> {
let rpath = &self.resolve_path(rpath);
match self {
ParquetDataFrame::CustomMetric(df) => df.write_parquet(rpath, records).await,
ParquetDataFrame::Psi(df) => df.write_parquet(rpath, records).await,
ParquetDataFrame::Spc(df) => df.write_parquet(rpath, records).await,
}
}
pub fn storage_root(&self) -> String {
match self {
ParquetDataFrame::CustomMetric(df) => df.storage_root(),
ParquetDataFrame::Psi(df) => df.storage_root(),
ParquetDataFrame::Spc(df) => df.storage_root(),
}
}
pub fn storage_client(&self) -> ObjectStore {
match self {
ParquetDataFrame::CustomMetric(df) => df.object_store.clone(),
ParquetDataFrame::Psi(df) => df.object_store.clone(),
ParquetDataFrame::Spc(df) => df.object_store.clone(),
}
}
#[allow(clippy::too_many_arguments)]
pub async fn get_binned_metrics(
&self,
path: &str,
bin: &f64,
start_time: &DateTime<Utc>,
end_time: &DateTime<Utc>,
space: &str,
name: &str,
version: &str,
) -> Result<DataFrame, DataFrameError> {
let read_path = &self.resolve_path(path);
match self {
ParquetDataFrame::CustomMetric(df) => {
df.get_binned_metrics(read_path, bin, start_time, end_time, space, name, version)
.await
}
ParquetDataFrame::Psi(df) => {
df.get_binned_metrics(read_path, bin, start_time, end_time, space, name, version)
.await
}
ParquetDataFrame::Spc(df) => {
df.get_binned_metrics(read_path, bin, start_time, end_time, space, name, version)
.await
}
}
}
pub fn storage_type(&self) -> StorageType {
match self {
ParquetDataFrame::CustomMetric(df) => {
df.object_store.storage_settings.storage_type.clone()
}
ParquetDataFrame::Psi(df) => df.object_store.storage_settings.storage_type.clone(),
ParquetDataFrame::Spc(df) => df.object_store.storage_settings.storage_type.clone(),
}
}
pub fn resolve_path(&self, path: &str) -> String {
format!("{}/{}/", self.storage_root(), path)
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::parquet::custom::dataframe_to_custom_drift_metrics;
    use crate::parquet::psi::dataframe_to_psi_drift_features;
    use crate::parquet::spc::dataframe_to_spc_drift_features;
    use chrono::Utc;
    use object_store::path::Path;
    use rand::Rng;
    use scouter_settings::ObjectStorageSettings;
    use scouter_types::{
        CustomMetricServerRecord, PsiServerRecord, ServerRecord, ServerRecords, SpcServerRecord,
    };

    /// Remove only this test's subdirectory under the storage root.
    ///
    /// Scoping cleanup to `subdir` matters: cargo runs the tests in this
    /// module on parallel threads, and wiping the whole storage root (as a
    /// shared `cleanup()` would) deletes files belonging to the other tests
    /// mid-flight, producing flaky file-count assertions.
    fn cleanup(subdir: &str) {
        let storage_settings = ObjectStorageSettings::default();
        let current_dir = std::env::current_dir().unwrap();
        let storage_path = current_dir
            .join(storage_settings.storage_root())
            .join(subdir);
        if storage_path.exists() {
            std::fs::remove_dir_all(storage_path).unwrap();
        }
    }

    /// Build 3 hourly batches of 50 custom-metric records
    /// (metrics "metric0".."metric2", values 0..50).
    fn custom_batch() -> ServerRecords {
        let mut batch = Vec::with_capacity(150);
        for hour in 0..3 {
            for value in 0..50 {
                batch.push(ServerRecord::Custom(CustomMetricServerRecord {
                    created_at: Utc::now() + chrono::Duration::hours(hour),
                    name: "test".to_string(),
                    space: "test".to_string(),
                    version: "1.0".to_string(),
                    metric: format!("metric{}", hour),
                    value: value as f64,
                }));
            }
        }
        ServerRecords::new(batch)
    }

    /// Append 3 hourly batches of 5 PSI bins (random counts) for `feature`.
    fn push_psi_records(batch: &mut Vec<ServerRecord>, feature: &str) {
        for hour in 0..3 {
            for bin_id in 0..5usize {
                batch.push(ServerRecord::Psi(PsiServerRecord {
                    created_at: Utc::now() + chrono::Duration::hours(hour),
                    name: "test".to_string(),
                    space: "test".to_string(),
                    version: "1.0".to_string(),
                    feature: feature.to_string(),
                    bin_id,
                    bin_count: rand::rng().random_range(0..100),
                }));
            }
        }
    }

    /// Append 5 hourly SPC records for `feature` with value == hour index.
    fn push_spc_records(batch: &mut Vec<ServerRecord>, feature: &str) {
        for hour in 0..5 {
            batch.push(ServerRecord::Spc(SpcServerRecord {
                created_at: Utc::now() + chrono::Duration::hours(hour),
                name: "test".to_string(),
                space: "test".to_string(),
                version: "1.0".to_string(),
                feature: feature.to_string(),
                value: hour as f64,
            }));
        }
    }

    #[tokio::test]
    async fn test_write_custom_dataframe_local() {
        let rpath = "custom";
        cleanup(rpath);
        let storage_settings = ObjectStorageSettings::default();
        let df = ParquetDataFrame::new(&storage_settings, &RecordType::Custom).unwrap();
        let start_utc = Utc::now();
        let end_utc_for_test = start_utc + chrono::Duration::hours(3);

        df.write_parquet(rpath, custom_batch()).await.unwrap();

        // List only this test's own prefix so files written concurrently by
        // the other tests in this module are not counted.
        let data_path = Path::from(df.resolve_path(rpath));
        let files = df.storage_client().list(Some(&data_path)).await.unwrap();
        assert_eq!(files.len(), 3);

        // Re-read through a fresh frame to confirm the data round-trips.
        let new_df = ParquetDataFrame::new(&storage_settings, &RecordType::Custom).unwrap();
        let read_df = new_df
            .get_binned_metrics(
                rpath,
                &0.01,
                &start_utc,
                &end_utc_for_test,
                "test",
                "test",
                "1.0",
            )
            .await
            .unwrap();
        let binned_metrics = dataframe_to_custom_drift_metrics(read_df).await.unwrap();
        assert_eq!(binned_metrics.metrics.len(), 3);

        for file in files.iter() {
            let path = Path::from(file.to_string());
            df.storage_client()
                .delete(&path)
                .await
                .expect("Failed to delete file");
        }
        let files = df.storage_client().list(Some(&data_path)).await.unwrap();
        assert_eq!(files.len(), 0);
        cleanup(rpath);
    }

    #[tokio::test]
    async fn test_write_psi_dataframe_local() {
        let rpath = "psi";
        cleanup(rpath);
        let storage_settings = ObjectStorageSettings::default();
        let df = ParquetDataFrame::new(&storage_settings, &RecordType::Psi).unwrap();
        let start_utc = Utc::now();
        let end_utc_for_test = start_utc + chrono::Duration::hours(3);

        let mut batch = Vec::new();
        push_psi_records(&mut batch, "feature1");
        push_psi_records(&mut batch, "feature2");
        df.write_parquet(rpath, ServerRecords::new(batch))
            .await
            .unwrap();

        // Scoped to this test's prefix; see cleanup() for why.
        let data_path = Path::from(df.resolve_path(rpath));
        let files = df.storage_client().list(Some(&data_path)).await.unwrap();
        assert_eq!(files.len(), 3);

        let read_df = df
            .get_binned_metrics(
                rpath,
                &0.01,
                &start_utc,
                &end_utc_for_test,
                "test",
                "test",
                "1.0",
            )
            .await
            .unwrap();
        let psi_drift = dataframe_to_psi_drift_features(read_df).await.unwrap();
        // One entry per distinct feature.
        assert_eq!(psi_drift.len(), 2);

        for file in files.iter() {
            let path = Path::from(file.to_string());
            df.storage_client()
                .delete(&path)
                .await
                .expect("Failed to delete file");
        }
        let files = df.storage_client().list(Some(&data_path)).await.unwrap();
        assert_eq!(files.len(), 0);
        cleanup(rpath);
    }

    #[tokio::test]
    async fn test_write_spc_dataframe_local() {
        let rpath = "spc";
        cleanup(rpath);
        let storage_settings = ObjectStorageSettings::default();
        let df = ParquetDataFrame::new(&storage_settings, &RecordType::Spc).unwrap();
        let start_utc = Utc::now();
        let end_utc_for_test = start_utc + chrono::Duration::hours(3);

        let mut batch = Vec::new();
        push_spc_records(&mut batch, "feature1");
        push_spc_records(&mut batch, "feature2");
        df.write_parquet(rpath, ServerRecords::new(batch))
            .await
            .unwrap();

        // Scoped to this test's prefix; see cleanup() for why.
        let data_path = Path::from(df.resolve_path(rpath));
        let files = df.storage_client().list(Some(&data_path)).await.unwrap();
        assert_eq!(files.len(), 5);

        let read_df = df
            .get_binned_metrics(
                rpath,
                &0.01,
                &start_utc,
                &end_utc_for_test,
                "test",
                "test",
                "1.0",
            )
            .await
            .unwrap();
        // Conversion succeeding is the assertion here; values are positional.
        let _spc_drift = dataframe_to_spc_drift_features(read_df).await.unwrap();

        for file in files.iter() {
            let path = Path::from(file.to_string());
            df.storage_client()
                .delete(&path)
                .await
                .expect("Failed to delete file");
        }
        let files = df.storage_client().list(Some(&data_path)).await.unwrap();
        assert_eq!(files.len(), 0);
        cleanup(rpath);
    }
}