pub mod meta;
pub mod outcome;
pub mod stages;
pub mod subset_executor;
pub use outcome::StageOutcome;
use std::marker::PhantomData;
use chrono::{DateTime, Utc};
pub use meta::*;
use polars::prelude::DataFrame;
use serde::{Deserialize, Serialize};
pub use subset_executor::SubsetExecutor;
use crate::{CanonicalColumnName, MeasurementKind, chart_hints::ChartHints};
/// Marker type selecting the raw mode of a [`SubsetUniverse`].
#[derive(Debug, Clone, Copy)]
pub struct Raw;

/// Marker type selecting the processed mode of a [`SubsetUniverse`] (its default mode).
#[derive(Debug, Clone, Copy)]
pub struct Processed;

// Private supertrait: code outside this module tree cannot name `sealed::Sealed`, so it
// cannot add new `SubsetMode` implementations.
mod sealed {
    use super::{Processed, Raw};

    pub trait Sealed {}

    impl Sealed for Raw {}
    impl Sealed for Processed {}
}

/// Marker trait for the modes a [`SubsetUniverse`] can be in.
///
/// The trait is sealed: [`Raw`] and [`Processed`] are its only implementors.
pub trait SubsetMode: sealed::Sealed {}

impl SubsetMode for Raw {}
impl SubsetMode for Processed {}
#[derive(Debug, Clone)]
pub struct SubsetUniverse<M: SubsetMode = Processed> {
pub data: DataFrame,
pub measurements: Vec<MeasurementMeta>,
pub qualities: Vec<QualityMeta>,
pub info: SubsetInfo,
pub(crate) _mode: PhantomData<M>,
}
impl<M: SubsetMode> SubsetUniverse<M> {
pub fn new_in_mode(
data: DataFrame,
measurements: Vec<MeasurementMeta>,
qualities: Vec<QualityMeta>,
info: SubsetInfo,
) -> Self {
Self {
data,
measurements,
qualities,
info,
_mode: PhantomData,
}
}
pub fn dataframe(&self) -> &DataFrame {
&self.data
}
pub fn into_dataframe(self) -> DataFrame {
self.data
}
pub fn with_dataframe(mut self, data: DataFrame) -> Self {
self.info.row_count = data.height();
self.data = data;
self
}
pub fn get_measurement(&self, column: &str) -> Option<&MeasurementMeta> {
self.measurements.iter().find(|m| m.column == column.into())
}
pub fn get_quality(&self, column: &str) -> Option<&QualityMeta> {
self.qualities.iter().find(|q| q.column == column.into())
}
pub fn measurement_columns(&self) -> Vec<&str> {
self.measurements
.iter()
.map(|m| m.column.as_str())
.collect()
}
pub fn quality_columns(&self) -> Vec<&str> {
self.qualities.iter().map(|q| q.column.as_str()).collect()
}
pub fn has_measurements(&self) -> bool {
!self.measurements.is_empty()
}
pub fn has_qualities(&self) -> bool {
!self.qualities.is_empty()
}
pub fn time_column(&self) -> Option<&str> {
self.info.time_column.as_deref()
}
pub fn subject_column(&self) -> &str {
&self.info.subject_column
}
}
impl SubsetUniverse<Processed> {
pub fn new_processed(
data: DataFrame,
measurements: Vec<MeasurementMeta>,
qualities: Vec<QualityMeta>,
info: SubsetInfo,
) -> Self {
Self::new_in_mode(data, measurements, qualities, info)
}
pub fn new(
data: DataFrame,
measurements: Vec<MeasurementMeta>,
qualities: Vec<QualityMeta>,
info: SubsetInfo,
) -> Self {
Self::new_processed(data, measurements, qualities, info)
}
}
impl SubsetUniverse<Raw> {
pub fn new_raw(
data: DataFrame,
measurements: Vec<MeasurementMeta>,
qualities: Vec<QualityMeta>,
info: SubsetInfo,
) -> Self {
Self::new_in_mode(data, measurements, qualities, info)
}
}
/// Metadata for one measurement column of a [`SubsetUniverse`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MeasurementMeta {
    /// Canonical name of the column.
    pub column: CanonicalColumnName,
    /// Kind of measurement; also decides the default chart hints in [`MeasurementMeta::new`].
    pub kind: MeasurementKind,
    /// How the column should be charted.
    pub chart_hints: ChartHints,
    /// Set via [`MeasurementMeta::with_null_value_configured`]; `false` when missing from
    /// serialized input (`#[serde(default)]`).
    #[serde(default)]
    pub has_null_value: bool,
}

