nornir 0.4.13

Companion to cargo: dependency tracking, release gating, deploy, benchmarks, and documentation assembly. Project-agnostic.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
//! Iceberg-backed funnel store.
//!
//! Replaces the original ndjson append-only file with an Iceberg
//! `funnel_events` table under `<workspace>/.nornir/warehouse`. The
//! materialised in-memory [`Funnel`] is built by scanning every row
//! and folding via [`Funnel::apply`] — same model as before, durable
//! storage is just typed Parquet now with snapshot-per-write.
//!
//! Validation still lives in `apply`: bad events are rejected
//! before they ever touch the Iceberg writer.

use std::path::{Path, PathBuf};
use std::sync::Arc;

use anyhow::{Context, Result};

use super::event::Event;
use super::state::Funnel;
use crate::warehouse::funnel::{append_event, load_all_events};
use crate::warehouse::iceberg::IcebergWarehouse;

pub struct Store {
    wh: Arc<IcebergWarehouse>,
    root: PathBuf,
    /// True when opened via [`open_read_only`](Self::open_read_only) and the
    /// catalog was a copied-aside snapshot — writes would land in a throwaway
    /// temp dir, so [`record`](Self::record) refuses them instead of losing them.
    read_only: bool,
    pub funnel: Funnel,
}

impl Store {
    /// Default location: `<workspace_root>/.nornir/warehouse` (shared
    /// with bench_runs / dep_graph / release_lineage tables).
    pub fn default_root(workspace_root: &Path) -> PathBuf {
        workspace_root.join(".nornir").join("warehouse")
    }

    /// **Single source of truth** for *where the funnel lives*, shared by the
    /// CLI (`nornir funnel …`), the gRPC server (`nornir-server`), and the MCP
    /// (`nornir-mcp`) so a funnel written by one is read by the others — the
    /// whole point of plan item E4 (a CLI-created funnel must show up in the
    /// viz Funnel tab, which reads through the server).
    ///
    /// **Precedence (highest first):**
    ///
    /// 1. `explicit` — an explicit `--funnel-root <dir>` flag (None when unset);
    /// 2. `NORNIR_FUNNEL_ROOT` — the **global override** the server/MCP honor
    ///    (set in the generated `nornir.env` by `nornir serve install`). When
    ///    present, *every* surface lands on this exact dir;
    /// 3. `[storage].local_path` (from `nornir.toml`) →
    ///    `<workspace_root>/<local_path>/warehouse` — the workspace-local home,
    ///    kept in lock-step with `config::Loaded::warehouse_root`;
    /// 4. repo-local default → `<workspace_root>/.nornir/warehouse`
    ///    (= [`default_root`](Self::default_root)).
    ///
    /// Levels 1–2 are *roots as-given* (no `warehouse/` suffix): they point
    /// straight at the warehouse dir the server opens. Levels 3–4 mirror the
    /// non-funnel warehouse-root rule so a CLI and server collide on the same
    /// file (and the lock-tolerant read open keeps that collision safe).
    pub fn resolve_root(
        workspace_root: &Path,
        local_path: &str,
        explicit: Option<&Path>,
    ) -> PathBuf {
        if let Some(p) = explicit {
            return p.to_path_buf();
        }
        if let Some(r) = std::env::var_os("NORNIR_FUNNEL_ROOT") {
            return PathBuf::from(r);
        }
        if local_path.is_empty() {
            Self::default_root(workspace_root)
        } else {
            workspace_root.join(local_path).join("warehouse")
        }
    }

    /// Open (or create) the warehouse at `root` and replay every
    /// `funnel_events` row into a materialised [`Funnel`].
    pub fn open(root: impl Into<PathBuf>) -> Result<Self> {
        Self::open_inner(root, false)
    }

    /// Lock-tolerant **read-only** open. Behaves like [`open`](Self::open) but,
    /// when the warehouse's `catalog.redb` is already held exclusively by
    /// another process *or another workspace inside the same server* (the funnel
    /// root is global via `NORNIR_FUNNEL_ROOT`, so only one `WorkspaceCtx` can
    /// hold the exclusive lock), it falls back to a copied-aside read-only
    /// snapshot instead of failing. The returned store reflects the funnel as of
    /// now and must not be mutated — perfect for the read-only `Funnel.Show`.
    pub fn open_read_only(root: impl Into<PathBuf>) -> Result<Self> {
        Self::open_inner(root, true)
    }

