nornir 0.4.28

Companion to cargo: dependency tracking, release gating, deploy, benchmarks, and documentation assembly. Project-agnostic.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
//! Server-monitored self-sync (server-user `nornir`): the poll loop and the
//! per-workspace fetch step.
//!
//! The loop runs inside `nornir-server`: every tick it fetches each *monitored*
//! workspace's git members (pure-Rust HTTPS via [`crate::gitio`]), records sync
//! state in the [`crate::registry`], and — when any member's HEAD moved —
//! invokes the republish hook. [`fetch_workspace`] is shared with the CLI's
//! `nornir workspace fetch`.

use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;

use anyhow::{anyhow, bail, Context, Result};

use crate::registry::{Mode, Registry};

/// Summary of one [`fetch_workspace`] pass over a single workspace.
#[derive(Debug, Default)]
pub struct FetchReport {
    /// Name of the workspace this report describes.
    pub workspace: String,
    /// Number of members that have a remote and were therefore cloned or
    /// fetched (counted per attempt, whether or not the attempt succeeded).
    pub fetched: usize,
    /// Names of members whose HEAD differs from the last recorded sync.
    pub changed: Vec<String>,
    /// `(member, error)` pairs for members whose clone/fetch failed.
    pub errors: Vec<(String, String)>,
}

impl FetchReport {
    /// `true` when at least one member's HEAD moved during this fetch.
    pub fn changed_any(&self) -> bool {
        self.changed.first().is_some()
    }
}

/// Clone/fetch every git member of `name` into `<root>/<name>/git/<member>`,
/// write back each member's sync state, and return what changed. Shared by the
/// CLI verb and the poll loop. HTTPS only (SSH refused; see [`crate::gitio`]).
pub fn fetch_workspace(reg: &Registry, root: &Path, name: &str) -> Result<FetchReport> {
    let mut ws = reg
        .get(name)?
        .ok_or_else(|| anyhow!("no workspace `{name}` in the registry"))?;
    let ws_root = root.join(&ws.name);
    let git_dir = ws_root.join("git");
    std::fs::create_dir_all(&git_dir)?;
    std::fs::create_dir_all(ws_root.join("builds"))?;

    let now = chrono::Utc::now().to_rfc3339();
    let mut report = FetchReport {
        workspace: ws.name.clone(),
        ..Default::default()
    };
    for m in &mut ws.members {
        if m.remote.is_empty() {
            continue; // local/external member — source lives outside
        }
        report.fetched += 1;
        let dest = git_dir.join(&m.name);
        // SSH remotes authenticate with the server's deploy key (resolved inside
        // clone_or_fetch via NORNIR_SSH_DIR / the `nornir` user's home).
        match crate::gitio::clone_or_fetch(&m.remote, &dest, None) {
            Ok(sha) => {
                if sha != m.last_seen_sha {
                    report.changed.push(m.name.clone());
                }
                m.last_seen_sha = sha;
                m.last_synced = now.clone();
                m.sync_state = "ok".into();
            }
            Err(e) => {
                let msg = format!("{e:#}");
                m.sync_state = format!("error: {msg}");
                report.errors.push((m.name.clone(), msg));
            }
        }
    }
    ws.updated_at = now;
    reg.upsert(&ws)?;
    Ok(report)
}

