use polars::prelude::*;
use serde::{Deserialize, Serialize};
use crate::{CanonicalColumnName, MeasurementKind, chart_hints::ChartHints};
/// A named, derived column together with the recipe used to compute it.
///
/// Instances are normally created through [`Derivation::over_time`],
/// [`Derivation::over_subjects`] or [`Derivation::pointwise`] and then refined with the
/// `with_*` / `named` builder methods.
///
/// Note that the manual `PartialEq` implementation for this type does not compare
/// `chart_hints`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Derivation {
/// Name given to the derivation (the tests use it as the name of the derived column).
pub name: CanonicalColumnName,
/// The expression that produces the derived values.
pub computation: Computation,
/// Measurement kind of the result; the constructors choose a default that
/// `with_kind` can override.
pub kind: MeasurementKind,
/// Explicit chart hints, if any. When `None`, `effective_chart_hints` falls back to a
/// default chosen from `kind`.
pub chart_hints: Option<ChartHints>,
}
/// Equality based on `name`, `computation` and `kind`.
///
/// `chart_hints` is left out of the comparison, so two derivations that differ only in
/// their chart hints compare equal.
impl PartialEq for Derivation {
    fn eq(&self, other: &Self) -> bool {
        // Destructuring makes the skipped field explicit and forces a decision here
        // whenever a field is added to the struct.
        let Self {
            name,
            computation,
            kind,
            chart_hints: _,
        } = self;

        *name == other.name && *computation == other.computation && *kind == other.kind
    }
}
// Marker impl with no methods; it builds on the manual `PartialEq` implementation for
// `Derivation`, which compares `name`, `computation` and `kind` only.
impl Eq for Derivation {}
impl Derivation {
    /// Creates a derivation whose computation is a [`TimeExpr`].
    ///
    /// The kind is set to [`MeasurementKind::Measure`] and no chart hints are attached.
    pub fn over_time(name: impl Into<CanonicalColumnName>, expr: TimeExpr) -> Self {
        let name = name.into();
        let computation = Computation::OverTime(expr);
        Self {
            name,
            computation,
            kind: MeasurementKind::Measure,
            chart_hints: None,
        }
    }

    /// Creates a derivation whose computation is an [`OverSubjectExpr`].
    ///
    /// `Rank`, `Quantile` and `Bucket` expressions are classified as
    /// [`MeasurementKind::Categorical`]; every other variant is a
    /// [`MeasurementKind::Measure`]. No chart hints are attached.
    pub fn over_subjects(name: impl Into<CanonicalColumnName>, expr: OverSubjectExpr) -> Self {
        // Every variant is listed so that adding one forces a classification decision.
        let kind = match &expr {
            OverSubjectExpr::Rank { .. }
            | OverSubjectExpr::Quantile { .. }
            | OverSubjectExpr::Bucket { .. } => MeasurementKind::Categorical,
            OverSubjectExpr::PercentOf { .. }
            | OverSubjectExpr::ZScore { .. }
            | OverSubjectExpr::DeviationFromMean { .. } => MeasurementKind::Measure,
        };
        let name = name.into();
        Self {
            name,
            computation: Computation::OverSubjects(expr),
            kind,
            chart_hints: None,
        }
    }

    /// Creates a derivation whose computation is a [`PointwiseExpr`].
    ///
    /// The kind is taken from [`PointwiseExpr::result_kind`]; no chart hints are attached.
    pub fn pointwise(name: impl Into<CanonicalColumnName>, expr: PointwiseExpr) -> Self {
        let name = name.into();
        // Must be read before `expr` is moved into the computation.
        let kind = expr.result_kind();
        Self {
            name,
            computation: Computation::Pointwise(expr),
            kind,
            chart_hints: None,
        }
    }

    /// Returns the derivation with its kind replaced by `kind`.
    pub fn with_kind(self, kind: MeasurementKind) -> Self {
        Self { kind, ..self }
    }

    /// Returns the derivation with explicit chart hints attached.
    pub fn with_chart_hints(self, hints: ChartHints) -> Self {
        Self {
            chart_hints: Some(hints),
            ..self
        }
    }