impl MeasurementMeta {
    /// Creates metadata for `column` with chart hints picked from `kind`.
    ///
    /// [`MeasurementKind::Categorical`] gets [`ChartHints::categorical`]; every other kind gets
    /// [`ChartHints::measure`]. `has_null_value` starts out `false`.
    pub fn new<T: Into<CanonicalColumnName>>(column: T, kind: MeasurementKind) -> Self {
        let chart_hints = if matches!(kind, MeasurementKind::Categorical) {
            ChartHints::categorical()
        } else {
            ChartHints::measure()
        };
        Self {
            column: column.into(),
            kind,
            chart_hints,
            has_null_value: false,
        }
    }

    /// Replaces the chart hints.
    pub fn with_chart_hints(self, hints: ChartHints) -> Self {
        Self {
            chart_hints: hints,
            ..self
        }
    }

    /// Sets the `has_null_value` flag.
    pub fn with_null_value_configured(self, has_null_value: bool) -> Self {
        Self {
            has_null_value,
            ..self
        }
    }
}
/// Metadata for one quality column of a [`SubsetUniverse`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QualityMeta {
    /// Canonical name of the column.
    pub column: CanonicalColumnName,
    /// How the column should be charted.
    pub chart_hints: ChartHints,
}

impl QualityMeta {
    /// Creates metadata for `column` using [`ChartHints::quality`] as its chart hints.
    pub fn new(column: CanonicalColumnName) -> Self {
        let chart_hints = ChartHints::quality();
        Self {
            column,
            chart_hints,
        }
    }

    /// Replaces the chart hints.
    pub fn with_chart_hints(self, hints: ChartHints) -> Self {
        Self {
            chart_hints: hints,
            ..self
        }
    }
}
/// Summary information attached to a [`SubsetUniverse`].
///
/// The `Vec` fields are `#[serde(default)]`, so they may be absent from serialized input.
/// On output, the `Vec` fields are skipped when empty and the `Option` fields when `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SubsetInfo {
    /// Schema name recorded for this subset.
    pub schema_name: String,
    /// Name of the column identifying subjects.
    pub subject_column: String,
    /// Name of the time column, if the subset has one.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub time_column: Option<String>,
    /// Row count; `SubsetUniverse::with_dataframe` keeps this equal to the frame height.
    pub row_count: usize,
    /// Subject count as supplied through the builder.
    pub subject_count: usize,
    /// `(start, end)` of the covered time span, if known.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub time_range: Option<(DateTime<Utc>, DateTime<Utc>)>,
    /// Source identifiers attached via `with_sources` / `add_source`.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub sources: Vec<String>,
    /// Stage diagnostics accumulated for this subset.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub stage_trace: Vec<stages::StageDiag>,
    /// Interval statistics attached to this subset.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub interval_stats: Vec<crate::interval::IntervalStats>,
    /// Group statistics attached to this subset.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub group_stats: Vec<crate::group::GroupStats>,
}

