taxa-core 0.1.0

taxa engine core: manifest model, formula AST→Polars Expr, bounded query generators over Polars.
//! Transform-plan ingestion round-trips (pure Rust: author + serialize the plan,
//! then deserialize + rebind + execute). Proves a `taxa://<name>` scan leaf is
//! late-bound to an ingested frame across BOTH single-input and multi-input
//! (Join) plans, that an undeclared placeholder fails loudly, and that a scan
//! of a real file path (not a `taxa://` placeholder) is rejected.

use std::collections::HashMap;
use std::sync::Arc;

use polars::prelude::*;
use polars_plan::dsl::PlanSerializationContext;
use taxa_core::source::{FrameSource, Source};
use taxa_core::TransformSource;

/// Builds a lazy parquet scan whose path is the `taxa://<name>` placeholder URI.
///
/// Only the lazy plan is constructed here; the tests in this file hand the
/// serialized plan to `TransformSource`, which is expected to rebind the
/// placeholder before anything is collected.
///
/// Panics if `scan_parquet` fails to build the lazy scan.
fn placeholder(name: &str) -> LazyFrame {
    let uri = format!("taxa://{name}");
    let scan = LazyFrame::scan_parquet(PlPath::new(&uri), ScanArgsParquet::default());
    scan.expect("scan_parquet should build a lazy placeholder without reading")
}

/// Encodes a producer-authored plan into the versioned wire format and returns
/// the resulting bytes.
///
/// Panics if `serialize_versioned` reports an error.
fn serialize(lf: LazyFrame) -> Vec<u8> {
    let mut wire: Vec<u8> = Vec::new();
    let ctx = PlanSerializationContext::default();
    lf.logical_plan
        .serialize_versioned(&mut wire, ctx)
        .expect("serialize_versioned");
    wire
}

#[test]
fn join_rebinds_both_sources() {
    // Ingested frames that stand in for the two named placeholders.
    let left_frame = df![
        "key" => ["a", "b", "c"],
        "lval" => [1_i64, 2, 3],
    ]
    .unwrap();
    let right_frame = df![
        "key" => ["a", "b", "c"],
        "rval" => [10_i64, 20, 30],
    ]
    .unwrap();

    // Producer side: inner-join taxa://left with taxa://right on `key`, keep the
    // key plus both value columns, and sort by key so row order is deterministic.
    let joined = placeholder("left").join(
        placeholder("right"),
        [col("key")],
        [col("key")],
        JoinArgs::new(JoinType::Inner),
    );
    let plan = joined
        .select([col("key"), col("lval"), col("rval")])
        .sort(["key"], Default::default());
    let wire = serialize(plan);

    // Consumer side: declare both names, then rebind + execute.
    let sources: HashMap<String, Arc<dyn Source>> = HashMap::from([
        (
            String::from("left"),
            Arc::new(FrameSource::new(left_frame)) as Arc<dyn Source>,
        ),
        (
            String::from("right"),
            Arc::new(FrameSource::new(right_frame)) as Arc<dyn Source>,
        ),
    ]);
    let transform = TransformSource::new(sources, wire);
    let out = transform.frame().unwrap().collect().unwrap();

    let expected = df![
        "key" => ["a", "b", "c"],
        "lval" => [1_i64, 2, 3],
        "rval" => [10_i64, 20, 30],
    ]
    .unwrap();
    assert_eq!(out, expected, "both Join leaves must rebind + execute");
}

#[test]
fn single_source_filter_groupby() {
    // Ingested frame bound to the `x` placeholder.
    let quotes = df![
        "sym" => ["AAPL", "MSFT", "AAPL", "GOOG", "MSFT"],
        "px"  => [1.0_f64, 2.0, 3.0, 4.0, 5.0],
    ]
    .unwrap();

    // Producer side: scan taxa://x, keep rows with px > 1, sum px per sym as
    // `total`, and sort by sym so the output order is deterministic.
    let kept = placeholder("x").filter(col("px").gt(lit(1.0_f64)));
    let totals = kept
        .group_by([col("sym")])
        .agg([col("px").sum().alias("total")]);
    let wire = serialize(totals.sort(["sym"], Default::default()));

    let sources: HashMap<String, Arc<dyn Source>> = HashMap::from([(
        String::from("x"),
        Arc::new(FrameSource::new(quotes)) as Arc<dyn Source>,
    )]);
    let transform = TransformSource::new(sources, wire);
    let out = transform.frame().unwrap().collect().unwrap();

    // Rows surviving px > 1: MSFT 2, AAPL 3, GOOG 4, MSFT 5.
    // Per-symbol sums: AAPL 3, GOOG 4, MSFT 7.
    let expected = df![
        "sym" => ["AAPL", "GOOG", "MSFT"],
        "total" => [3.0_f64, 4.0, 7.0],
    ]
    .unwrap();
    assert_eq!(out, expected);
}

#[test]
fn real_path_scan_is_rejected() {
    // The producer plan scans a REAL file path rather than a taxa:// placeholder.
    // There is no placeholder to swap, so the scan leaf is expected to survive
    // rebind untouched and must be rejected (by `validate_bound`, per the
    // original notes on this test) before any read happens.
    let real_scan =
        LazyFrame::scan_parquet(PlPath::new("/etc/passwd"), ScanArgsParquet::default())
            .expect("lazy scan builds without reading");
    let wire = serialize(real_scan.select([col("*")]));

    // Declaring an unrelated named source must not make the real-path leaf
    // acceptable.
    let unrelated = df!["k" => [1_i64]].unwrap();
    let sources: HashMap<String, Arc<dyn Source>> = HashMap::from([(
        String::from("present"),
        Arc::new(FrameSource::new(unrelated)) as Arc<dyn Source>,
    )]);

    let transform = TransformSource::new(sources, wire);
    let Err(err) = transform.frame() else {
        panic!("a real-path file scan must be rejected, not executed");
    };
    let msg = err.to_string();

    assert!(
        msg.contains("/etc/passwd"),
        "error should name the disallowed path, got: {msg}"
    );
    assert!(
        msg.contains("taxa://"),
        "error should say sources must be taxa:// named, got: {msg}"
    );
}

#[test]
fn unknown_source_errors() {
    // The plan refers to taxa://missing ...
    let wire = serialize(placeholder("missing").select([col("*")]));

    // ... but only a *different* name ("present") is declared.
    let declared = df!["k" => [1_i64]].unwrap();
    let sources: HashMap<String, Arc<dyn Source>> = HashMap::from([(
        String::from("present"),
        Arc::new(FrameSource::new(declared)) as Arc<dyn Source>,
    )]);

    let transform = TransformSource::new(sources, wire);
    let Err(err) = transform.frame() else {
        panic!("undeclared placeholder must error");
    };
    let msg = err.to_string();

    assert!(
        msg.contains("missing"),
        "error should name the undeclared source, got: {msg}"
    );
}