taxa-core 0.1.0

taxa engine core: manifest model, formula AST→Polars Expr, bounded query generators over Polars.
//! A dataset's backing data, surfaced to the engine as a Polars `LazyFrame`.
//!
//! This is the seam the user specifies — a Parquet archive, a CSV, or an
//! in-memory DataFrame (a "nested dict" becomes `DataFrame::new`). The engine
//! re-derives a fresh `LazyFrame` per query level; Polars' lazy pushdown means
//! re-scanning is cheap (and out-of-core for big Parquet).

use std::collections::HashSet;
use std::sync::Arc;

use polars::prelude::*;

use crate::error::Result;

/// Abstraction over a dataset's backing data.
///
/// The `Send + Sync` supertraits let the server keep one source behind an
/// `Arc` and share it across worker threads (Polars frames are thread-safe —
/// unlike an embedded DuckDB connection).
pub trait Source: Send + Sync {
    /// Produce a fresh lazy view of the backing data.
    fn frame(&self) -> Result<LazyFrame>;

    /// The set of column names, resolved from the lazy schema of a fresh
    /// [`Source::frame`].
    ///
    /// # Errors
    /// Propagates any failure from `frame()` or from resolving the schema.
    fn columns(&self) -> Result<HashSet<String>> {
        let resolved = self.frame()?.collect_schema()?;
        let names: HashSet<String> = resolved
            .iter_names()
            .map(|name| name.to_string())
            .collect();
        Ok(names)
    }

    /// `(column, dtype-string)` pairs in schema order — drives manifest
    /// inference. The dtype string is the dtype's `Display` rendering.
    ///
    /// # Errors
    /// Propagates any failure from `frame()` or from resolving the schema.
    fn schema(&self) -> Result<Vec<(String, String)>> {
        let resolved = self.frame()?.collect_schema()?;
        let pairs = resolved
            .iter()
            .map(|(name, dtype)| (name.to_string(), dtype.to_string()))
            .collect::<Vec<_>>();
        Ok(pairs)
    }
}

/// In-memory backing (Arrow DataFrame). A nested dict is loaded into one of
/// these; small datasets pay no I/O.
pub struct FrameSource {
    /// The materialized frame; `frame()` hands out lazy views over a clone of it.
    df: DataFrame,
}

impl FrameSource {
    pub fn new(df: DataFrame) -> Self {
        FrameSource { df }
    }
}

impl Source for FrameSource {
    /// Lazy view over a clone of the held frame; never fails.
    fn frame(&self) -> Result<LazyFrame> {
        let owned = self.df.clone();
        Ok(owned.lazy())
    }
}

/// Parquet file or glob (e.g. `archive/*.parquet`) — out-of-core via Polars' lazy scan.
pub struct ParquetSource {
    /// File path or glob pattern handed to the lazy Parquet scan on each `frame()` call.
    path: String,
}

impl ParquetSource {
    pub fn new(path: impl Into<String>) -> Self {
        ParquetSource { path: path.into() }
    }
}

impl Source for ParquetSource {
    /// Start a lazy Parquet scan over the stored path with default scan arguments.
    ///
    /// # Errors
    /// Returns the (converted) Polars error if the scan cannot be set up.
    fn frame(&self) -> Result<LazyFrame> {
        let location = PlPath::new(&self.path);
        let scan_args = ScanArgsParquet::default();
        let scanned = LazyFrame::scan_parquet(location, scan_args)?;
        Ok(scanned)
    }
}

/// CSV file or glob (header row, types inferred).
pub struct CsvSource {
    /// File path or glob pattern handed to the lazy CSV reader on each `frame()` call.
    path: String,
}

impl CsvSource {
    pub fn new(path: impl Into<String>) -> Self {
        CsvSource { path: path.into() }
    }
}

