taxa-core 0.1.0

taxa engine core: manifest model, formula AST→Polars Expr, bounded query generators over Polars.
//! Source-agnostic frame loader: turn a [`Dataset`] + a prebuilt
//! `frame-name → Source` map into the `(main, series)` `(FrameDataset, Backend)`
//! wiring the server needs.
//!
//! This is the shared spine of every on-ramp. The CALLER owns how a frame's
//! `Source` is built (the CLI resolves SQL specs / file paths / serialized
//! transform plans; the Rust facade supplies `ClosureSource`s). The loader owns
//! the engine-side orchestration that must be identical across on-ramps:
//!
//!   * resolve the views (treemap is the main frame; series is the Line tab),
//!   * synthesize each view's flat [`FrameDataset`] via [`Dataset::frame_dataset`],
//!   * propagate the series frame's metrics/resolutions onto the main ds so the
//!     boot manifest exposes the Line tab,
//!   * wire `dims_from` (wrap the narrow series source in a [`JoinSource`] that
//!     left-joins the snapshot's per-entity axis-level columns),
//!   * build the in-memory tag index for each `tags` filter from its companion
//!     frame, and
//!   * surface the `branch_set: "treemap"` flag.
//!
//! The `frame_sources` map MUST be prebuilt and memoized by the caller (one
//! `Source` per frame, built once): a `SqlSource` re-runs its query on every
//! construction, and a frame's source is read more than once here (a view's
//! frame and a `dims_from` frame may coincide), so a per-call rebuild would
//! re-ingest the store.

use std::collections::HashMap;
use std::sync::Arc;

use crate::backend::{Backend, FrameBackend};
use crate::error::{Error, Result};
use crate::manifest::{Dataset, FrameDataset};
use crate::source::{JoinSource, Source};

/// The wired result the server consumes: the main (treemap/scatter/detail)
/// frame-dataset + backend, the optional series (Line tab) frame-dataset +
/// backend, and whether the series view follows the treemap's branch ranking.
///
/// Produced by [`load`]; every field is public so the caller can take the
/// parts apart.
pub struct Loaded {
    /// Frame-dataset synthesized for the main view's frame. [`load`] also
    /// stores the `tags` filter indices on it and, when a `series` view exists,
    /// the series frame's name, metric ids, resolutions and default resolution.
    pub main_ds: FrameDataset,
    /// Backend wrapping the main frame's prebuilt source.
    pub main_backend: Arc<dyn Backend>,
    /// Frame-dataset + backend for the `series` view; `None` when the manifest
    /// resolves no `series` view.
    pub series: Option<(FrameDataset, Arc<dyn Backend>)>,
    /// `true` only when a `series` view exists and its `branch_set` is
    /// `"treemap"`.
    pub series_follows_treemap: bool,
}

/// Look up the prebuilt source for a frame, or a clear error if the caller
/// didn't build it.
fn frame_src(
    frame_sources: &HashMap<String, Arc<dyn Source>>,
    frame: &str,
) -> Result<Arc<dyn Source>> {
    frame_sources
        .get(frame)
        .cloned()
        .ok_or_else(|| Error::Schema(format!("no source was built for frame {frame:?}")))
}

/// The frames the loader will read, in resolution order: the main (treemap)
/// frame, and — when a series view exists — the series frame and any
/// `dims_from` frame. Callers use this to build exactly the sources needed
/// (each once), nothing more.
pub fn referenced_frames(ds: &Dataset) -> Result<Vec<String>> {
    let views = ds.resolved_views().map_err(Error::Schema)?;
    let main = views
        .get("treemap")
        .or_else(|| views.values().next())
        .ok_or_else(|| Error::Schema("manifest `views` is empty".into()))?;
    let mut out = vec![main.frame.clone()];
    if let Some(sv) = views.get("series") {
        if !out.contains(&sv.frame) {
            out.push(sv.frame.clone());
        }
        if let Some(df) = &sv.dims_from {
            if !out.contains(df) {
                out.push(df.clone());
            }
        }
    }
    // Each `tags` filter draws from a companion frame (the entity↔tag long
    // table) that must be built too.
    for f in &ds.filters {
        if f.r#type == "tags" {
            if let Some(frame) = &f.tags_frame {
                if !out.contains(frame) {
                    out.push(frame.clone());
                }
            }
        }
    }
    Ok(out)
}