/// One sweep: fetch every monitored workspace and republish those that need it.
///
/// A workspace is republished when any member's HEAD moved, or when it has
/// never been built (empty `current_snapshot`, e.g. the warehouse was wiped).
/// Fetch and republish failures are logged per workspace (and fetch errors per
/// member) rather than propagated, so one bad remote never stalls the others or
/// aborts the sweep.
///
/// Takes the server's shared registry handle (redb is single-writer — the poll
/// loop and the Workspaces RPC must share one `Registry`, never each open their
/// own).
///
/// # Errors
/// Only a failure to list the registry aborts the sweep.
pub fn sync_once(reg: &Registry, root: &Path) -> Result<()> {
    for ws in reg.list()? {
        if ws.mode != Mode::Monitored {
            continue;
        }
        let rep = match fetch_workspace(reg, root, &ws.name) {
            Ok(rep) => rep,
            Err(e) => {
                eprintln!("nornir-monitor: {} fetch failed: {e:#}", ws.name);
                continue;
            }
        };
        for (m, e) in &rep.errors {
            eprintln!("nornir-monitor: {}::{m} fetch error: {e}", rep.workspace);
        }

        // Republish on a git change OR when never built (empty
        // `current_snapshot` — e.g. the warehouse was wiped) so a clean
        // warehouse rebuilds from git instead of staying empty. A failed
        // re-read is treated as "already built" (no forced rebuild).
        let needs_build = reg
            .get(&ws.name)
            .ok()
            .flatten()
            .map(|r| r.current_snapshot.trim().is_empty())
            .unwrap_or(false);
        if !rep.changed_any() && !needs_build {
            continue;
        }

        let why = if rep.changed_any() {
            format!("changed [{}]", rep.changed.join(", "))
        } else {
            "empty warehouse".to_string()
        };
        eprintln!("nornir-monitor: {} {why} → republish", rep.workspace);

        // Incremental scoping (F1) only rebuilds `changed ∪ dependents`, which is
        // sound only when the warehouse already holds every other member. If it
        // was never built / was wiped, a scoped rebuild would leave the unaffected
        // members missing — so `needs_build` always forces a full rebuild (`None`),
        // even when some members also changed.
        let changed: Option<&[String]> = if needs_build {
            None
        } else {
            Some(rep.changed.as_slice())
        };
        match republish(reg, root, &ws.name, changed) {
            Ok(snap) => eprintln!(
                "nornir-monitor: {} republished → snapshot {snap}",
                ws.name
            ),
            Err(e) => {
                eprintln!("nornir-monitor: {} republish failed: {e:#}", ws.name)
            }
        }
    }
    Ok(())
}

/// Rebuild + republish a workspace's warehouse from its freshly-fetched
/// sources: knowledge scan + member-scoped code index + a per-member snapshot,
/// all under `<root>/<ws>/builds/` (the workspace's own Iceberg warehouse).
/// Returns the latest snapshot id; records it on the registry row.
///
/// Operates on the materialized clones under `<root>/<ws>/git/<member>/` — only
/// members with a real checkout (HTTPS today; SSH pack-fetch pending).
///
/// **Incremental (F1):** `changed` carries the members whose HEAD moved this
/// sweep. When present, the index rebuild is scoped to the *affected set* —
/// `changed ∪ transitive dependents` from the cross-repo dep graph — so an
/// unaffected member is not re-walked. `changed = None` (first publish / empty
/// warehouse) falls back to a full rebuild over all members, as does a missing
/// dep graph.
///
/// `pub` so the eager `nornir workspace add` (fat mode) can build inline right
/// after fetch — the same build the poll loop runs on a change. Server and fat
/// thus do the *same* thing; the poll loop is just the automated re-run.
pub fn republish(
    reg: &Registry,
    root: &Path,
    ws_name: &str,
    changed: Option<&[String]>,
) -> Result<String> {
    use crate::warehouse::iceberg::IcebergWarehouse;

    let ws_root = root.join(ws_name);
    let git_dir = ws_root.join("git");
    let builds = ws_root.join("builds");
    std::fs::create_dir_all(&builds)?;

    let mut record = reg
        .get(ws_name)?
        .ok_or_else(|| anyhow!("workspace `{ws_name}` vanished from the registry"))?;
    let members: Vec<String> = record.members.iter().map(|m| m.name.clone()).collect();

    let index_dir = builds.join("cache").join("index");
    let wh = IcebergWarehouse::open(&builds)
        .with_context(|| format!("open warehouse {}", builds.display()))?;
    let idx = crate::index::Index::open_at(&git_dir, &index_dir)?
        .with_repo_scope(members.clone());

    // Deep-scan is opted into per-workspace in the descriptor ([workspace]
    // deep_scan = true). Best-effort read — a missing/unparsable descriptor
    // simply means no deep-scan.
    let deep_scan = crate::workspace::WorkspaceDescriptor::load(Path::new(&record.descriptor))
        .map(|d| d.workspace.deep_scan)
        .unwrap_or(false);

    let snap = republish_with(
        &wh, &idx, &git_dir, &index_dir, &members, ws_name, deep_scan, changed,
    )?;

    record.current_snapshot = snap.clone();
    record.updated_at = chrono::Utc::now().to_rfc3339();
    reg.upsert(&record)?;
    Ok(snap)
}