    /// Returns the derivation renamed to `name`.
    pub fn named(self, name: impl Into<CanonicalColumnName>) -> Self {
        Self {
            name: name.into(),
            ..self
        }
    }

    /// Returns the chart hints to use for this derivation.
    ///
    /// Explicit hints are cloned and returned when present. Otherwise the default is
    /// [`ChartHints::categorical`] for categorical derivations and
    /// [`ChartHints::measure`] for every other kind.
    pub fn effective_chart_hints(&self) -> ChartHints {
        match &self.chart_hints {
            Some(explicit) => explicit.clone(),
            None => match self.kind {
                MeasurementKind::Categorical => ChartHints::categorical(),
                _ => ChartHints::measure(),
            },
        }
    }

    /// Returns the columns the underlying expression reads from.
    pub fn input_columns(&self) -> Vec<&CanonicalColumnName> {
        match &self.computation {
            Computation::OverTime(time) => time.source_columns(),
            Computation::OverSubjects(subjects) => subjects.source_columns(),
            Computation::Pointwise(pointwise) => pointwise.source_columns(),
        }
    }
}
/// The family of expression a `Derivation` evaluates.
///
/// There is no `tag` attribute, so serde's default enum representation applies; variant
/// names are rendered in snake case (`over_time`, `over_subjects`, `pointwise`).
#[derive(PartialEq, Eq, Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum Computation {
/// An expression over a single input column, described by `TimeExpr`.
OverTime(TimeExpr),
/// An expression over a single input column, described by `OverSubjectExpr`.
OverSubjects(OverSubjectExpr),
/// An expression combining one or more input columns, described by `PointwiseExpr`.
Pointwise(PointwiseExpr),
}
/// Time-oriented expression over a single `input` column.
///
/// Serialized with a `"type"` field holding the snake-case variant name (for example
/// `rolling_mean` or `cum_sum`).
///
/// NOTE(review): the code that evaluates these variants is not part of this file; the
/// per-variant notes below describe only the stored parameters — confirm the exact
/// semantics against the evaluator.
#[derive(PartialEq, Eq, Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum TimeExpr {
/// Derivative of `input`, parameterised by a `TimeUnit`.
/// `TimeExpr::derivative` starts with `TimeUnit::Second`; `per_hour` switches to
/// `TimeUnit::Hour`.
Derivative {
input: CanonicalColumnName,
time_unit: TimeUnit,
},
/// Rolling mean of `input` with a window size of `window`.
/// (presumably counted in rows — TODO confirm)
RollingMean {
input: CanonicalColumnName,
window: usize,
},
/// Rolling sum of `input` with a window size of `window`.
/// (presumably counted in rows — TODO confirm)
RollingSum {
input: CanonicalColumnName,
window: usize,
},
/// `input` lagged by `periods`.
Lag {
input: CanonicalColumnName,
periods: usize,
},
/// `input` led by `periods`.
Lead {
input: CanonicalColumnName,
periods: usize,
},
/// Cumulative sum of `input`.
CumSum { input: CanonicalColumnName },
/// Difference of `input` over `periods`.
Diff {
input: CanonicalColumnName,
periods: usize,
},
}
impl TimeExpr {
    /// Builds a [`TimeExpr::Derivative`] of `input` with the time unit set to
    /// [`TimeUnit::Second`]. Chain [`TimeExpr::per_hour`] to switch to hours.
    pub fn derivative(input: impl Into<CanonicalColumnName>) -> Self {
        let input = input.into();
        Self::Derivative {
            input,
            time_unit: TimeUnit::Second,
        }
    }

    /// Builds a [`TimeExpr::RollingMean`] of `input` with the given `window`.
    pub fn rolling_mean(input: impl Into<CanonicalColumnName>, window: usize) -> Self {
        let input = input.into();
        Self::RollingMean { input, window }
    }

    /// Builds a [`TimeExpr::RollingSum`] of `input` with the given `window`.
    pub fn rolling_sum(input: impl Into<CanonicalColumnName>, window: usize) -> Self {
        let input = input.into();
        Self::RollingSum { input, window }
    }