/// Wire a [`Dataset`] + a prebuilt `frame → Source` map into the server's
/// `(main, series)` backends. See the module docs for the orchestration this
/// owns; see [`referenced_frames`] for exactly which frames must be in the map.
pub fn load(ds: &Dataset, frame_sources: &HashMap<String, Arc<dyn Source>>) -> Result<Loaded> {
    let views = ds.resolved_views().map_err(Error::Schema)?;

    // Main view: treemap, falling back to whatever view exists. Its frame is the
    // snapshot the treemap/scatter/detail render from.
    let main_view = views
        .get("treemap")
        .or_else(|| views.values().next())
        .ok_or_else(|| Error::Schema("manifest `views` is empty".into()))?;
    let main_frame_name = main_view.frame.clone();
    let main_src = frame_src(frame_sources, &main_frame_name)?;
    let mut main_ds = ds.frame_dataset(&main_frame_name)?;
    let main_backend: Arc<dyn Backend> = Arc::new(FrameBackend::new(main_src));

    // Build the in-memory index for each `tags` filter from its companion frame
    // (the entity↔tag long table). See docs/METRIC_SEMANTICS.md (Stage A).
    for f in &main_ds.filters {
        if f.r#type != "tags" {
            continue;
        }
        let frame = f
            .tags_frame
            .as_deref()
            .ok_or_else(|| Error::Schema(format!("tags filter {:?} needs `tags_frame`", f.id)))?;
        let entity = f.entity_column.as_deref().ok_or_else(|| {
            Error::Schema(format!("tags filter {:?} needs `entity_column`", f.id))
        })?;
        let tag = f
            .tag_column
            .as_deref()
            .ok_or_else(|| Error::Schema(format!("tags filter {:?} needs `tag_column`", f.id)))?;
        let src = frame_src(frame_sources, frame)?;
        let idx = crate::tags::TagIndex::build(&*src, entity, tag)
            .map_err(|e| Error::Schema(format!("tags filter {:?}: {e}", f.id)))?;
        main_ds.tag_indices.insert(f.id.clone(), idx);
    }

    // Series view (Line tab). Optional.
    let mut series: Option<(FrameDataset, Arc<dyn Backend>)> = None;
    let mut series_follows_treemap = false;
    if let Some(series_view) = views.get("series") {
        let sframe_name = &series_view.frame;
        let sframe = ds.frame(sframe_name).ok_or_else(|| {
            Error::Schema(format!("series view frame {sframe_name:?} not in `frames`"))
        })?;
        let series_ds = ds.frame_dataset(sframe_name)?;

        // Surface the series frame's metrics/resolutions onto the main ds so the
        // boot manifest exposes exactly those on the Line tab.
        main_ds.series_source = Some(sframe_name.clone());
        main_ds.series_metrics = Some(sframe.metrics.iter().map(|m| m.id.clone()).collect());
        main_ds.series_resolutions = sframe.resolutions.clone();
        main_ds.series_default_resolution = sframe.default_resolution.clone();

        let raw_src = frame_src(frame_sources, sframe_name)?;

        // `dims_from`: enrich the narrow series frame with the snapshot frame's
        // axis-level (+ filter + row_filter) columns, joined on the series id.
        let series_src: Arc<dyn Source> = if let Some(dims_from) = &series_view.dims_from {
            let dims_src = frame_src(frame_sources, dims_from)?;
            let dim_cols = dims_columns(ds);
            Arc::new(JoinSource::from_snapshot(
                raw_src,
                &*dims_src,
                series_ds.id_column.clone(),
                &dim_cols,
            )?)
        } else {
            raw_src
        };

        let series_backend: Arc<dyn Backend> = Arc::new(FrameBackend::new(series_src));
        series_follows_treemap = series_view.branch_set.as_deref() == Some("treemap");
        series = Some((series_ds, series_backend));
    }

    Ok(Loaded {
        main_ds,
        main_backend,
        series,
        series_follows_treemap,
    })
}

/// The dimension columns a `dims_from` join must carry from the snapshot so the
/// series engine sees everything it reads after the join — deduped, authored
/// order:
///
/// 1. axis LEVEL columns (a fixed axis's authored levels; a path axis's single
///    path column — its derived `__lvl*` columns don't exist on the raw snapshot),
/// 2. all FILTER facet columns (the shared `filters` apply to the series frame),
/// 3. every column referenced by an axis `row_filter` formula AST.
///
/// (`JoinSource::from_snapshot` further drops the join key, columns the snapshot
/// lacks, and columns already on the series frame.)
pub fn dims_columns(ds: &Dataset) -> Vec<String> {
    let mut cols: Vec<String> = Vec::new();
    let push = |c: &str, cols: &mut Vec<String>| {
        if !cols.iter().any(|x| x == c) {
            cols.push(c.to_string());
        }
    };
    for a in &ds.axes {
        if let Some(p) = &a.path {
            push(&p.column, &mut cols);
        } else {
            for l in &a.levels {
                push(l, &mut cols);
            }
        }
        if let Some(rf) = &a.row_filter {
            let mut refs: Vec<String> =
                crate::formula::referenced_columns(rf).into_iter().collect();
            refs.sort();
            for c in refs {
                push(&c, &mut cols);
            }
        }
    }
    for f in &ds.filters {
        push(&f.column, &mut cols);
    }
    cols
}