/// Republish a monitored workspace **through caller-owned handles** — the
/// warehouse + Tantivy index the `nornir-server` already holds open (redb is
/// single-writer, so the server must be the sole opener; the poll loop writes
/// through these same handles rather than opening its own).
///
/// Runs the producer pipeline over the materialized member checkouts under
/// `git_dir` and returns the id of the last snapshot written. Members without a
/// `.git` checkout (e.g. SSH-only, not yet clonable) are skipped.
///
/// Pipeline:
/// 0. build the cross-repo dep graph (best-effort);
/// 1. knowledge scan per member, skipped when the ledger shows this HEAD was
///    already scanned;
/// 2. (re)build the code index — scoped or full;
/// 3. snapshot the index into Iceberg once per member, keyed to its HEAD;
/// 4. persist the dep graph (best-effort);
/// 5. optional deep-scan of the transitive dep closure (best-effort).
///
/// **Incremental (F1):** when `changed` names the members that moved, step 2 is
/// scoped to [`affected_set`] (`changed ∪ transitive dependents`); members
/// outside that set are not re-walked. `changed = None`, an empty change-set,
/// an unbuildable graph, or an affected set with no present member all fall
/// back to a full rebuild over every present member.
///
/// # Errors
/// Fails when no member checkout is present, or when a knowledge scan, the
/// index build, or a snapshot fails. Dep-graph, ledger, security-warm and
/// deep-scan failures are logged and do not fail the republish.
pub fn republish_with(
    wh: &crate::warehouse::iceberg::IcebergWarehouse,
    idx: &crate::index::Index,
    git_dir: &Path,
    index_dir: &Path,
    members: &[String],
    ws_name: &str,
    deep_scan: bool,
    changed: Option<&[String]>,
) -> Result<String> {
    // Placeholder used for both sha and branch when a checkout's HEAD can't be read.
    const UNKNOWN: &str = "unknown";

    let present: Vec<&String> = members
        .iter()
        .filter(|name| git_dir.join(name).join(".git").exists())
        .collect();
    if present.is_empty() {
        bail!(
            "no materialized member checkouts under {} (SSH-only members can't be \
             cloned yet)",
            git_dir.display()
        );
    }

    // 0. Cross-repo dep graph (cheap, offline `cargo metadata --no-deps`). Built
    //    up front so the affected-set scoping below can consult it; reused for
    //    the step-4 persist. Best-effort — a graph hiccup just means a full
    //    rebuild.
    let graph = match build_member_graph(ws_name, git_dir, &present) {
        Ok(g) => Some(g),
        Err(e) => {
            eprintln!("nornir-monitor: {ws_name} dep-graph build skipped: {e:#}");
            None
        }
    };

    // Affected set = changed ∪ transitive dependents. `None` (first publish /
    // empty warehouse) or a missing graph → empty scope = full rebuild.
    let scope: Vec<String> = match (changed, graph.as_ref()) {
        (Some(ch), Some(g)) if !ch.is_empty() => affected_set(g, ch, &present),
        _ => Vec::new(),
    };
    if scope.is_empty() {
        eprintln!("nornir-monitor: {ws_name} → full rebuild over all members");
    } else {
        eprintln!(
            "nornir-monitor: {ws_name} → incremental rebuild, affected [{}]",
            scope.join(", ")
        );
    }

    // HEAD (sha, branch) of every present member, read once and shared by the
    // knowledge-scan ledger (step 1) and the snapshot keys (step 3) so both
    // agree on the same commit.
    let heads: Vec<(&String, String, String)> = present
        .iter()
        .map(|&name| {
            let (sha, branch) = crate::gitio::head_sha_and_branch(&git_dir.join(name))
                .unwrap_or_else(|_| (UNKNOWN.into(), UNKNOWN.into()));
            (name, sha, branch)
        })
        .collect();

    // 1. Knowledge scan (symbols + git-heat) per member → persist.
    //    Incremental by git-SHA idempotency: if a member's HEAD SHA was already
    //    scanned, skip its (expensive) knowledge scan — unchanged members are a
    //    no-op even when the workspace republishes because another member moved.
    //    Also pre-warms the security SBOM cache.
    for (name, sha, _branch) in &heads {
        let repo_root = git_dir.join(name);
        // The ledger is keyed by SHA, so it is only meaningful for a real one.
        // Recording `"unknown"` (or "") would make every later sweep with an
        // unreadable HEAD skip this member's scan even after its contents change.
        let ledger_usable = !sha.is_empty() && sha.as_str() != UNKNOWN;
        if ledger_usable && wh.knowledge_scan_exists(name, sha).unwrap_or(false) {
            // `get` avoids a panic should the SHA ever be shorter / non-ASCII.
            let short = sha.get(..8).unwrap_or(sha.as_str());
            eprintln!(
                "nornir-monitor: {ws_name}/{name} @ {short} — knowledge unchanged, skip scan"
            );
            continue;
        }
        let res = crate::knowledge::scan_all(&repo_root, name)
            .with_context(|| format!("knowledge scan {name}"))?;
        wh.append_symbol_scan(&res.symbols)?;
        wh.append_git_heat_scan(&res.git)?;
        // Record the scan (idempotency ledger) + pre-warm security; both best-effort.
        if ledger_usable {
            if let Err(e) = wh.record_knowledge_scan(name, sha, res.symbols.snapshot_id) {
                eprintln!(
                    "nornir-monitor: {ws_name}/{name} knowledge-scan ledger skipped: {e:#}"
                );
            }
        }
        if let Err(e) = crate::security::warm(wh, &repo_root, Some(git_dir)) {
            eprintln!("nornir-monitor: {ws_name}/{name} security warm skipped: {e:#}");
        }
    }

    // 2. (Re)build the workspace code index. Incrementally scoped to the
    //    affected members when `scope` is non-empty; full-workspace otherwise.
    idx.build_scoped(&scope).context("build workspace index")?;

    // 3. Snapshot the index into Iceberg, keyed to each member's HEAD.
    let mut last_snapshot = String::new();
    for (name, sha, branch) in &heads {
        let snap = crate::index::snapshot::snapshot_to_iceberg(
            wh, ws_name, name, sha, branch, index_dir,
        )
        .with_context(|| format!("snapshot {name}"))?;
        last_snapshot = snap.snapshot_id.to_string();
    }

    // 4. Persist the cross-repo dependency graph built in step 0 so the viz Dep
    //    Graph tab reflects the monitored sources. Note: this graph is
    //    *cross-repo*, so a single-member workspace yields no edges; it lights up
    //    for multi-repo monitored workspaces.
    if let Some(graph) = graph.as_ref() {
        if let Err(e) = wh.record_dep_graph(ws_name, graph) {
            eprintln!("nornir-monitor: {ws_name} dep-graph persist skipped: {e:#}");
        }
    }

    // 5. Opt-in deep-scan: knowledge-scan the members' entire transitive dep
    //    closure (gatling-fanned) into this same warehouse under repo `deps`.
    //    Best-effort: needs `cargo` on PATH + network for the first fetch.
    if deep_scan {
        let dirs: Vec<std::path::PathBuf> =
            present.iter().map(|n| git_dir.join(n.as_str())).collect();
        match crate::deepscan::deep_scan(&dirs, wh) {
            Ok(rep) => {
                eprintln!(
                    "nornir-monitor: {ws_name} deep-scan: {} dep crate(s) scanned, {} skipped, {} error(s)",
                    rep.crates,
                    rep.skipped,
                    rep.errors.len()
                );
                for e in rep.errors.iter().take(5) {
                    eprintln!("nornir-monitor:   deep-scan error: {e}");
                }
            }
            Err(e) => eprintln!("nornir-monitor: {ws_name} deep-scan skipped: {e:#}"),
        }
    }

    Ok(last_snapshot)
}