impl SubsetInfo {
    /// Creates info for `schema_name`, keyed by `subject_column`.
    ///
    /// Counts start at zero, `time_column`/`time_range` are `None`, and every list is empty.
    pub fn new(schema_name: impl Into<String>, subject_column: impl Into<String>) -> Self {
        let schema_name = schema_name.into();
        let subject_column = subject_column.into();
        Self {
            schema_name,
            subject_column,
            time_column: None,
            row_count: 0,
            subject_count: 0,
            time_range: None,
            sources: vec![],
            stage_trace: vec![],
            interval_stats: vec![],
            group_stats: vec![],
        }
    }

    /// Sets the time column name.
    pub fn with_time_column(self, time_column: impl Into<String>) -> Self {
        Self {
            time_column: Some(time_column.into()),
            ..self
        }
    }

    /// Sets the row count.
    pub fn with_row_count(self, count: usize) -> Self {
        Self {
            row_count: count,
            ..self
        }
    }

    /// Sets the subject count.
    pub fn with_subject_count(self, count: usize) -> Self {
        Self {
            subject_count: count,
            ..self
        }
    }

    /// Sets the covered time span to `(start, end)`; the pair is stored as given.
    pub fn with_time_range(self, start: DateTime<Utc>, end: DateTime<Utc>) -> Self {
        Self {
            time_range: Some((start, end)),
            ..self
        }
    }

    /// Replaces the whole source list.
    pub fn with_sources(self, sources: Vec<String>) -> Self {
        Self { sources, ..self }
    }

    /// Appends one source to the existing list (builder-style; consumes and returns `self`).
    pub fn add_source(mut self, source: impl Into<String>) -> Self {
        let source = source.into();
        self.sources.push(source);
        self
    }
}
#[cfg(test)]
mod tests {
    use polars::prelude::*;

    use super::*;
    use crate::chart_hints::AxisId;

    #[test]
    fn test_measurement_meta() {
        let meta = MeasurementMeta::new("sump_ft", MeasurementKind::Measure);
        assert_eq!(meta.column, "sump_ft".into());
        assert_eq!(meta.kind, MeasurementKind::Measure);
        assert!(!meta.chart_hints.stepped);
    }

    #[test]
    fn test_measurement_meta_categorical() {
        let meta = MeasurementMeta::new("engine_1", MeasurementKind::Categorical);
        assert!(meta.chart_hints.stepped);
        assert_eq!(meta.chart_hints.axis, AxisId::Y2);
    }

    #[test]
    fn test_measurement_meta_custom_hints() {
        let meta = MeasurementMeta::new("fuel_pct", MeasurementKind::Measure)
            .with_chart_hints(ChartHints::new().axis(AxisId::Y1).label("Fuel Level"));
        assert_eq!(meta.chart_hints.axis, AxisId::Y1);
        assert_eq!(meta.chart_hints.label, Some("Fuel Level".into()));
    }

    #[test]
    fn test_measurement_meta_null_value_flag() {
        let meta = MeasurementMeta::new("sump_ft", MeasurementKind::Measure);
        assert!(!meta.has_null_value);
        let meta = meta.with_null_value_configured(true);
        assert!(meta.has_null_value);
    }

    #[test]
    fn test_quality_meta() {
        let meta = QualityMeta::new("region".into());
        assert_eq!(meta.column, "region".into());
        assert_eq!(
            meta.chart_hints.chart_type,
            crate::chart_hints::ChartType::Bar
        );
        assert_eq!(meta.chart_hints.index, crate::chart_hints::Index::Subject);
    }

    #[test]
    fn test_subset_info() {
        let info = SubsetInfo::new("pump_telemetry", "station_id")
            .with_time_column("timestamp")
            .with_row_count(100)
            .with_subject_count(5)
            .with_sources(vec!["scada".into()]);
        assert_eq!(info.schema_name, "pump_telemetry");
        assert_eq!(info.subject_column, "station_id");
        assert_eq!(info.time_column, Some("timestamp".into()));
        assert_eq!(info.row_count, 100);
        assert_eq!(info.subject_count, 5);
        assert_eq!(info.sources, vec!["scada"]);
    }