impl Source for CsvSource {
    /// Lazily read the delimited text file(s) at the stored path, treating the
    /// first row as the header.
    ///
    /// The field separator follows the path's extension: `.tsv`
    /// (case-insensitive) is read tab-separated, anything else comma-separated.
    /// `file_source` routes `.tsv` paths to this source, and reading them with
    /// the default comma separator would collapse every row into one column.
    ///
    /// # Errors
    /// Returns the (converted) Polars error if the reader cannot be set up.
    fn frame(&self) -> Result<LazyFrame> {
        // Works for globs too: `data/*.tsv` still has the extension `tsv`.
        let is_tsv = std::path::Path::new(&self.path)
            .extension()
            .and_then(|ext| ext.to_str())
            .is_some_and(|ext| ext.eq_ignore_ascii_case("tsv"));
        let separator = if is_tsv { b'\t' } else { b',' };
        Ok(LazyCsvReader::new(PlPath::new(&self.path))
            .with_has_header(true)
            .with_separator(separator)
            .finish()?)
    }
}

/// A `dims_from` enrichment source: a narrow series frame left-joined to a set
/// of per-entity dimension columns (the snapshot's axis-level columns), so the
/// engine's `series()` sees the level columns it needs to roll up branches.
///
/// `dims` is built once (deduped per entity) from the snapshot frame; the join
/// is re-applied lazily on every `frame()` call (Polars pushdown keeps it cheap).
pub struct JoinSource {
    /// The series source whose frame is the left side of the join.
    base: Arc<dyn Source>,
    /// The deduped dimension frame: `[on] + level columns`, one row per entity.
    dims: DataFrame,
    /// The id column both frames share (the left/right join key).
    on: String,
}

impl JoinSource {
    /// Wrap `base` so that every `frame()` left-joins the ready-made `dims`
    /// frame on the shared key column `on`. `dims` is stored as given.
    pub fn new(base: Arc<dyn Source>, dims: DataFrame, on: impl Into<String>) -> Self {
        let on = on.into();
        Self { base, dims, on }
    }

    /// Derive the `snapshot_dims` table from a snapshot source and wrap `base`
    /// to left-join it. Used by the `dims_from` resolution so a producer can
    /// ship a narrow series table and derive axis levels once.
    ///
    /// The table is `[on]` plus each requested `dim_cols` entry that exists on
    /// the snapshot (duplicates and `on` itself are ignored), reduced to one
    /// row per `on` value and collected eagerly.
    ///
    /// # Errors
    /// * `Error::Schema` when a requested column exists on both the snapshot
    ///   and the series (`base`) frame.
    /// * Any error from resolving either source's columns or from collecting
    ///   the snapshot projection.
    pub fn from_snapshot(
        base: Arc<dyn Source>,
        snapshot: &dyn Source,
        on: impl Into<String>,
        dim_cols: &[String],
    ) -> Result<Self> {
        let on = on.into();
        let snapshot_cols = snapshot.columns()?;
        let series_cols = base.columns()?;

        // The join key always leads the projection and is never re-added.
        let mut picked: HashSet<String> = HashSet::from([on.clone()]);
        let mut projection: Vec<Expr> = vec![col(on.as_str())];

        for c in dim_cols.iter().filter(|name| **name != on) {
            let on_snapshot = snapshot_cols.contains(c);
            // A column present on BOTH frames is rejected rather than dropped:
            // dropping it would bind any filter/row_filter on `c` to the series'
            // time-varying value instead of the snapshot dimension — a silent,
            // wrong result. The error names the column and suggests the fix.
            if on_snapshot && series_cols.contains(c) {
                return Err(crate::error::Error::Schema(format!(
                    "dims_from column {c:?} exists on BOTH the series frame and the \
                     snapshot — joining it would silently bind a filter/row_filter to \
                     the series' time-varying value, not the snapshot dimension. Use a \
                     dedicated level column (e.g. a bucketed `{c}_bucket`) or rename one \
                     side so the dim is unambiguous."
                )));
            }
            // Columns the snapshot lacks are skipped (base-only columns are
            // already on the series frame); repeats are added only once.
            if on_snapshot && picked.insert(c.clone()) {
                projection.push(col(c.as_str()));
            }
        }

        // Uniqueness is enforced on the JOIN KEY alone, not on whole rows:
        // several snapshot rows for one id with differing dims would survive a
        // full-row unique and multiply the series rows in the left join.
        let key_only = Some(by_name([on.as_str()], true));
        let dims = snapshot
            .frame()?
            .select(projection)
            .unique(key_only, UniqueKeepStrategy::Any)
            .collect()?;

        Ok(Self { base, dims, on })
    }
}