/// Members to re-index for an incremental republish: every member whose HEAD
/// moved (`changed`) plus all of its transitive dependents, as reported by the
/// cross-repo dep [`graph`](crate::warehouse::dep_graph::WorkspaceGraph),
/// restricted to `present` (the members with a materialized checkout).
///
/// The graph's ordering is preserved. A changed member with no checkout, or a
/// graph node that isn't a workspace member, is dropped so the index scope only
/// ever names real member directories.
///
/// An empty result means nothing in `changed` resolved to a present member;
/// callers read an empty scope as "full rebuild", so this errs on the side of
/// rebuilding too much rather than silently indexing nothing.
fn affected_set(
    graph: &crate::warehouse::dep_graph::WorkspaceGraph,
    changed: &[String],
    present: &[&String],
) -> Vec<String> {
    let materialized: std::collections::BTreeSet<&str> =
        present.iter().map(|member| member.as_str()).collect();
    let mut scope = Vec::new();
    for member in graph.affected_by_change(changed) {
        if materialized.contains(member.as_str()) {
            scope.push(member);
        }
    }
    scope
}

/// Build a cross-repo [`WorkspaceGraph`](crate::warehouse::dep_graph::WorkspaceGraph)
/// over the materialized member checkouts.
///
/// A descriptor is synthesized in which every member is a *path* repo pointing
/// at `git_dir/<member>` (no `git`/`branch`), so resolution uses the checkouts
/// rather than the git cache. The graph build runs `cargo metadata --no-deps`,
/// so no network access is needed.
fn build_member_graph(
    ws_name: &str,
    git_dir: &Path,
    members: &[&String],
) -> Result<crate::warehouse::dep_graph::WorkspaceGraph> {
    use crate::workspace::descriptor::{RepoSpec, WorkspaceDescriptor, WorkspaceMeta};

    let desc = WorkspaceDescriptor {
        workspace: WorkspaceMeta {
            name: ws_name.to_owned(),
            deep_scan: false,
        },
        repos: members
            .iter()
            .map(|&member| {
                let checkout = git_dir.join(member);
                let spec = RepoSpec {
                    path: Some(checkout.to_string_lossy().into_owned()),
                    git: None,
                    branch: None,
                };
                (member.clone(), spec)
            })
            .collect(),
        descriptor_dir: git_dir.to_path_buf(),
    };
    crate::warehouse::dep_graph::WorkspaceGraph::build(&desc)
}

