vertexrs 0.1.0

High-performance incremental computation engine with compile-time DAG pipelines
Documentation
//! Pipeline — a named, heterogeneous computation unit.
//!
//! # Design
//! A [`Pipeline`] is a `Frame → Frame` transformation: callers [`push`] a
//! source [`Frame`], call [`compute`], and read results via [`output`].
//!
//! The concrete computation logic is generated by the [`pipeline!`] macro,
//! which produces an anonymous struct that implements the hidden, internal
//! [`PipelineImpl`] trait.  The [`Pipeline`] wrapper erases that concrete type
//! behind a `Box<dyn PipelineImpl>`, giving callers a stable, nameable type.
//!
//! # Nested pipelines
//! A `Pipeline` can be embedded inside another `pipeline!` block as a named
//! node.  From the parent's perspective it is completely opaque — its internal
//! nodes are invisible.  Only the columns declared in its `output!(...)` clause
//! are accessible downstream.  Full nested-pipeline support is Phase 2.2.1.
//!
//! [`push`]: Pipeline::push
//! [`compute`]: Pipeline::compute
//! [`output`]: Pipeline::output
//! [`pipeline!`]: vertexrs_macro::pipeline

use crate::Frame;

// ── FailureMode ───────────────────────────────────────────────────────────────

/// Policy applied when a kernel panics while a pipeline is executing.
///
/// Extends [`executor::FailureMode`](crate::executor::FailureMode) with the
/// pipeline-only [`Isolate`](FailureMode::Isolate) variant.
///
/// [`Default`] yields [`Soft`](FailureMode::Soft).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FailureMode {
    /// Turn every output column into an all-null column, record a warning,
    /// and keep executing the parent pipeline.
    Soft,
    /// Stop the whole compute cycle at once and propagate the error.
    Hard,
    /// *Nested-pipeline only.* Contain the failure: every output column of
    /// this pipeline becomes NA while the parent pipeline carries on as usual.
    /// The error shows up in [`Pipeline::errors`] and is *not* returned from
    /// the parent's [`compute`](Pipeline::compute).
    Isolate,
}

impl Default for FailureMode {
    /// Soft handling is the out-of-the-box policy.
    fn default() -> Self {
        Self::Soft
    }
}

// ── PipelineError ─────────────────────────────────────────────────────────────

/// Failure reported by [`Pipeline::compute`].
#[derive(Debug, Clone)]
pub enum PipelineError {
    /// A declared source column never arrived through [`Pipeline::push`]
    /// prior to the [`compute`](Pipeline::compute) call.  Carries the name of
    /// the missing source.
    MissingSource(&'static str),
    /// A node kernel panicked while executing.  Carries a description of the
    /// panic.
    KernelPanic(String),
}

impl std::fmt::Display for PipelineError {
    /// Renders a one-line, human-readable description of the failure.
    fn fmt(&self, out: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match *self {
            Self::MissingSource(source) => {
                write!(out, "source '{}' was not pushed before compute()", source)
            }
            Self::KernelPanic(ref message) => write!(out, "kernel panicked: {}", message),
        }
    }
}

// No wrapped cause is stored, so the default `source()` (returning `None`)
// is correct.
impl std::error::Error for PipelineError {}

// ── PipelineSettings ──────────────────────────────────────────────────────────

/// Execution policy applied to a single pipeline.
///
/// Supplied to the `pipeline!` macro through an optional `settings { }` block
/// (Phase 2.2.1).  [`Default`] yields values suited to most use-cases:
/// [`FailureMode::Soft`], a null-fraction warning threshold of `0.5`, and
/// purity checking switched off.
#[derive(Debug, Clone)]
pub struct PipelineSettings {
    /// Policy for kernel panics.
    pub failure_mode: FailureMode,
    /// Warn whenever the null fraction of any output column is greater than
    /// this value.  Spans `0.0` (warn on any NA) to `1.0` (never warn).
    pub na_threshold: f64,
    /// If `true`, every compute cycle evaluates each kernel twice and compares
    /// the two outputs; a mismatch reveals a non-deterministic (impure) kernel.
    pub purity_check: bool,
}

impl Default for PipelineSettings {
    fn default() -> Self {
        let failure_mode = FailureMode::Soft;
        let na_threshold = 0.5;
        let purity_check = false;
        Self { failure_mode, na_threshold, purity_check }
    }
}

// ── PipelineImpl ──────────────────────────────────────────────────────────────

/// Internal trait implemented by macro-generated pipeline state structs.
///
/// Only code produced by the [`pipeline!`] macro is meant to implement this
/// trait; it is `#[doc(hidden)]` and not part of the stable API.  Note that the
/// trait is *not* sealed at the type level (it has no private supertrait), so
/// the compiler will not reject a manual implementation — do not write one.
/// It presumably has to stay unsealed because the macro expands inside
/// downstream crates, which must be able to implement it.
///
/// The `Send` supertrait makes `Box<dyn PipelineImpl>`, and therefore
/// [`Pipeline`], `Send`.
///
/// [`pipeline!`]: vertexrs_macro::pipeline
#[doc(hidden)]
pub trait PipelineImpl: Send {
    /// Copies source columns from `frame` into the pipeline's internal state.
    ///
    /// Columns in `frame` whose names are not declared sources are silently
    /// ignored.  Columns that are declared but absent in `frame` retain their
    /// previous value (if any).
    fn push_sources(&mut self, frame: &Frame);

