use crate::sql::error::SqlError;
use crate::sql::query::Queries;
use crate::sql::schema::SpcFeatureResult;
use crate::sql::utils::split_custom_interval;
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use itertools::multiunzip;
use scouter_dataframe::parquet::{dataframe_to_spc_drift_features, ParquetDataFrame};
use scouter_settings::ObjectStorageSettings;
use scouter_types::{
spc::{SpcDriftFeature, SpcDriftFeatures},
DriftRequest, RecordType, SpcRecord,
};
use sqlx::{postgres::PgQueryResult, Pool, Postgres, Row};
use std::collections::{btree_map::Entry, BTreeMap};
use tracing::{debug, error, instrument};
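
/// SQL logic for persisting and querying SPC (statistical process control)
/// drift records. Recent data is served from Postgres; data older than the
/// retention period is read from archived Parquet files in object storage.
///
/// Every method has a default implementation, so an implementor only needs an
/// empty impl block. A minimal usage sketch (`SpcSql` is a hypothetical
/// implementor, not a type defined in this crate):
///
/// ```rust,ignore
/// struct SpcSql;
///
/// #[async_trait]
/// impl SpcSqlLogic for SpcSql {}
///
/// // Assuming `pool`, `records`, and `entity_id` are already in scope:
/// let result = SpcSql::insert_spc_drift_records_batch(&pool, &records, &entity_id).await?;
/// ```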
#[async_trait]
pub trait SpcSqlLogic {
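    /// Inserts a batch of SPC drift records in a single statement by binding
    /// each column as a parallel vector. Returns `SqlError::EmptyBatchError`
    /// when `records` is empty.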
async fn insert_spc_drift_records_batch(
pool: &Pool<Postgres>,
records: &[SpcRecord],
entity_id: &i32,
) -> Result<PgQueryResult, SqlError> {
if records.is_empty() {
return Err(SqlError::EmptyBatchError);
}
let query = Queries::InsertSpcDriftRecordBatch.get_query();
        // Unzip the batch into parallel column vectors so all rows can be
        // bound to a single INSERT statement.
        let (created_ats, entity_ids, features, values): (
            Vec<DateTime<Utc>>,
            Vec<i32>,
            Vec<&str>,
            Vec<f64>,
        ) = multiunzip(
            records
                .iter()
                .map(|r| (r.created_at, *entity_id, r.feature.as_str(), r.value)),
        );
sqlx::query(query)
.bind(created_ats)
.bind(entity_ids)
.bind(features)
.bind(values)
.execute(pool)
.await
.map_err(SqlError::SqlxError)
}
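
    /// Fetches the feature names recorded for the given entity.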
async fn get_spc_features(
pool: &Pool<Postgres>,
entity_id: &i32,
) -> Result<Vec<String>, SqlError> {
let query = Queries::GetSpcFeatures.get_query();
        let rows = sqlx::query(query)
            .bind(entity_id)
            .fetch_all(pool)
            .await
            .inspect_err(|e| {
                error!("Error fetching SPC features: {:?}", e);
            })?;

        Ok(rows
            .iter()
            .map(|row| row.get("feature"))
            .collect::<Vec<String>>())
}
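
    /// Fetches raw SPC drift values for an entity, bounded by
    /// `limit_datetime`. When `features_to_monitor` is non-empty, only those
    /// features are returned.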
async fn get_spc_drift_records(
pool: &Pool<Postgres>,
limit_datetime: &DateTime<Utc>,
features_to_monitor: &[String],
entity_id: &i32,
) -> Result<SpcDriftFeatures, SqlError> {
let mut features = Self::get_spc_features(pool, entity_id).await?;
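        // When a monitor list is provided, keep only those features.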
if !features_to_monitor.is_empty() {
features.retain(|feature| features_to_monitor.contains(feature));
}
let query = Queries::GetSpcFeatureValues.get_query();
let records: Vec<SpcFeatureResult> = sqlx::query_as(query)
.bind(limit_datetime)
.bind(entity_id)
.bind(features)
.fetch_all(pool)
.await
.inspect_err(|e| {
error!("Error fetching SPC drift records: {:?}", e);
})?;
let feature_drift = records
.into_iter()
.map(|record| {
let feature = SpcDriftFeature {
created_at: record.created_at,
values: record.values,
};
(record.feature.clone(), feature)
})
.collect::<BTreeMap<String, SpcDriftFeature>>();
Ok(SpcDriftFeatures {
features: feature_drift,
})
}
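
    /// Fetches SPC drift values between `start_dt` and `end_dt` from
    /// Postgres, binned so that each feature yields roughly
    /// `params.max_data_points` points.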
async fn get_records(
pool: &Pool<Postgres>,
params: &DriftRequest,
start_dt: DateTime<Utc>,
end_dt: DateTime<Utc>,
entity_id: &i32,
) -> Result<SpcDriftFeatures, SqlError> {
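        // Bin width in minutes, sized so the requested window yields roughly
        // `max_data_points` points per feature.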
let minutes = end_dt.signed_duration_since(start_dt).num_minutes() as f64;
let bin = minutes / params.max_data_points as f64;
let query = Queries::GetBinnedSpcFeatureValues.get_query();
let records: Vec<SpcFeatureResult> = sqlx::query_as(query)
.bind(bin)
.bind(start_dt)
.bind(end_dt)
.bind(entity_id)
.fetch_all(pool)
.await?;
let feature_drift = records
.into_iter()
.map(|record| {
let feature = SpcDriftFeature {
created_at: record.created_at,
values: record.values,
};
(record.feature.clone(), feature)
})
.collect::<BTreeMap<String, SpcDriftFeature>>();
Ok(SpcDriftFeatures {
features: feature_drift,
})
}
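
    /// Merges `results` into `map`: timestamps and values are appended for
    /// features already present, and new features are inserted as-is.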
fn merge_feature_results(
results: SpcDriftFeatures,
map: &mut SpcDriftFeatures,
) -> Result<(), SqlError> {
        for (feature_name, feature) in results.features {
            match map.features.entry(feature_name) {
                // Feature already present: append the new timestamps/values.
                Entry::Occupied(mut entry) => {
                    let existing = entry.get_mut();
                    existing.created_at.extend(feature.created_at);
                    existing.values.extend(feature.values);
                }
                // First occurrence: move the feature in without cloning.
                Entry::Vacant(entry) => {
                    entry.insert(feature);
                }
            }
        }
Ok(())
}
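
    /// Fetches binned SPC drift values from archived Parquet data in object
    /// storage for time ranges that fall outside the retention period.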
async fn get_archived_records(
params: &DriftRequest,
begin: DateTime<Utc>,
end: DateTime<Utc>,
minutes: i32,
storage_settings: &ObjectStorageSettings,
entity_id: &i32,
) -> Result<SpcDriftFeatures, SqlError> {
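        // Archived SPC records are stored under `<uid>/spc` in object storage.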
let path = format!("{}/spc", params.uid);
let bin = minutes as f64 / params.max_data_points as f64;
let archived_df = ParquetDataFrame::new(storage_settings, &RecordType::Spc)?
.get_binned_metrics(&path, &bin, &begin, &end, entity_id)
.await?;
Ok(dataframe_to_spc_drift_features(archived_df).await?)
}
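
    /// Entry point for binned SPC drift queries. Default intervals are served
    /// entirely from Postgres; a custom interval is split at the retention
    /// boundary into an active range (Postgres) and an archived range (object
    /// storage), and the per-feature results are merged.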
#[instrument(skip_all)]
async fn get_binned_spc_drift_records(
pool: &Pool<Postgres>,
params: &DriftRequest,
retention_period: &i32,
storage_settings: &ObjectStorageSettings,
entity_id: &i32,
) -> Result<SpcDriftFeatures, SqlError> {
debug!("Getting binned SPC drift records for {:?}", params);
if !params.has_custom_interval() {
debug!("No custom interval provided, using default");
let (begin_utc, end_utc) = params.time_interval.to_begin_end_times()?;
return Self::get_records(pool, params, begin_utc, end_utc, entity_id).await;
}
debug!("Custom interval provided, using custom interval");
let interval = params.clone().to_custom_interval().unwrap();
let timestamps = split_custom_interval(interval.begin, interval.end, retention_period)?;
let mut spc_feature_map = SpcDriftFeatures::default();
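
        // The portion of the interval still within the retention period is
        // read directly from Postgres.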
if let Some((active_begin, active_end)) = timestamps.active_range {
let current_results =
Self::get_records(pool, params, active_begin, active_end, entity_id).await?;
Self::merge_feature_results(current_results, &mut spc_feature_map)?;
}
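
        // The portion of the interval beyond the retention period is read
        // from archived Parquet data.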
if let Some((archive_begin, archive_end)) = timestamps.archived_range {
if let Some(archived_minutes) = timestamps.archived_minutes {
let archived_results = Self::get_archived_records(
params,
archive_begin,
archive_end,
archived_minutes,
storage_settings,
entity_id,
)
.await?;
Self::merge_feature_results(archived_results, &mut spc_feature_map)?;
}
}
Ok(spc_feature_map)
}
}