    /// Builds a [`TimeExpr::Lag`] of `input` by `periods`.
    pub fn lag(input: impl Into<CanonicalColumnName>, periods: usize) -> Self {
        let input = input.into();
        Self::Lag { input, periods }
    }

    /// Builds a [`TimeExpr::Lead`] of `input` by `periods`.
    pub fn lead(input: impl Into<CanonicalColumnName>, periods: usize) -> Self {
        let input = input.into();
        Self::Lead { input, periods }
    }

    /// Builds a [`TimeExpr::CumSum`] of `input`.
    pub fn cum_sum(input: impl Into<CanonicalColumnName>) -> Self {
        let input = input.into();
        Self::CumSum { input }
    }

    /// Builds a [`TimeExpr::Diff`] of `input` over `periods`.
    pub fn diff(input: impl Into<CanonicalColumnName>, periods: usize) -> Self {
        let input = input.into();
        Self::Diff { input, periods }
    }

    /// Sets the time unit of a [`TimeExpr::Derivative`] to [`TimeUnit::Hour`].
    ///
    /// Every other variant carries no time unit and is returned unchanged.
    pub fn per_hour(self) -> Self {
        match self {
            Self::Derivative { input, .. } => Self::Derivative {
                input,
                time_unit: TimeUnit::Hour,
            },
            unchanged => unchanged,
        }
    }

    /// Returns the column this expression reads from; always exactly one entry.
    pub fn source_columns(&self) -> Vec<&CanonicalColumnName> {
        let input = match self {
            Self::Derivative { input, .. }
            | Self::RollingMean { input, .. }
            | Self::RollingSum { input, .. }
            | Self::Lag { input, .. }
            | Self::Lead { input, .. }
            | Self::CumSum { input }
            | Self::Diff { input, .. } => input,
        };
        vec![input]
    }
}
/// Unit of time used to parameterise `TimeExpr::Derivative`.
///
/// Variants serialize under their snake-case names. Note that `Microseconds` is plural
/// while the other variants are singular, so the serialized names are `microseconds`,
/// `second`, `minute`, `hour` and `day`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum TimeUnit {
Microseconds,
Second,
Minute,
Hour,
Day,
}
impl TimeUnit {
pub fn from_microseconds(&self) -> f64 {
match self {
TimeUnit::Microseconds => 1.0,
TimeUnit::Second => 1_000_000.0,
TimeUnit::Minute => 60_000_000.0,
TimeUnit::Hour => 3_600_000_000.0,
TimeUnit::Day => 86_400_000_000.0,
}
}
}
/// Subject-oriented expression over a single `input` column.
///
/// Serialized with a `"type"` field holding the snake-case variant name (for example
/// `percent_of` or `z_score`).
///
/// NOTE(review): the code that evaluates these variants is not part of this file; the
/// per-variant notes below describe only the stored parameters — confirm the exact
/// semantics against the evaluator.
#[derive(PartialEq, Eq, Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum OverSubjectExpr {
/// `input` as a percentage (presumably of the total across subjects — TODO confirm).
PercentOf { input: CanonicalColumnName },
/// Rank of `input`; `descending` selects the ordering direction.
/// `OverSubjectExpr::rank` sets `descending` to `true`.
Rank {
input: CanonicalColumnName,
descending: bool,
},
/// Z-score of `input`.
ZScore { input: CanonicalColumnName },
/// Deviation of `input` from the mean.
DeviationFromMean { input: CanonicalColumnName },
/// Quantile assignment of `input`; `quantiles` is the number of groups
/// (`decile` uses 10, `quartile` uses 4).
Quantile {
input: CanonicalColumnName,
quantiles: u32,
},
/// Bucketing of `input` using the integer `breaks`.
/// (presumably bucket boundaries — TODO confirm)
Bucket {
input: CanonicalColumnName,
breaks: Vec<i64>,
},
}
impl OverSubjectExpr {
    /// Builds an [`OverSubjectExpr::PercentOf`] over `input`.
    pub fn percent_of(input: impl Into<CanonicalColumnName>) -> Self {
        let input = input.into();
        Self::PercentOf { input }
    }

