taxa-core 0.1.0

taxa engine core: manifest model, formula AST→Polars Expr, bounded query generators over Polars.
//! Transform-plan ingestion — the plan-provider path.
//!
//! A producer authors a Polars `LazyFrame` transform over *named placeholder*
//! sources (each a scan of `taxa://<name>`), serializes its `DslPlan`, and ships
//! the bytes. Here we deserialize the plan, **bind each `taxa://<name>` scan leaf
//! to a named ingested frame's own plan**, and execute — yielding a viz-ready
//! `LazyFrame` that the existing bounding/treemap/series stack consumes unchanged.
//!
//! The rebind walks the *whole* plan tree (single- AND multi-input nodes — Join,
//! Union, HConcat, …), so a transform that JOINs several sources rebinds all of
//! its leaves. After rebinding we **fail-closed**: if any scan leaf survives —
//! whether an unbound `taxa://` placeholder or a direct file path — we error
//! before executing rather than silently reading it.

use std::collections::HashMap;
use std::sync::Arc;

use polars::prelude::*;
use polars_plan::dsl::DslPlan;

use crate::error::{Error, Result};
use crate::source::Source;

/// Scheme prefix marking a placeholder scan leaf to be late-bound.
const TAXA_SCHEME: &str = "taxa://";
/// Filename prefix marking a placeholder scan leaf (`…/taxa__<name>[.parquet|.csv]`):
/// the form used when authoring in Python, where Polars' `scan_parquet` rejects
/// the `taxa://` URL scheme but accepts an ordinary relative filename.
const TAXA_PREFIX: &str = "taxa__";

/// Source path of a `Scan` leaf, as an owned string.
///
/// Returns `None` when `plan` is not a `Scan`, or when its sources expose no
/// first path. Only the *first* path is reported; a multi-path scan is
/// represented by that path alone.
fn scan_path(plan: &DslPlan) -> Option<String> {
    let DslPlan::Scan { sources, .. } = plan else {
        return None;
    };
    let first = sources.first_path()?;
    Some(first.to_str().to_string())
}

/// Extract the placeholder source name from a scan path, in either form:
/// - URL form `taxa://<name>` (convenient when authoring in Rust), or
/// - filename form `…/taxa__<name>[.parquet|.csv]` — used when authoring in
///   Python, because Polars' `scan_parquet` rejects the `taxa://` URL scheme
///   eagerly but accepts an ordinary relative filename (resolved lazily, never
///   read because we rebind it away first).
fn name_from_path(path: &str) -> Option<String> {
    // URL form: everything after the scheme is the source name.
    if let Some(rest) = path.strip_prefix(TAXA_SCHEME) {
        return Some(rest.to_owned());
    }

    // Filename form: consider only the final path component (either separator)…
    let file_name = match path.rfind(['/', '\\']) {
        Some(idx) => &path[idx + 1..],
        None => path,
    };

    // …drop at most one recognised extension (`.parquet` tried before `.csv`)…
    let stem = [".parquet", ".csv"]
        .iter()
        .find_map(|&ext| file_name.strip_suffix(ext))
        .unwrap_or(file_name);

    // …and require the `taxa__` marker; what follows it is the name.
    stem.strip_prefix(TAXA_PREFIX).map(str::to_owned)
}

/// Whether a scan path names a taxa placeholder, in either form: the
/// `taxa://` URL form, or a final path component starting with `taxa__`.
///
/// Test-only: the production validation path rejects every surviving scan
/// leaf regardless of its path, so this predicate is kept solely as the
/// reference definition exercised by the unit tests.
#[cfg(test)]
fn is_placeholder(path: &str) -> bool {
    if path.starts_with(TAXA_SCHEME) {
        return true;
    }
    let file_name = match path.rfind(['/', '\\']) {
        Some(idx) => &path[idx + 1..],
        None => path,
    };
    file_name.starts_with(TAXA_PREFIX)
}

/// The placeholder name of a `Scan` leaf (`taxa://<name>` or `taxa__<name>`).
///
/// Returns `None` for non-`Scan` nodes or scans that aren't taxa placeholders.
pub fn scan_name(plan: &DslPlan) -> Option<String> {
    scan_path(plan).as_deref().and_then(name_from_path)
}