    #[test]
    fn test_subset_info_sources_and_time_range() {
        let now = chrono::Utc::now();
        let info = SubsetInfo::new("pump_telemetry", "station_id")
            .add_source("scada")
            .add_source("historian")
            .with_time_range(now, now);
        // `add_source` appends in call order.
        assert_eq!(info.sources, vec!["scada", "historian"]);
        assert_eq!(info.time_range, Some((now, now)));
        // Fields not touched by the builder keep their `new` defaults.
        assert_eq!(info.time_column, None);
        assert_eq!(info.row_count, 0);
        assert_eq!(info.subject_count, 0);
    }

    #[test]
    fn test_subset_universe() {
        let df = df! {
            "station_id" => [1, 1, 2, 2],
            "timestamp" => [100i64, 200, 100, 200],
            "sump_ft" => [1.0, 2.0, 3.0, 4.0],
            "engine_1" => [0, 1, 1, 0]
        }
        .unwrap();
        let measurements = vec![
            MeasurementMeta::new("sump_ft", MeasurementKind::Measure),
            MeasurementMeta::new("engine_1", MeasurementKind::Categorical),
        ];
        let info = SubsetInfo::new("test", "station_id")
            .with_time_column("timestamp")
            .with_row_count(4)
            .with_subject_count(2);
        let universe = SubsetUniverse::new(df, measurements, vec![], info);
        assert_eq!(universe.measurement_columns(), vec!["sump_ft", "engine_1"]);
        assert!(universe.has_measurements());
        assert!(!universe.has_qualities());
        assert_eq!(universe.time_column(), Some("timestamp"));
        assert_eq!(universe.subject_column(), "station_id");
        let sump = universe.get_measurement("sump_ft").unwrap();
        assert_eq!(sump.kind, MeasurementKind::Measure);
        let engine = universe.get_measurement("engine_1").unwrap();
        assert!(engine.chart_hints.stepped);
    }

    #[test]
    fn test_lookup_missing_columns() {
        let df = df! { "station_id" => [1, 2] }.unwrap();
        let info = SubsetInfo::new("test", "station_id").with_row_count(2);
        let universe = SubsetUniverse::new(df, vec![], vec![], info);
        assert!(universe.get_measurement("sump_ft").is_none());
        assert!(universe.get_quality("region").is_none());
        assert!(universe.measurement_columns().is_empty());
        assert!(universe.quality_columns().is_empty());
        assert_eq!(universe.time_column(), None);
    }

    #[test]
    fn test_raw_universe_qualities() {
        let df = df! { "station_id" => [1, 2] }.unwrap();
        let info = SubsetInfo::new("test", "station_id").with_row_count(2);
        let raw = SubsetUniverse::new_raw(
            df,
            vec![],
            vec![QualityMeta::new("region".into())],
            info,
        );
        assert!(raw.has_qualities());
        assert!(!raw.has_measurements());
        assert_eq!(raw.quality_columns(), vec!["region"]);
        let region = raw.get_quality("region").unwrap();
        assert_eq!(region.column, "region".into());
    }

    #[test]
    fn test_with_dataframe_updates_row_count() {
        let df = df! { "station_id" => [1, 2] }.unwrap();
        let info = SubsetInfo::new("test", "station_id")
            .with_row_count(2)
            .with_subject_count(2);
        let universe = SubsetUniverse::new(df, vec![], vec![], info);
        let replacement = df! { "station_id" => [1, 1, 2] }.unwrap();
        let universe = universe.with_dataframe(replacement);
        assert_eq!(universe.info.row_count, 3);
        assert_eq!(universe.dataframe().height(), 3);
        // Only `row_count` is refreshed; `subject_count` keeps the value it was built with.
        assert_eq!(universe.info.subject_count, 2);
        assert_eq!(universe.into_dataframe().height(), 3);
    }
}