impl Source for JoinSource {
    /// The base frame left-joined to the stored dimension frame on the shared
    /// key column. The join is only planned here; nothing is executed.
    ///
    /// # Errors
    /// Propagates any error from the base source's `frame()`.
    fn frame(&self) -> Result<LazyFrame> {
        let series = self.base.frame()?;
        let dimensions = self.dims.clone().lazy();
        let key = col(self.on.as_str());
        let joined = series.join(
            dimensions,
            [key.clone()],
            [key],
            JoinArgs::new(JoinType::Left),
        );
        Ok(joined)
    }
}

/// A [`Source`] whose frame is produced by a **Rust closure** over named
/// dependency sources — the Rust-native authoring path. The closure receives a
/// `{name -> LazyFrame}` map (one entry per dependency) and returns the
/// transformed `LazyFrame`. There is NO plan serialization: the closure runs
/// in-process against taxa's own polars, so it fuses with the downstream
/// analytics and is immune to the polars-version gate.
///
/// The closure is stored behind `Arc<dyn Fn(..) + Send + Sync>` because a
/// [`Source`] is `Send + Sync` and the server runs engine calls in
/// `Send + 'static` blocking tasks.
pub struct ClosureSource {
    /// Dependency sources keyed by the name the closure looks them up under.
    deps: std::collections::HashMap<String, Arc<dyn Source>>,
    /// The transform: called on every `frame()` with one fresh `LazyFrame` per
    /// dependency. The allow silences clippy on the long trait-object type.
    #[allow(clippy::type_complexity)]
    f: Arc<
        dyn Fn(&std::collections::HashMap<String, LazyFrame>) -> PolarsResult<LazyFrame>
            + Send
            + Sync,
    >,
}

impl ClosureSource {
    /// Pair a map of named dependency sources with the closure that turns
    /// their frames into this source's frame.
    ///
    /// The closure has to be `Send + Sync + 'static`: it outlives the request
    /// and is shared across worker threads.
    pub fn new<F>(deps: std::collections::HashMap<String, Arc<dyn Source>>, f: F) -> Self
    where
        F: Fn(&std::collections::HashMap<String, LazyFrame>) -> PolarsResult<LazyFrame>
            + Send
            + Sync
            + 'static,
    {
        // Type-erase the closure so the struct itself needs no generic parameter.
        Self {
            f: Arc::new(f),
            deps,
        }
    }
}

impl Source for ClosureSource {
    /// Resolve a fresh frame for every dependency, then run the closure over
    /// the resulting `{name -> LazyFrame}` map.
    ///
    /// # Errors
    /// Stops at the first dependency whose `frame()` fails; otherwise returns
    /// the closure's own error, if any, converted into the crate error.
    fn frame(&self) -> Result<LazyFrame> {
        let inputs: std::collections::HashMap<String, LazyFrame> = self
            .deps
            .iter()
            .map(|(name, dep)| dep.frame().map(|lf| (name.clone(), lf)))
            .collect::<Result<_>>()?;
        // `?` turns the PolarsError into crate::Error via the #[from] on Error::Polars.
        let transformed = (self.f)(&inputs)?;
        Ok(transformed)
    }
}

/// Choose a file-backed source from the path's extension.
///
/// The extension is the text after the last `.` in `path`, compared
/// case-insensitively: `csv`, `tsv` and `txt` yield a [`CsvSource`]; anything
/// else (including a path with no `.` at all) yields a [`ParquetSource`].
pub fn file_source(path: &str) -> Box<dyn Source> {
    let extension = path.rsplit('.').next().unwrap_or("").to_lowercase();
    if matches!(extension.as_str(), "csv" | "tsv" | "txt") {
        Box::new(CsvSource::new(path))
    } else {
        Box::new(ParquetSource::new(path))
    }
}