taxa 0.1.0

Author a taxa visualization dataset in Rust: named sources + a Rust transform + a manifest, served by the taxa engine.
//! **Author a taxa dataset in Rust.**
//!
//! The Rust on-ramp: declare named sources + a transform written in real
//! polars-rust + a manifest, and serve the full GUI. Because the transform runs
//! in-process against taxa's **own** polars (one `Cargo.lock`), there is no plan
//! serialization, no version gate, and no matched-wheel bootstrap — and the
//! transform fuses with the bounded analytics.
//!
//! ```no_run
//! use taxa::{App, SqlSource};
//! use taxa::polars::prelude::*;
//!
//! # fn main() -> taxa::Result<()> {
//! let dsn = "host=/tmp dbname=mydb";
//! App::from_manifest("manifest.json")?
//!     .source("items", SqlSource::connect(dsn, "SELECT id, name, kind, value FROM items")?)
//!     .transform("main", ["items"], |s| {
//!         s["items"].clone().filter(col("value").gt(lit(0)))
//!     })
//!     .serve("127.0.0.1", 8000)
//! # }
//! ```
//!
//! **Use `taxa::polars`** (re-exported below) — do NOT add your own `polars`
//! dependency; a second polars is a distinct, type-incompatible crate. Need a
//! polars feature the engine doesn't enable? Turn on the matching feature of the
//! `taxa` dependency (see this crate's `[features]`).

use std::collections::HashMap;
use std::path::Path;
use std::sync::Arc;

use polars::prelude::LazyFrame;

// Re-exports so a dataset crate needs only `taxa` (and never a second polars).
pub use polars;
pub use taxa_core::{
    ClosureSource, CsvSource, Dataset, Error, FrameSource, JoinSource, Loaded, ParquetSource,
    Result, Source,
};
pub use taxa_sql::SqlSource;

/// Type-erased user transform: maps the `{dep-name -> LazyFrame}` inputs to the
/// frame's own `LazyFrame`. Kept behind an `Arc` so a single closure can be shared
/// by every worker thread (the engine is `Send + Sync`).
type TransformFn = Arc<dyn Fn(&HashMap<String, LazyFrame>) -> LazyFrame + Send + Sync>;

/// A shared, type-erased [`Source`] handle.
type SharedSource = Arc<dyn Source>;

/// What it takes to serve a dataset: the manifest, plus how each frame obtains
/// its [`Source`]. A frame is backed by one of
///
/// * a Rust `transform` over named sources,
/// * a `frame_source` supplied directly, or
/// * the registered source named by the frame's manifest `source` field.
pub struct App {
    /// The dataset manifest this app serves.
    manifest: Dataset,
    /// Raw sources keyed by name; transforms and manifest `source` fields look
    /// them up here.
    sources: HashMap<String, SharedSource>,
    /// Per-frame Rust transform: `frame -> (dependency source names, closure)`.
    transforms: HashMap<String, (Vec<String>, TransformFn)>,
    /// Per-frame source supplied directly (no transform involved).
    direct: HashMap<String, SharedSource>,
}

impl App {
    /// Start from an in-memory [`Dataset`] manifest.
    pub fn new(manifest: Dataset) -> Self {
        App {
            manifest,
            sources: HashMap::new(),
            transforms: HashMap::new(),
            direct: HashMap::new(),
        }
    }

    /// Start from a `manifest.json` on disk (frames/views/axes/metrics). Frames
    /// receive their Rust transforms via [`App::transform`] by name; the manifest
    /// must be **declarative** for those frames (no serialized `transform` plan
    /// path — that is the Python/CLI path).
    pub fn from_manifest(path: impl AsRef<Path>) -> Result<Self> {
        let p = path.as_ref();
        let raw = std::fs::read_to_string(p)
            .map_err(|e| Error::Engine(format!("read manifest {p:?}: {e}")))?;
        let manifest: Dataset = serde_json::from_str(&raw)
            .map_err(|e| Error::Schema(format!("manifest {p:?} is not a valid dataset: {e}")))?;
        Ok(App::new(manifest))
    }

    /// Register a named raw source (`transform` dependencies reference these).
    pub fn source(mut self, name: impl Into<String>, src: impl Source + 'static) -> Self {
        self.sources.insert(name.into(), Arc::new(src));
        self
    }

    /// Register a named raw source already behind an `Arc`.
    pub fn source_arc(mut self, name: impl Into<String>, src: Arc<dyn Source>) -> Self {
        self.sources.insert(name.into(), src);
        self
    }