    /// Builds an [`OverSubjectExpr::Rank`] over `input` with `descending` set to `true`.
    pub fn rank(input: impl Into<CanonicalColumnName>) -> Self {
        let input = input.into();
        Self::Rank {
            input,
            descending: true,
        }
    }

    /// Builds an [`OverSubjectExpr::ZScore`] over `input`.
    pub fn z_score(input: impl Into<CanonicalColumnName>) -> Self {
        let input = input.into();
        Self::ZScore { input }
    }

    /// Builds an [`OverSubjectExpr::DeviationFromMean`] over `input`.
    pub fn deviation_from_mean(input: impl Into<CanonicalColumnName>) -> Self {
        let input = input.into();
        Self::DeviationFromMean { input }
    }

    /// Builds an [`OverSubjectExpr::Quantile`] over `input` with 10 quantiles.
    pub fn decile(input: impl Into<CanonicalColumnName>) -> Self {
        Self::quantile(input, 10)
    }

    /// Builds an [`OverSubjectExpr::Quantile`] over `input` with 4 quantiles.
    pub fn quartile(input: impl Into<CanonicalColumnName>) -> Self {
        Self::quantile(input, 4)
    }

    /// Builds an [`OverSubjectExpr::Quantile`] over `input` with the given number of
    /// `quantiles`. The value is stored as-is; no validation is performed here.
    pub fn quantile(input: impl Into<CanonicalColumnName>, quantiles: u32) -> Self {
        let input = input.into();
        Self::Quantile { input, quantiles }
    }

    /// Builds an [`OverSubjectExpr::Bucket`] over `input` with the given `breaks`.
    /// The breaks are stored as-is; no sorting or validation is performed here.
    pub fn bucket(input: impl Into<CanonicalColumnName>, breaks: Vec<i64>) -> Self {
        let input = input.into();
        Self::Bucket { input, breaks }
    }