/// Parse a poll interval: `"60s"`, `"5m"`, `"2h"`, or a bare count of seconds.
///
/// Surrounding whitespace (and whitespace before the unit) is ignored. Empty or
/// unparsable input yields `default`; oversized values saturate at `u64::MAX`
/// seconds.
pub fn parse_interval(s: &str, default: Duration) -> Duration {
    let text = s.trim();
    if text.is_empty() {
        return default;
    }
    let (digits, scale): (&str, u64) = if let Some(rest) = text.strip_suffix('s') {
        (rest, 1)
    } else if let Some(rest) = text.strip_suffix('m') {
        (rest, 60)
    } else if let Some(rest) = text.strip_suffix('h') {
        (rest, 3600)
    } else {
        (text, 1)
    };
    match digits.trim().parse::<u64>() {
        Ok(count) => Duration::from_secs(count.saturating_mul(scale)),
        Err(_) => default,
    }
}

/// Spawn the background poll loop over the server's shared registry.
///
/// Every `tick` the loop runs one [`sync_once`] sweep on a blocking thread, so
/// gix I/O never stalls the async runtime. Missed ticks are skipped rather than
/// bursted. Sweep errors and sweep panics are logged and the loop keeps going.
/// Returns the join handle immediately.
///
/// A zero `tick` (e.g. from `parse_interval("0", ..)`) would make
/// `tokio::time::interval` panic inside the spawned task and silently end
/// monitoring, so it is replaced with a one-second period (and logged).
pub fn spawn_poll_loop(
    reg: Arc<Registry>,
    root: PathBuf,
    tick: Duration,
) -> tokio::task::JoinHandle<()> {
    let tick = if tick.is_zero() {
        eprintln!("nornir-monitor: poll interval of 0 is invalid; using 1s");
        Duration::from_secs(1)
    } else {
        tick
    };
    tokio::spawn(async move {
        let mut interval = tokio::time::interval(tick);
        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
        loop {
            interval.tick().await;
            let reg = Arc::clone(&reg);
            let root = root.clone();
            match tokio::task::spawn_blocking(move || sync_once(&reg, &root)).await {
                Ok(Ok(())) => {}
                Ok(Err(e)) => eprintln!("nornir-monitor: sweep error: {e:#}"),
                Err(e) => eprintln!("nornir-monitor: sweep task panicked: {e}"),
            }
        }
    })
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::warehouse::dep_graph::{CrossRepoEdge, RepoFacts, WorkspaceGraph};
    use std::collections::{BTreeMap, BTreeSet};

    /// Turn borrowed literals into whatever owned-string collection the
    /// destination field expects.
    fn owned<C: FromIterator<String>>(items: &[&str]) -> C {
        items.iter().map(|item| (*item).to_owned()).collect()
    }

    /// A repo node producing/consuming the given crate names.
    fn facts(name: &str, produces: &[&str], consumes: &[&str]) -> RepoFacts {
        RepoFacts {
            name: name.to_owned(),
            root: std::path::PathBuf::from("/dev/null"),
            produces: owned(produces),
            consumes: owned(consumes),
        }
    }

    /// A consumer → producer edge carried by the crates in `via`.
    fn edge(from: &str, to: &str, via: &[&str]) -> CrossRepoEdge {
        CrossRepoEdge {
            from: from.to_owned(),
            to: to.to_owned(),
            via: owned(via),
        }
    }

    /// Borrow a list of member names as a set for order-insensitive comparison.
    fn names(members: &[String]) -> BTreeSet<&str> {
        members.iter().map(String::as_str).collect()
    }

    /// Diamond dep graph: app → liba → util, app → libb → util.
    /// Edges point consumer → producer. A change to `util` should blast up to
    /// liba, libb, app; a change to `liba` should reach only app (not libb/util).
    fn diamond() -> WorkspaceGraph {
        let nodes: BTreeMap<String, RepoFacts> = vec![
            facts("app", &["app_c"], &["a_c", "b_c"]),
            facts("liba", &["a_c"], &["util_c"]),
            facts("libb", &["b_c"], &["util_c"]),
            facts("util", &["util_c"], &[]),
        ]
        .into_iter()
        .map(|node| (node.name.clone(), node))
        .collect();
        let edges = vec![
            edge("app", "liba", &["a_c"]),
            edge("app", "libb", &["b_c"]),
            edge("liba", "util", &["util_c"]),
            edge("libb", "util", &["util_c"]),
        ];
        WorkspaceGraph::from_query_parts(nodes, edges)
    }

    /// F1 inject-assert: feed a fake change-set + a real dep graph, assert the
    /// affected set is exactly `changed ∪ transitive dependents`, and that an
    /// unaffected member (one that nothing changed depends on) is *skipped*.
    #[test]
    fn affected_set_is_changed_union_dependents_and_skips_unaffected() {
        let graph = diamond();
        // All four members have a materialized checkout.
        let members: Vec<String> = ["app", "liba", "libb", "util"]
            .iter()
            .map(|m| m.to_string())
            .collect();
        let everything: Vec<&String> = members.iter().collect();

        // A change to `liba` only. Affected = liba ∪ its dependents = {liba, app}.
        let affected = affected_set(&graph, &["liba".to_string()], &everything);
        let affected_names = names(&affected);
        let expected: BTreeSet<&str> = ["liba", "app"].iter().copied().collect();
        assert_eq!(
            affected_names, expected,
            "affected set must be changed ∪ transitive dependents"
        );
        // `libb` and `util` are unaffected by a change to `liba` → skipped.
        assert!(
            !affected_names.contains("libb"),
            "sibling libb must be skipped — it does not depend on liba"
        );
        assert!(
            !affected_names.contains("util"),
            "dependency util must be skipped — liba depends on it, not vice-versa"
        );
        // Build order: a dependency must precede its dependent (liba before app).
        let position_of = |member: &str| affected.iter().position(|x| x == member).unwrap();
        assert!(
            position_of("liba") < position_of("app"),
            "affected set must come back in build order"
        );

        // A change to the root `util` blasts up the whole diamond.
        let blast = affected_set(&graph, &["util".to_string()], &everything);
        let whole: BTreeSet<&str> = ["app", "liba", "libb", "util"].iter().copied().collect();
        assert_eq!(
            names(&blast),
            whole,
            "a change to the shared leaf must rebuild every member"
        );

        // A changed member with no materialized checkout is dropped from the scope
        // (intersection with `present`), so the index never names a missing dir.
        let only_app: Vec<&String> = vec![&members[0]];
        assert_eq!(
            affected_set(&graph, &["liba".to_string()], &only_app),
            vec!["app".to_string()],
            "affected set is intersected with present checkouts — absent `liba` is dropped"
        );
    }

    #[test]
    fn interval_parsing() {
        let fallback = Duration::from_secs(60);
        let cases: [(&str, u64); 6] = [
            ("", 60),
            ("30s", 30),
            ("5m", 300),
            ("2h", 7200),
            ("90", 90),
            ("garbage", 60),
        ];
        for (input, secs) in cases {
            assert_eq!(parse_interval(input, fallback), Duration::from_secs(secs));
        }
    }
}