    /// Executes all nodes in declaration order, building the output [`Frame`].
    ///
    /// # Errors
    /// Returns [`PipelineError::MissingSource`] when a required source has
    /// never been pushed, or another [`PipelineError`] when execution fails.
    fn run(&mut self) -> Result<(), PipelineError>;

    /// Returns a reference to the output [`Frame`] produced by the most recent
    /// [`run`](PipelineImpl::run) call.
    ///
    /// # Panics
    /// Panics if `run` has never been called.
    fn output(&self) -> &Frame;

    /// Drains and returns all warnings collected during the last
    /// [`run`](PipelineImpl::run) call.
    fn drain_warnings(&mut self) -> Vec<String>;

    /// Drains errors from nested-pipeline `Isolate`-mode failures recorded
    /// during the last [`run`](PipelineImpl::run) call.
    fn drain_isolated_errors(&mut self) -> Vec<PipelineError>;
}

// ── Pipeline ──────────────────────────────────────────────────────────────────

/// A heterogeneous, named computation pipeline.
///
/// Created by the [`pipeline!`] macro.  Stores source columns, executes a
/// sequence of derived [`Node`](crate::Node) transformations, and exposes a
/// subset of those nodes as an output [`Frame`].
///
/// # Example
/// ```rust,ignore
/// use vertexrs::{Frame, Node, pipeline};
///
/// let mut p = pipeline! {
///     source!(price: f64);
///     node!(tax   = price.row(|x| x * 0.2));
///     node!(total = price.row(|x| x + tax));
///     output!(tax, total)
/// };
///
/// // `push` borrows the frame, so pass a reference.
/// p.push(&Frame::new().append(Node::from_data("price", vec![10.0_f64, 20.0])));
/// p.compute().unwrap();
/// assert_eq!(p.output().get::<f64>("total").unwrap(), &[12.0, 24.0]);
/// ```
///
/// [`pipeline!`]: vertexrs_macro::pipeline
pub struct Pipeline {
    /// Type-erased, macro-generated implementation that does the actual work.
    inner: Box<dyn PipelineImpl>,
    /// Errors from isolated nested-pipeline failures recorded during the most
    /// recent [`compute`](Pipeline::compute) call (Phase 2.2.1).
    isolated_errors: Vec<PipelineError>,
}

impl Pipeline {
    /// Creates a new pipeline wrapping the given implementation.
    ///
    /// Called by the [`pipeline!`] macro; do not call this directly.
    ///
    /// [`pipeline!`]: vertexrs_macro::pipeline
    #[doc(hidden)]
    pub fn new(inner: Box<dyn PipelineImpl>) -> Self {
        Self { inner, isolated_errors: Vec::new() }
    }

    /// Copies source columns from `frame` into the pipeline.
    ///
    /// Only columns whose names match declared `source!(name: T)` declarations
    /// are consumed.  Other columns are ignored.  Source columns not present in
    /// `frame` retain their value from the previous push (if any).
    ///
    /// Returns `&mut Self` so a push can be chained straight into
    /// [`compute`](Pipeline::compute), e.g. `p.push(&frame).compute()?`.
    pub fn push(&mut self, frame: &Frame) -> &mut Self {
        self.inner.push_sources(frame);
        self
    }

    /// Executes all nodes in declaration order.
    ///
    /// Isolated nested-pipeline errors from this cycle replace those of the
    /// previous cycle and are available through [`errors`](Pipeline::errors)
    /// afterwards, even when this call returns `Err`.
    ///
    /// # Errors
    /// Returns [`PipelineError::MissingSource`] if any required source was
    /// never pushed.  Any other error produced by the wrapped implementation
    /// (for example [`PipelineError::KernelPanic`]) is returned unchanged.
    pub fn compute(&mut self) -> Result<(), PipelineError> {
        // Reset first so `errors()` only ever reflects the most recent cycle.
        self.isolated_errors.clear();
        let result = self.inner.run();
        // Collect isolated errors regardless of `result`: a failing cycle may
        // still have isolated nested failures before it stopped.
        self.isolated_errors.extend(self.inner.drain_isolated_errors());
        result
    }

    /// Returns the output [`Frame`] from the most recent [`compute`] call.
    ///
    /// The frame contains exactly the columns declared in `output!(...)`.
    ///
    /// # Panics
    /// Panics if [`compute`] has never been called successfully.
    ///
    /// [`compute`]: Pipeline::compute
    pub fn output(&self) -> &Frame {
        self.inner.output()
    }

    /// Drains and returns all warnings from the last
    /// [`compute`](Pipeline::compute) cycle.
    pub fn drain_warnings(&mut self) -> Vec<String> {
        self.inner.drain_warnings()
    }

    /// Returns errors from isolated nested-pipeline failures recorded during
    /// the most recent [`compute`](Pipeline::compute) call (Phase 2.2.1).
    ///
    /// Unlike [`drain_warnings`](Pipeline::drain_warnings), this does not
    /// consume the errors; they are cleared at the start of the next
    /// `compute`.
    pub fn errors(&self) -> &[PipelineError] {
        &self.isolated_errors
    }
}