/// Recursively rebind every `taxa://<name>` scan leaf to `named[name]`'s plan.
///
/// Walks ALL inputs of every node we understand — including the multi-input
/// `Join` / `Union` / `HConcat` / `ExtContext` — so a transform that joins
/// several sources rebinds each of its leaves. Variants we don't explicitly
/// recurse into are left as-is (`other => other`); any `taxa://` leaf left
/// unbound in such a subtree is caught by [`validate_bound`] before execution.
pub fn rebind_named(plan: DslPlan, named: &HashMap<String, DslPlan>) -> Result<DslPlan> {
    // A leaf placeholder: swap in the named frame's plan.
    if let Some(name) = scan_name(&plan) {
        return named.get(&name).cloned().ok_or_else(|| {
            Error::Engine(format!(
                "transform references undeclared source `taxa://{name}`"
            ))
        });
    }

    let recur_arc = |p: Arc<DslPlan>| -> Result<Arc<DslPlan>> {
        Ok(Arc::new(rebind_named((*p).clone(), named)?))
    };
    let recur_vec = |ps: Vec<DslPlan>| -> Result<Vec<DslPlan>> {
        ps.into_iter().map(|p| rebind_named(p, named)).collect()
    };

    let out = match plan {
        // ---- single-input nodes ----
        DslPlan::Filter { input, predicate } => DslPlan::Filter {
            input: recur_arc(input)?,
            predicate,
        },
        DslPlan::Cache { input, id } => DslPlan::Cache {
            input: recur_arc(input)?,
            id,
        },
        DslPlan::Select {
            expr,
            input,
            options,
        } => DslPlan::Select {
            expr,
            input: recur_arc(input)?,
            options,
        },
        DslPlan::GroupBy {
            input,
            keys,
            aggs,
            maintain_order,
            options,
            apply,
        } => DslPlan::GroupBy {
            input: recur_arc(input)?,
            keys,
            aggs,
            maintain_order,
            options,
            apply,
        },
        DslPlan::HStack {
            input,
            exprs,
            options,
        } => DslPlan::HStack {
            input: recur_arc(input)?,
            exprs,
            options,
        },
        DslPlan::Distinct { input, options } => DslPlan::Distinct {
            input: recur_arc(input)?,
            options,
        },
        DslPlan::Sort {
            input,
            by_column,
            slice,
            sort_options,
        } => DslPlan::Sort {
            input: recur_arc(input)?,
            by_column,
            slice,
            sort_options,
        },
        DslPlan::Slice { input, offset, len } => DslPlan::Slice {
            input: recur_arc(input)?,
            offset,
            len,
        },
        DslPlan::MapFunction { input, function } => DslPlan::MapFunction {
            input: recur_arc(input)?,
            function,
        },
        DslPlan::Sink { input, payload } => DslPlan::Sink {
            input: recur_arc(input)?,
            payload,
        },

        // ---- multi-input nodes (the keystone: JOIN rebinds BOTH sides) ----
        DslPlan::Join {
            input_left,
            input_right,
            left_on,
            right_on,
            predicates,
            options,
        } => DslPlan::Join {
            input_left: recur_arc(input_left)?,
            input_right: recur_arc(input_right)?,
            left_on,
            right_on,
            predicates,
            options,
        },
        DslPlan::Union { inputs, args } => DslPlan::Union {
            inputs: recur_vec(inputs)?,
            args,
        },
        DslPlan::HConcat { inputs, options } => DslPlan::HConcat {
            inputs: recur_vec(inputs)?,
            options,
        },
        DslPlan::ExtContext { input, contexts } => DslPlan::ExtContext {
            input: recur_arc(input)?,
            contexts: recur_vec(contexts)?,
        },

        // Unhandled variants (incl. already-bound DataFrameScan / non-`taxa://`
        // Scan) pass through untouched; `validate_bound` catches any surviving
        // placeholder so we never silently read one.
        other => other,
    };
    Ok(out)
}

/// Fail-closed: error if ANY `DslPlan::Scan` leaf survives in the rebound plan.
///
/// Every legitimate data source arrives via a `taxa://`-bound named source,
/// which rebinds to that source's own plan (a `DataFrameScan` for an ingested
/// frame). So after rebind there should be NO `Scan` leaf left — a surviving one
/// is either an unbound placeholder OR a real file path (e.g. a producer's
/// `pl.scan_parquet("/etc/passwd")`) that would be read on execution. We reject
/// both rather than only placeholders, so a real-path scan can never execute.
fn validate_bound(plan: &DslPlan) -> Result<()> {
    if let Some(path) = scan_path(plan) {
        return Err(Error::Engine(format!(
            "transform plan has a disallowed file scan after rebind: `{path}`. \
             All data sources must be `taxa://` named sources (bound from the \
             manifest's `sources`); direct file scans are not permitted."
        )));
    }
    for child in children(plan) {
        validate_bound(child)?;
    }
    Ok(())
}

