etl-unit 0.1.0

Semantic data model for ETL units — qualities and measurements over subjects and time. Built on Polars.
//! Return shape for subset-pipeline stages.
//!
//! Every optional stage (interval aggregation, group_by, …) is written
//! as a "total function" — it accepts an `Option<&Params>` and returns
//! a [`StageOutcome`]. When params are `None`, the stage no-ops and
//! returns [`StageOutcome::passthrough`].
//!
//! The caller merges each stage's outcome into its accumulators with
//! `.extend()` calls, so the main orchestration flow contains no
//! `if let Some` branches per stage. The pattern is:
//!
//! ```ignore
//! let outcome = interval::apply_interval(data, request.report_interval.as_ref(), …)?;
//! data = outcome.data;
//! interval_stats.extend(outcome.stats);
//! stage_trace.extend(outcome.diags);
//!
//! let outcome = group::apply_group_by(data, request.group_by.as_ref(), …)?;
//! data = outcome.data;
//! group_stats.extend(outcome.stats);
//! stage_trace.extend(outcome.diags);
//! ```
//!
//! Extending with an empty collection is a no-op, so skipped stages
//! cost nothing at the call site. The whole pipeline reads as a linear
//! sequence of stage calls, with no per-stage branching.

use polars::prelude::DataFrame;

use super::stages::StageDiag;

/// The return shape for a subset-pipeline stage.
///
/// Every optional stage returns one of these, whether it ran or not, so
/// the caller can merge the result into its accumulators unconditionally.
///
/// - `data` — the transformed DataFrame (or the input unchanged if the
///   stage no-opped).
/// - `stats` — per-cell diagnostics specific to the stage (empty when
///   the stage didn't run). Each stage defines its own `S` type
///   (e.g., `IntervalStats`, `GroupStats`).
/// - `diags` — entries to append to the subset's stage trace (empty
///   when the stage didn't run).
///
/// The type is `#[must_use]`: discarding an outcome drops the stage's
/// output `data` (and its diagnostics), which is almost certainly a bug
/// at the call site. Functions returning `StageOutcome` inherit the lint.
#[must_use = "a StageOutcome carries the stage's output DataFrame; dropping it discards the stage's work"]
#[derive(Debug, Clone)]
pub struct StageOutcome<S> {
    /// Output frame of the stage (the input frame on passthrough).
    pub data: DataFrame,
    /// Stage-specific diagnostics; empty when the stage was skipped.
    pub stats: Vec<S>,
    /// Stage-trace entries to append; empty when the stage was skipped.
    pub diags: Vec<StageDiag>,
}

impl<S> StageOutcome<S> {
    /// Builds the outcome of a stage that was skipped.
    ///
    /// The incoming `data` is handed back untouched, and both `stats`
    /// and `diags` are left empty. Merging this outcome into the
    /// caller's accumulators therefore adds nothing.
    pub fn passthrough(data: DataFrame) -> Self {
        let stats = Vec::default();
        let diags = Vec::default();
        Self { data, stats, diags }
    }

    /// Builds the outcome of a stage that actually ran.
    ///
    /// `stats` is stored as given, and `diag` becomes the sole entry of
    /// `diags`. This is a shortcut for the common case where a stage
    /// emits exactly one trace entry.
    pub fn executed(data: DataFrame, stats: Vec<S>, diag: StageDiag) -> Self {
        let diags = Vec::from([diag]);
        Self { data, stats, diags }
    }
}