    fn open_inner(root: impl Into<PathBuf>, read_only: bool) -> Result<Self> {
        let root = root.into();
        let wh = if read_only {
            IcebergWarehouse::open_read_only(&root)
        } else {
            IcebergWarehouse::open(&root)
        };
        let wh = Arc::new(
            wh.with_context(|| format!("open iceberg warehouse at {}", root.display()))?,
        );
        let events = wh
            .block_on(async { load_all_events(&wh).await })
            .with_context(|| format!("load funnel_events from {}", root.display()))?;
        let mut funnel = Funnel::default();
        for (i, ev) in events.iter().enumerate() {
            funnel
                .apply(ev)
                .with_context(|| format!("replay funnel event {} of {}", i + 1, events.len()))?;
        }
        // `read_only` may have been requested *and not contended* (got the real
        // exclusive lock): then it's a normal writable open. Only a copied-aside
        // snapshot is truly read-only.
        let read_only = wh.is_snapshot();
        Ok(Self { wh, root, read_only, funnel })
    }

    /// True when this store is a read-only snapshot (catalog was locked
    /// elsewhere). Writes are refused; reads reflect the funnel as of open time.
    pub fn is_read_only(&self) -> bool {
        self.read_only
    }

    /// Validate, then append the event as a new row (= new Iceberg snapshot).
    pub fn record(&mut self, event: Event) -> Result<()> {
        if self.read_only {
            anyhow::bail!(
                "funnel store at {} is a read-only snapshot (catalog locked by \
                 another process/workspace); cannot record events",
                self.root.display()
            );
        }
        self.funnel
            .apply(&event)
            .context("validate event against current funnel state")?;
        self.wh
            .block_on(async { append_event(&self.wh, &event).await })
            .context("append event to iceberg")?;
        Ok(())
    }

    pub fn root(&self) -> &Path {
        &self.root
    }

    pub fn warehouse(&self) -> &Arc<IcebergWarehouse> {
        &self.wh
    }

    /// Async wrapper for [`Store::open`] safe to call from inside a
    /// tokio runtime. Runs the blocking open on the dedicated
    /// blocking-thread pool so the warehouse's inner runtime can
    /// drive the catalog without a nested-runtime panic.
    pub async fn open_async(root: impl Into<PathBuf>) -> Result<Self> {
        let root = root.into();
        tokio::task::spawn_blocking(move || Self::open(root))
            .await
            .context("join blocking task for Store::open")?
    }

    /// Async wrapper for [`Store::open_read_only`] — the lock-tolerant read
    /// open, safe to call from inside a tokio runtime.
    pub async fn open_read_only_async(root: impl Into<PathBuf>) -> Result<Self> {
        let root = root.into();
        tokio::task::spawn_blocking(move || Self::open_read_only(root))
            .await
            .context("join blocking task for Store::open_read_only")?
    }

    /// Async wrapper for [`Store::record`] safe to call from inside a
    /// tokio runtime. Validation happens inline; the iceberg append
    /// is offloaded to the blocking pool.
    pub async fn record_async(&mut self, event: Event) -> Result<()> {
        if self.read_only {
            anyhow::bail!(
                "funnel store at {} is a read-only snapshot (catalog locked by \
                 another process/workspace); cannot record events",
                self.root.display()
            );
        }
        self.funnel
            .apply(&event)
            .context("validate event against current funnel state")?;
        let wh = Arc::clone(&self.wh);
        let ev = event;
        tokio::task::spawn_blocking(move || {
            wh.block_on(async { append_event(&wh, &ev).await })
        })
        .await
        .context("join blocking task for append_event")??;
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::funnel::event::Event;
    use crate::funnel::ids::{IdeaId, NodeId, PlanId};
    use crate::funnel::event::{NodeStatus, PlanStatus};
    use chrono::Utc;
    use std::sync::Mutex;
    use tempfile::tempdir;

    /// `NORNIR_FUNNEL_ROOT` is process-global; serialize the env-mutating tests
    /// so they can't observe each other's value under the parallel test runner.
    static ENV_LOCK: Mutex<()> = Mutex::new(());

    /// E4 root-resolution precedence: explicit override beats the env var beats
    /// `[storage].local_path` beats the repo-local `.nornir/warehouse` default —
    /// and it's identical to what the server/MCP compute, so the CLI writes
    /// where they read.
    #[test]
    fn resolve_root_precedence() {
        let _g = ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner());
        let ws = Path::new("/ws");

        // Save/clear the env so this test is deterministic regardless of host.
        let saved = std::env::var_os("NORNIR_FUNNEL_ROOT");
        std::env::remove_var("NORNIR_FUNNEL_ROOT");

        // 4. repo-local default (no env, empty local_path).
        assert_eq!(
            Store::resolve_root(ws, "", None),
            ws.join(".nornir").join("warehouse"),
        );
        // 3. config local_path (no env).
        assert_eq!(
            Store::resolve_root(ws, "store", None),
            ws.join("store").join("warehouse"),
        );

        // 2. NORNIR_FUNNEL_ROOT overrides config local_path, as-given.
        std::env::set_var("NORNIR_FUNNEL_ROOT", "/global/funnel");
        assert_eq!(
            Store::resolve_root(ws, "store", None),
            PathBuf::from("/global/funnel"),
        );
        // 1. explicit --funnel-root beats even the env override.
        assert_eq!(
            Store::resolve_root(ws, "store", Some(Path::new("/explicit"))),
            PathBuf::from("/explicit"),
        );

        // Restore.
        match saved {
            Some(v) => std::env::set_var("NORNIR_FUNNEL_ROOT", v),
            None => std::env::remove_var("NORNIR_FUNNEL_ROOT"),
        }
    }