/// Direct child plans of `plan`, borrowed, for read-only traversal.
///
/// Leaves and any variant not listed here yield an empty list. For
/// `ExtContext` the main input comes first, followed by the contexts; for
/// `IR` the embedded DSL plan is the single child.
fn children(plan: &DslPlan) -> Vec<&DslPlan> {
    let mut kids: Vec<&DslPlan> = Vec::new();
    match plan {
        DslPlan::Filter { input, .. }
        | DslPlan::Cache { input, .. }
        | DslPlan::Select { input, .. }
        | DslPlan::GroupBy { input, .. }
        | DslPlan::HStack { input, .. }
        | DslPlan::Distinct { input, .. }
        | DslPlan::Sort { input, .. }
        | DslPlan::Slice { input, .. }
        | DslPlan::MapFunction { input, .. }
        | DslPlan::Sink { input, .. }
        | DslPlan::MatchToSchema { input, .. }
        | DslPlan::PipeWithSchema { input, .. } => kids.push(input.as_ref()),
        DslPlan::Join {
            input_left,
            input_right,
            ..
        } => {
            kids.push(input_left.as_ref());
            kids.push(input_right.as_ref());
        }
        DslPlan::Union { inputs, .. }
        | DslPlan::HConcat { inputs, .. }
        | DslPlan::SinkMultiple { inputs } => kids.extend(inputs.iter()),
        DslPlan::ExtContext { input, contexts } => {
            kids.push(input.as_ref());
            kids.extend(contexts.iter());
        }
        DslPlan::IR { dsl, .. } => kids.push(dsl.as_ref()),
        _ => {}
    }
    kids
}

/// A [`Source`] whose frame is a producer-authored *transform plan*, with each
/// `taxa://<name>` placeholder bound to one of a set of named ingested sources.
///
/// Composes with the rest of the engine: wrap it in a
/// [`crate::backend::FrameBackend`] and the bounding/treemap/series stack runs
/// over the transformed dataset unchanged.
pub struct TransformSource {
    /// Ingested sources, keyed by the placeholder name that refers to them.
    sources: HashMap<String, Arc<dyn Source>>,
    /// Serialized `DslPlan` bytes; deserialized afresh on every `frame()` call.
    plan: Vec<u8>,
}

impl TransformSource {
    /// Build a transform source.
    ///
    /// `sources` maps each `taxa://<name>` placeholder to its ingested source;
    /// `plan_bytes` is the `serialize_versioned` output of the producer's plan.
    /// Nothing is deserialized or validated here — that happens in `frame()`.
    pub fn new(sources: HashMap<String, Arc<dyn Source>>, plan_bytes: Vec<u8>) -> Self {
        Self {
            plan: plan_bytes,
            sources,
        }
    }
}

impl Source for TransformSource {
    /// Deserialize the transform, bind its placeholders, and validate the result.
    ///
    /// Every declared source's `frame()` is evaluated (whether or not the plan
    /// references it), and its logical plan becomes the leaf for that name.
    ///
    /// NOTE(review): validation runs over the fully rebound plan, including the
    /// sources' own plans. A source whose plan is itself a file `Scan` would
    /// therefore be rejected — presumably all sources are in-memory
    /// (`DataFrameScan`); confirm against the `Source` implementations.
    fn frame(&self) -> Result<LazyFrame> {
        let transform = DslPlan::deserialize_versioned(self.plan.as_slice())
            .map_err(|e| Error::Engine(format!("failed to deserialize transform plan: {e}")))?;

        let bindings = self
            .sources
            .iter()
            .map(|(name, source)| -> Result<(String, DslPlan)> {
                Ok((name.clone(), source.frame()?.logical_plan))
            })
            .collect::<Result<HashMap<String, DslPlan>>>()?;

        let bound = rebind_named(transform, &bindings)?;
        validate_bound(&bound)?;
        Ok(LazyFrame::from(bound))
    }
}

#[cfg(test)]
mod name_tests {
    use super::{is_placeholder, name_from_path};

    #[test]
    fn both_placeholder_forms_resolve() {
        // (scan path, expected source name): URL form (Rust authoring),
        // filename form (Python authoring), and a real non-placeholder path.
        let cases: [(&str, Option<&str>); 5] = [
            ("taxa://snapshots", Some("snapshots")),
            ("taxa__snapshots.parquet", Some("snapshots")),
            ("/tmp/x/taxa__leaf_map.parquet", Some("leaf_map")),
            ("taxa__canonical", Some("canonical")),
            ("/data/real.parquet", None),
        ];
        for (path, expected) in cases {
            assert_eq!(name_from_path(path).as_deref(), expected, "path: {path}");
        }

        // The predicate agrees on both placeholder forms and rejects real paths.
        for path in ["taxa://x", "/a/taxa__x.parquet"] {
            assert!(is_placeholder(path), "expected placeholder: {path}");
        }
        assert!(!is_placeholder("/data/real.parquet"));
    }
}