    /// Attach a Rust transform to frame `frame`: the closure receives a
    /// `{dep -> LazyFrame}` map for the named `deps` (each registered via
    /// [`App::source`]) and returns the frame's `LazyFrame`.
    pub fn transform<F, I, S>(mut self, frame: impl Into<String>, deps: I, f: F) -> Self
    where
        I: IntoIterator<Item = S>,
        S: Into<String>,
        F: Fn(&HashMap<String, LazyFrame>) -> LazyFrame + Send + Sync + 'static,
    {
        let deps: Vec<String> = deps.into_iter().map(Into::into).collect();
        self.transforms.insert(frame.into(), (deps, Arc::new(f)));
        self
    }

    /// Back a frame directly with a [`Source`] (no transform).
    pub fn frame_source(mut self, frame: impl Into<String>, src: impl Source + 'static) -> Self {
        self.direct.insert(frame.into(), Arc::new(src));
        self
    }

    /// Build the prebuilt `frame -> Source` map the engine loader consumes.
    fn build_frame_sources(&self) -> Result<HashMap<String, Arc<dyn Source>>> {
        let needed = taxa_core::loader::referenced_frames(&self.manifest)?;
        let mut out: HashMap<String, Arc<dyn Source>> = HashMap::new();
        for fname in &needed {
            if out.contains_key(fname) {
                continue;
            }
            let frame = self
                .manifest
                .frame(fname)
                .ok_or_else(|| Error::Schema(format!("view frame {fname:?} not in `frames`")))?;

            // Declarative-only rule: a serialized transform plan-path is the
            // Python/CLI path, never the Rust facade — reject it explicitly so a
            // reused Python-generated manifest can't silently conflict with a
            // Rust closure attached to the same frame.
            if frame.transform.is_some() {
                return Err(Error::Schema(format!(
                    "frame {fname:?} carries a serialized `transform` plan-path, which the \
                     Rust facade does not run. Remove it and attach the transform in code \
                     via .transform({fname:?}, …), or serve the manifest with the `taxa` CLI."
                )));
            }

            let src: Arc<dyn Source> = if let Some((deps, f)) = self.transforms.get(fname) {
                // A Rust transform: build a ClosureSource over its named deps.
                let mut dep_map: HashMap<String, Arc<dyn Source>> = HashMap::new();
                for d in deps {
                    let s = self.sources.get(d).cloned().ok_or_else(|| {
                        Error::Schema(format!(
                            "transform for frame {fname:?} needs source {d:?}, which was not \
                             registered via .source()"
                        ))
                    })?;
                    dep_map.insert(d.clone(), s);
                }
                let user_f = f.clone();
                Arc::new(ClosureSource::new(dep_map, move |inputs| {
                    Ok(user_f(inputs))
                }))
            } else if let Some(direct) = self.direct.get(fname) {
                direct.clone()
            } else if let Some(name) = frame.source.as_deref() {
                // The frame's `source` names a registered raw source.
                self.sources.get(name).cloned().ok_or_else(|| {
                    Error::Schema(format!(
                        "frame {fname:?} source {name:?} was not registered — add \
                         .source({name:?}, …), .frame_source({fname:?}, …), or \
                         .transform({fname:?}, …)"
                    ))
                })?
            } else {
                return Err(Error::Schema(format!(
                    "frame {fname:?} has no source: attach .transform({fname:?}, …), \
                     .frame_source({fname:?}, …), or give it a `source` naming a registered source"
                )));
            };
            out.insert(fname.clone(), src);
        }
        Ok(out)
    }

    /// Build all sources and wire the engine **without serving** — for tests and
    /// embedding. Sources are built eagerly here (before any runtime), matching
    /// the polars "build before the tokio runtime" rule.
    pub fn build(&self) -> Result<Loaded> {
        let frame_sources = self.build_frame_sources()?;
        taxa_core::loader::load(&self.manifest, &frame_sources)
    }

    /// Build all sources (eagerly, before the runtime) and serve the GUI.
    pub fn serve(self, host: &str, port: u16) -> Result<()> {
        // Sources are built BEFORE the tokio runtime (see `build`) — polars' async
        // parquet reader spins its own runtime and would panic if driven from a
        // tokio worker.
        let loaded = self.build()?;
        let rt = tokio::runtime::Builder::new_multi_thread()
            .enable_all()
            .build()
            .map_err(|e| Error::Engine(format!("tokio runtime: {e}")))?;
        rt.block_on(taxa_server::serve_ext(
            loaded.main_ds,
            loaded.main_backend,
            loaded.series,
            loaded.series_follows_treemap,
            host,
            port,
        ))
        .map_err(|e| Error::Engine(format!("server error: {e}")))
    }
}