    /// **Inject-and-assert (E4):** write a full idea→plan→nodes→edges funnel
    /// through the *exact same `Store::resolve_root` + `Store::record` calls the
    /// CLI's `run_funnel` makes* into a temp `NORNIR_FUNNEL_ROOT`, then reopen
    /// through the *same resolution the server/MCP use* and assert the plan, its
    /// nodes, and its edges match byte-for-byte — the regression guard for "a
    /// CLI-created funnel doesn't show up in the viz Funnel tab".
    #[test]
    fn cli_write_server_read_roundtrip() {
        let _g = ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner());
        let dir = tempdir().unwrap();
        let root = dir.path().join("global-funnel");

        let saved = std::env::var_os("NORNIR_FUNNEL_ROOT");
        std::env::set_var("NORNIR_FUNNEL_ROOT", &root);
        let ws = dir.path().join("workspace"); // NOT where the funnel lands

        // --- CLI WRITE PATH: resolve via the env override (workspace-local
        // `local_path` is deliberately set to prove the env wins), then record
        // the same event sequence `nornir funnel submit/plan/node/link` emits.
        let cli_root = Store::resolve_root(&ws, "store", None);
        assert_eq!(cli_root, root, "env override must win over local_path");

        // Replay orders events by `ts` (Iceberg scan order isn't stable), so —
        // exactly like the real CLI, which calls `Utc::now()` per verb — give
        // each event a strictly increasing timestamp.
        let base = Utc::now();
        let at = |i: i64| base + chrono::Duration::microseconds(i);
        let idea = IdeaId::seq(1);
        let plan = PlanId::seq(1);
        let n1 = NodeId::seq(1);
        let n2 = NodeId::seq(2);
        {
            let mut s = Store::open(&cli_root).unwrap();
            s.record(Event::IdeaSubmitted {
                id: idea.clone(),
                source: "cli".into(),
                text: "ship E4".into(),
                refs: vec![],
                ts: at(0),
            })
            .unwrap();
            s.record(Event::PlanCreated {
                id: plan.clone(),
                idea_id: idea.clone(),
                summary: "funnel store split".into(),
                planner: "cli".into(),
                ts: at(1),
            })
            .unwrap();
            s.record(Event::PlanStatusChanged {
                plan_id: plan.clone(),
                status: PlanStatus::Active,
                why: None,
                ts: at(2),
            })
            .unwrap();
            s.record(Event::NodeAdded {
                plan_id: plan.clone(),
                node_id: n1.clone(),
                kind: "code:write".into(),
                params: serde_json::Map::new(),
                targets: vec!["src/funnel/store.rs".into()],
                prompt_excerpt: None,
                ts: at(3),
            })
            .unwrap();
            s.record(Event::NodeAdded {
                plan_id: plan.clone(),
                node_id: n2.clone(),
                kind: "test:write".into(),
                params: serde_json::Map::new(),
                targets: vec![],
                prompt_excerpt: None,
                ts: at(4),
            })
            .unwrap();
            // n2 depends on n1.
            s.record(Event::EdgeAdded {
                plan_id: plan.clone(),
                from_node: n1.clone(),
                to_node: n2.clone(),
                ts: at(5),
            })
            .unwrap();
        } // writer drops → exclusive catalog lock released

        // --- SERVER/VIZ READ PATH: resolve the SAME way the server does
        // (env override > local_path > default) and replay. Use the
        // lock-tolerant read open, exactly like the server's `Funnel.Show`.
        let server_root = Store::resolve_root(&ws, "store", None);
        assert_eq!(server_root, cli_root, "server must resolve the CLI's root");
        let mut s2 = Store::open_read_only(&server_root).unwrap();

        // The idea is there.
        assert!(s2.funnel.ideas.contains_key(&idea));

