nornir 0.4.6

Companion to cargo: dependency tracking, release gating, deploy, benchmarks, and documentation assembly. Project-agnostic.
Documentation
//! Iceberg-backed funnel store.
//!
//! Replaces the original ndjson append-only file with an Iceberg
//! `funnel_events` table under `<workspace>/.nornir/warehouse`. The
//! materialised in-memory [`Funnel`] is built by scanning every row
//! and folding via [`Funnel::apply`] — same model as before, durable
//! storage is just typed Parquet now with snapshot-per-write.
//!
//! Validation still lives in `apply`: bad events are rejected
//! before they ever touch the Iceberg writer.

use std::path::{Path, PathBuf};
use std::sync::Arc;

use anyhow::{Context, Result};

use super::event::Event;
use super::state::Funnel;
use crate::warehouse::funnel::{append_event, load_all_events};
use crate::warehouse::iceberg::IcebergWarehouse;

pub struct Store {
    wh: Arc<IcebergWarehouse>,
    root: PathBuf,
    pub funnel: Funnel,
}

impl Store {
    /// Default location: `<workspace_root>/.nornir/warehouse` (shared
    /// with bench_runs / dep_graph / release_lineage tables).
    pub fn default_root(workspace_root: &Path) -> PathBuf {
        workspace_root.join(".nornir").join("warehouse")
    }

    /// Open (or create) the warehouse at `root` and replay every
    /// `funnel_events` row into a materialised [`Funnel`].
    pub fn open(root: impl Into<PathBuf>) -> Result<Self> {
        let root = root.into();
        let wh = Arc::new(
            IcebergWarehouse::open(&root)
                .with_context(|| format!("open iceberg warehouse at {}", root.display()))?,
        );
        let events = wh
            .block_on(async { load_all_events(&wh).await })
            .with_context(|| format!("load funnel_events from {}", root.display()))?;
        let mut funnel = Funnel::default();
        for (i, ev) in events.iter().enumerate() {
            funnel
                .apply(ev)
                .with_context(|| format!("replay funnel event {} of {}", i + 1, events.len()))?;
        }
        Ok(Self { wh, root, funnel })
    }

    /// Validate, then append the event as a new row (= new Iceberg snapshot).
    pub fn record(&mut self, event: Event) -> Result<()> {
        self.funnel
            .apply(&event)
            .context("validate event against current funnel state")?;
        self.wh
            .block_on(async { append_event(&self.wh, &event).await })
            .context("append event to iceberg")?;
        Ok(())
    }

    pub fn root(&self) -> &Path {
        &self.root
    }

    pub fn warehouse(&self) -> &Arc<IcebergWarehouse> {
        &self.wh
    }

    /// Async wrapper for [`Store::open`] safe to call from inside a
    /// tokio runtime. Runs the blocking open on the dedicated
    /// blocking-thread pool so the warehouse's inner runtime can
    /// drive the catalog without a nested-runtime panic.
    pub async fn open_async(root: impl Into<PathBuf>) -> Result<Self> {
        let root = root.into();
        tokio::task::spawn_blocking(move || Self::open(root))
            .await
            .context("join blocking task for Store::open")?
    }

    /// Async wrapper for [`Store::record`] safe to call from inside a
    /// tokio runtime. Validation happens inline; the iceberg append
    /// is offloaded to the blocking pool.
    pub async fn record_async(&mut self, event: Event) -> Result<()> {
        self.funnel
            .apply(&event)
            .context("validate event against current funnel state")?;
        let wh = Arc::clone(&self.wh);
        let ev = event;
        tokio::task::spawn_blocking(move || {
            wh.block_on(async { append_event(&wh, &ev).await })
        })
        .await
        .context("join blocking task for append_event")??;
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::funnel::event::Event;
    use crate::funnel::ids::IdeaId;
    use chrono::Utc;
    use tempfile::tempdir;

    #[test]
    fn roundtrip_idea_then_replay() {
        let dir = tempdir().unwrap();
        let root = dir.path().join("wh");

        {
            let mut s = Store::open(&root).unwrap();
            s.record(Event::IdeaSubmitted {
                id: IdeaId::seq(1),
                source: "test".into(),
                text: "first idea".into(),
                refs: vec![],
                ts: Utc::now(),
            })
            .unwrap();
            assert_eq!(s.funnel.ideas.len(), 1);
        }

        let s2 = Store::open(&root).unwrap();
        assert_eq!(s2.funnel.ideas.len(), 1, "ideas should persist across reopen");
        assert!(s2.funnel.ideas.contains_key(&IdeaId::seq(1)));
    }
}