    /// Returns the column this expression reads from; always exactly one entry.
    pub fn source_columns(&self) -> Vec<&CanonicalColumnName> {
        let input = match self {
            Self::PercentOf { input }
            | Self::Rank { input, .. }
            | Self::ZScore { input }
            | Self::DeviationFromMean { input }
            | Self::Quantile { input, .. }
            | Self::Bucket { input, .. } => input,
        };
        vec![input]
    }
}
/// Expression that combines one or more input columns horizontally.
///
/// Serialized with a `"type"` field holding the snake-case variant name (for example
/// `any_on` or `count_non_zero`). The per-variant notes below describe what
/// `PointwiseExpr::to_polars_expr` builds for each variant.
#[derive(PartialEq, Eq, Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum PointwiseExpr {
/// Horizontal maximum of `inputs` compared `> 0`, cast to `Int32`.
AnyOn {
inputs: Vec<CanonicalColumnName>,
},
/// Horizontal minimum of `inputs` compared `> 0`, cast to `Int32`.
AllOn {
inputs: Vec<CanonicalColumnName>,
},
/// Horizontal sum of the per-column `!= 0` flags, each cast to `UInt32`.
CountNonZero {
inputs: Vec<CanonicalColumnName>,
},
/// Horizontal sum of `inputs`.
Sum {
inputs: Vec<CanonicalColumnName>,
},
/// Horizontal mean of `inputs`.
Mean {
inputs: Vec<CanonicalColumnName>,
},
/// Horizontal maximum of `inputs`.
Max {
inputs: Vec<CanonicalColumnName>,
},
/// Horizontal minimum of `inputs`.
Min {
inputs: Vec<CanonicalColumnName>,
},
/// `a - b`.
Difference {
a: CanonicalColumnName,
b: CanonicalColumnName,
},
/// `numerator / denominator`; no zero-denominator handling is added by this crate.
Ratio {
numerator: CanonicalColumnName,
denominator: CanonicalColumnName,
},
}
impl PointwiseExpr {
pub fn any_on(inputs: impl IntoCanonicalVec) -> Self {
Self::AnyOn {
inputs: inputs.into_canonical_vec(),
}
}
pub fn all_on(inputs: impl IntoCanonicalVec) -> Self {
Self::AllOn {
inputs: inputs.into_canonical_vec(),
}
}
pub fn count_non_zero(inputs: impl IntoCanonicalVec) -> Self {
Self::CountNonZero {
inputs: inputs.into_canonical_vec(),
}
}
pub fn sum(inputs: impl IntoCanonicalVec) -> Self {
Self::Sum {
inputs: inputs.into_canonical_vec(),
}
}
pub fn mean(inputs: impl IntoCanonicalVec) -> Self {
Self::Mean {
inputs: inputs.into_canonical_vec(),
}
}
pub fn max(inputs: impl IntoCanonicalVec) -> Self {
Self::Max {
inputs: inputs.into_canonical_vec(),
}
}
pub fn min(inputs: impl IntoCanonicalVec) -> Self {
Self::Min {
inputs: inputs.into_canonical_vec(),
}
}
pub fn difference(
a: impl Into<CanonicalColumnName>,
b: impl Into<CanonicalColumnName>,
) -> Self {
Self::Difference {
a: a.into(),
b: b.into(),
}
}
pub fn ratio(
numerator: impl Into<CanonicalColumnName>,
denominator: impl Into<CanonicalColumnName>,
) -> Self {
Self::Ratio {
numerator: numerator.into(),
denominator: denominator.into(),
}
}
pub fn result_kind(&self) -> MeasurementKind {
match self {
Self::AnyOn { .. } | Self::AllOn { .. } => MeasurementKind::Categorical,
Self::CountNonZero { .. } => MeasurementKind::Count,
_ => MeasurementKind::Measure,
}
}
pub fn source_columns(&self) -> Vec<&CanonicalColumnName> {
match self {
Self::AnyOn { inputs }
| Self::AllOn { inputs }
| Self::CountNonZero { inputs }
| Self::Sum { inputs }
| Self::Mean { inputs }
| Self::Max { inputs }
| Self::Min { inputs } => inputs.iter().collect(),
Self::Difference { a, b } => vec![a, b],
Self::Ratio {
numerator,
denominator,
} => vec![numerator, denominator],
}
}
pub fn to_polars_expr(&self, output_name: &str) -> PolarsResult<Expr> {
use polars::prelude::{max_horizontal, mean_horizontal, min_horizontal, sum_horizontal};
let expr = match self {
Self::AnyOn { inputs } => {
let cols: Vec<Expr> = inputs.iter().map(|c| col(c.as_str())).collect();
max_horizontal(&cols)?.gt(lit(0)).cast(DataType::Int32)
}
Self::AllOn { inputs } => {
let cols: Vec<Expr> = inputs.iter().map(|c| col(c.as_str())).collect();
min_horizontal(&cols)?.gt(lit(0)).cast(DataType::Int32)
}
Self::CountNonZero { inputs } => {
let non_zero: Vec<Expr> = inputs
.iter()
.map(|c| col(c.as_str()).neq(lit(0)).cast(DataType::UInt32))
.collect();
sum_horizontal(&non_zero, true)?
}
Self::Sum { inputs } => {
let cols: Vec<Expr> = inputs.iter().map(|c| col(c.as_str())).collect();
sum_horizontal(&cols, true)?
}
Self::Mean { inputs } => {
let cols: Vec<Expr> = inputs.iter().map(|c| col(c.as_str())).collect();
mean_horizontal(&cols, true)?
}
Self::Max { inputs } => {
let cols: Vec<Expr> = inputs.iter().map(|c| col(c.as_str())).collect();
max_horizontal(&cols)?
}
Self::Min { inputs } => {
let cols: Vec<Expr> = inputs.iter().map(|c| col(c.as_str())).collect();
min_horizontal(&cols)?
}
Self::Difference { a, b } => col(a.as_str()) - col(b.as_str()),
Self::Ratio {
numerator,
denominator,
} => col(numerator.as_str()) / col(denominator.as_str()),
};
Ok(expr.alias(output_name))
}
}
/// Conversion of a collection of column names into a `Vec<CanonicalColumnName>`.
///
/// Used as the argument bound of the multi-column `PointwiseExpr` constructors so callers
/// can pass vectors or fixed-size arrays of `&str`, `String` or `CanonicalColumnName`
/// (see the implementations in this file for the exact set of supported types).
pub trait IntoCanonicalVec {
/// Consumes `self` and returns the column names as canonical names.
fn into_canonical_vec(self) -> Vec<CanonicalColumnName>;
}
/// Identity conversion: the vector already holds canonical names and is returned as-is.
impl IntoCanonicalVec for Vec<CanonicalColumnName> {
fn into_canonical_vec(self) -> Vec<CanonicalColumnName> {
self
}
}
/// Converts each string slice with `CanonicalColumnName::from`, preserving order.
impl IntoCanonicalVec for Vec<&str> {
    fn into_canonical_vec(self) -> Vec<CanonicalColumnName> {
        let mut names = Vec::with_capacity(self.len());
        names.extend(self.into_iter().map(CanonicalColumnName::from));
        names
    }
}
/// Converts each owned string with `CanonicalColumnName::from`, preserving order.
impl IntoCanonicalVec for Vec<String> {
    fn into_canonical_vec(self) -> Vec<CanonicalColumnName> {
        let mut names = Vec::with_capacity(self.len());
        names.extend(self.into_iter().map(CanonicalColumnName::from));
        names
    }
}
/// Converts each element of a fixed-size array of string slices with
/// `CanonicalColumnName::from`, preserving order.
impl<const N: usize> IntoCanonicalVec for [&str; N] {
    fn into_canonical_vec(self) -> Vec<CanonicalColumnName> {
        // Map element-wise first, then move the resulting array into a vector.
        Vec::from(self.map(CanonicalColumnName::from))
    }
}
/// Converts each element of a fixed-size array of owned strings with
/// `CanonicalColumnName::from`, preserving order.
impl<const N: usize> IntoCanonicalVec for [String; N] {
    fn into_canonical_vec(self) -> Vec<CanonicalColumnName> {
        // Map element-wise first, then move the resulting array into a vector.
        Vec::from(self.map(CanonicalColumnName::from))
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A pointwise `any_on` derivation is categorical and reports every input column.
    #[test]
    fn test_combine_units_with_pointwise() {
        let engines = ["engine_1", "engine_2", "engine_3"];
        let derivation =
            Derivation::pointwise("any_engine_running", PointwiseExpr::any_on(engines));

        let expected_name = CanonicalColumnName::from("any_engine_running");
        assert_eq!(derivation.name, expected_name);
        assert_eq!(derivation.kind, MeasurementKind::Categorical);

        let inputs = derivation.input_columns();
        for engine in engines {
            let expected = CanonicalColumnName::from(engine);
            assert!(inputs.contains(&&expected));
        }
    }

    /// A decile derivation is categorical and reads exactly its one source column.
    #[test]
    fn test_natural_transformation_deciles() {
        let expr = OverSubjectExpr::decile("sales");
        let derivation = Derivation::over_subjects("sales_decile", expr);

        assert_eq!(derivation.kind, MeasurementKind::Categorical);

        let expected_input = CanonicalColumnName::from("sales");
        assert_eq!(derivation.input_columns(), vec![&expected_input]);
    }

    /// The builder methods can be chained; the last `named` call wins and hints are kept.
    #[test]
    fn test_fluent_building() {
        let hints = ChartHints::measure().label("Fuel Rate (L/hr)");
        let derivation = Derivation::over_time("fuel_rate", TimeExpr::derivative("fuel"))
            .with_kind(MeasurementKind::Measure)
            .named("fuel_consumption_rate")
            .with_chart_hints(hints);

        let expected_name = CanonicalColumnName::from("fuel_consumption_rate");
        assert_eq!(derivation.name, expected_name);
        assert!(derivation.chart_hints.is_some());
    }
}