        // The plan matches EXACTLY — id, idea linkage, status, summary.
        let p = s2.funnel.plans.get(&plan).expect("plan must round-trip");
        assert_eq!(p.idea_id, idea);
        assert_eq!(p.summary, "funnel store split");
        assert_eq!(p.status, PlanStatus::Active);

        // Both nodes round-trip with the right kind/targets.
        assert_eq!(p.nodes.len(), 2, "both nodes must persist");
        let pn1 = p.nodes.get(&n1).expect("n1");
        assert_eq!(pn1.kind, "code:write");
        assert_eq!(pn1.targets, vec!["src/funnel/store.rs".to_string()]);
        let pn2 = p.nodes.get(&n2).expect("n2");
        assert_eq!(pn2.kind, "test:write");

        // Replay leaves both nodes `Pending` (readiness is a derived, in-memory
        // projection via `promote_ready`, never persisted) — assert that exact
        // persisted shape.
        assert_eq!(pn1.status, NodeStatus::Pending);
        assert_eq!(pn2.status, NodeStatus::Pending);

        // The dependency edge round-trips exactly.
        assert_eq!(p.edges.len(), 1, "exactly one edge");
        assert!(p.edges.contains(&(n1.clone(), n2.clone())), "n1→n2 edge");

        // And the derived readiness the viz computes is correct from the
        // round-tripped graph: n1 (no deps) → Ready, n2 (waits on n1) stays
        // Pending until n1 is Done.
        s2.funnel.promote_ready();
        let p = s2.funnel.plans.get(&plan).unwrap();
        assert_eq!(p.nodes.get(&n1).unwrap().status, NodeStatus::Ready);
        assert_eq!(p.nodes.get(&n2).unwrap().status, NodeStatus::Pending);

        match saved {
            Some(v) => std::env::set_var("NORNIR_FUNNEL_ROOT", v),
            None => std::env::remove_var("NORNIR_FUNNEL_ROOT"),
        }
    }

    #[test]
    fn roundtrip_idea_then_replay() {
        let dir = tempdir().unwrap();
        let root = dir.path().join("wh");

        {
            let mut s = Store::open(&root).unwrap();
            s.record(Event::IdeaSubmitted {
                id: IdeaId::seq(1),
                source: "test".into(),
                text: "first idea".into(),
                refs: vec![],
                ts: Utc::now(),
            })
            .unwrap();
            assert_eq!(s.funnel.ideas.len(), 1);
        }

        let s2 = Store::open(&root).unwrap();
        assert_eq!(s2.funnel.ideas.len(), 1, "ideas should persist across reopen");
        assert!(s2.funnel.ideas.contains_key(&IdeaId::seq(1)));
    }

    /// An uncontended read-only open is a *normal writable* open (it got the
    /// real lock) — only a copied-aside snapshot is flagged read-only. This is
    /// the path `Funnel.Show` takes when no one else holds the funnel root.
    #[test]
    fn read_only_open_uncontended_is_writable() {
        let dir = tempdir().unwrap();
        let root = dir.path().join("wh");
        let s = Store::open_read_only(&root).unwrap();
        assert!(!s.is_read_only(), "uncontended open should not be a snapshot");
    }

    /// While one writer holds the exclusive catalog lock, a second
    /// `open_read_only` must NOT fail (the bug behind the red Funnel tab) — it
    /// falls back to a read snapshot that replays the same events. Writes to the
    /// snapshot are refused rather than silently lost.
    #[test]
    fn read_only_open_while_locked_yields_snapshot() {
        let dir = tempdir().unwrap();
        let root = dir.path().join("wh");

        // Writer holds the exclusive lock and records one idea.
        let mut writer = Store::open(&root).unwrap();
        writer
            .record(Event::IdeaSubmitted {
                id: IdeaId::seq(1),
                source: "test".into(),
                text: "locked idea".into(),
                refs: vec![],
                ts: Utc::now(),
            })
            .unwrap();

        // Second open while the writer still holds the lock: must succeed via
        // the read-only snapshot and see the recorded idea.
        let mut snap = Store::open_read_only(&root).unwrap();
        assert!(snap.is_read_only(), "contended open must be a read-only snapshot");
        assert!(snap.funnel.ideas.contains_key(&IdeaId::seq(1)));

        // Writes to the snapshot are refused, never silently dropped.
        let err = snap
            .record(Event::IdeaSubmitted {
                id: IdeaId::seq(2),
                source: "test".into(),
                text: "should be rejected".into(),
                refs: vec![],
                ts: Utc::now(),
            })
            .unwrap_err();
        assert!(format!("{err:#}").contains("read-only snapshot"));
    }
}