nornir 0.4.41

Companion to cargo: dependency tracking, release gating, deploy, benchmarks, and documentation assembly. Project-agnostic.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
//! Server-monitored self-sync (server-user `nornir`): the poll loop and the
//! per-workspace fetch step.
//!
//! The loop runs inside `nornir-server`: every tick it fetches each *monitored*
//! workspace's git members (pure-Rust HTTPS via [`crate::gitio`]), records sync
//! state in the [`crate::registry`], and — when any member's HEAD moved —
//! invokes the republish hook. [`fetch_workspace`] is shared with the CLI's
//! `nornir workspace fetch`.

use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;

use anyhow::{anyhow, bail, Context, Result};

use crate::registry::{Mode, Registry};

/// Per-member result of one clone/fetch attempt.
///
/// This is the persistent, remotely-queryable counterpart of the per-member
/// `eprintln!("…fetch error…")` log line that used to be the only signal. Each
/// fetched member yields exactly one outcome — success or failure — so a thin
/// viz/CLI client can tell *why* a member did not populate (auth, a repo that
/// disappeared, network trouble) instead of merely noticing missing data.
/// Written to the `clone_events` warehouse table by
/// [`crate::warehouse::clone_events::record_fetch_report`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MemberOutcome {
    /// Name of the workspace member.
    pub member: String,
    /// Git remote that was cloned or fetched. Members without a remote (local
    /// members) are skipped by the fetch loop, so this is never empty there.
    pub remote: String,
    /// Operation label. Clone and fetch share one code path (`clone_or_fetch`),
    /// so [`fetch_workspace`] records `clone-fetch`, matching the wording of the
    /// error chains users already saw.
    pub op: String,
    /// Either `ok` or `error`.
    pub status: String,
    /// Resolved HEAD SHA on success; the full error chain (`{e:#}`) on failure.
    pub detail: String,
    /// Wall-clock time spent on this member's clone/fetch, in milliseconds.
    pub elapsed_ms: i64,
}

/// Aggregate result of fetching every git member of one workspace.
#[derive(Debug, Default)]
pub struct FetchReport {
    /// Name of the workspace that was fetched.
    pub workspace: String,
    /// How many members had a remote and were therefore clone/fetched.
    pub fetched: usize,
    /// Members whose HEAD SHA differs from the one recorded at the last sync.
    pub changed: Vec<String>,
    /// `(member, error)` for every member whose clone/fetch failed.
    pub errors: Vec<(String, String)>,
    /// Exactly one [`MemberOutcome`] per fetched member, success or failure.
    /// Covers the same members counted by `fetched`, but carries
    /// op/status/detail/elapsed so the result is first-class and persistable
    /// rather than log-only.
    pub outcomes: Vec<MemberOutcome>,
}

impl FetchReport {
    /// `true` when at least one member's HEAD moved during this fetch.
    pub fn changed_any(&self) -> bool {
        !matches!(self.changed.as_slice(), [])
    }
}

/// Clone/fetch every git member of `name` into `<root>/<name>/git/<member>`,
/// write back each member's sync state, and return what changed. Shared by the
/// CLI verb and the poll loop. HTTPS only (SSH refused; see [`crate::gitio`]).
pub fn fetch_workspace(reg: &Registry, root: &Path, name: &str) -> Result<FetchReport> {
    let mut ws = reg
        .get(name)?
        .ok_or_else(|| anyhow!("no workspace `{name}` in the registry"))?;
    let ws_root = root.join(&ws.name);
    let git_dir = ws_root.join("git");
    std::fs::create_dir_all(&git_dir)?;
    std::fs::create_dir_all(ws_root.join("builds"))?;

    let now = chrono::Utc::now().to_rfc3339();
    let mut report = FetchReport {
        workspace: ws.name.clone(),
        ..Default::default()
    };
    for m in &mut ws.members {
        if m.remote.is_empty() {
            continue; // local/external member — source lives outside
        }
        report.fetched += 1;
        let dest = git_dir.join(&m.name);
        // SSH remotes authenticate with the server's deploy key (resolved inside
        // clone_or_fetch via NORNIR_SSH_DIR / the `nornir` user's home).
        let t0 = std::time::Instant::now();
        let outcome = crate::gitio::clone_or_fetch(&m.remote, &dest, None);
        let elapsed_ms = t0.elapsed().as_millis() as i64;
        match outcome {
            Ok(sha) => {
                report.outcomes.push(MemberOutcome {
                    member: m.name.clone(),
                    remote: m.remote.clone(),
                    op: "clone-fetch".into(),
                    status: "ok".into(),
                    detail: sha.clone(),
                    elapsed_ms,
                });
                if sha != m.last_seen_sha {
                    report.changed.push(m.name.clone());
                }
                m.last_seen_sha = sha;
                m.last_synced = now.clone();
                m.sync_state = "ok".into();
                // AUT6 freshness: record the clone's working-tree digest so a
                // thin client can detect staleness the SHA can't see. A freshly
                // fetched server clone is normally clean; best-effort (a digest
                // error must not fail the fetch).
                match crate::gitio::worktree_freshness(&dest) {
                    Ok(f) => {
                        m.worktree_digest = f.digest;
                        m.worktree_dirty = f.dirty;
                    }
                    Err(e) => eprintln!(
                        "nornir-monitor: {}::{} worktree freshness skipped: {e:#}",
                        report.workspace, m.name
                    ),
                }
            }
            Err(e) => {
                let msg = format!("{e:#}");
                m.sync_state = format!("error: {msg}");
                report.outcomes.push(MemberOutcome {
                    member: m.name.clone(),
                    remote: m.remote.clone(),
                    op: "clone-fetch".into(),
                    status: "error".into(),
                    detail: msg.clone(),
                    elapsed_ms,
                });
                report.errors.push((m.name.clone(), msg));
            }
        }
    }
    ws.updated_at = now;
    reg.upsert(&ws)?;
    Ok(report)
}

/// One poll sweep: fetch every [`Mode::Monitored`] workspace and republish the
/// ones that need it.
///
/// A workspace is republished when either condition holds:
/// * any member's HEAD moved during this sweep;
/// * its registry row has an empty `current_snapshot` (never built, e.g. the
///   warehouse was wiped), so a clean warehouse is rebuilt from git instead of
///   staying empty.
///
/// Failures are isolated. Member fetch errors, whole-workspace fetch errors and
/// republish errors are logged, and the sweep continues with the next
/// workspace. Only a failure to list the registry aborts the sweep.
///
/// Takes the server's shared registry handle. redb is single-writer, so the poll
/// loop and the Workspaces RPC must share one [`Registry`] rather than each
/// opening their own.
///
/// NOTE(review): [`fetch_workspace`] stores the new HEAD SHAs before the
/// republish runs, and a republish only records `current_snapshot` on success.
/// So if a republish fails for a workspace that already has a snapshot, the next
/// sweep sees no change and does not retry. The warehouse then stays on the
/// previous build until the next git change. Confirm that is intended.
pub fn sync_once(reg: &Registry, root: &Path) -> Result<()> {
    for ws in reg.list()? {
        if ws.mode != Mode::Monitored {
            continue;
        }
        let rep = match fetch_workspace(reg, root, &ws.name) {
            Ok(rep) => rep,
            Err(e) => {
                eprintln!("nornir-monitor: {} fetch failed: {e:#}", ws.name);
                continue;
            }
        };
        for (m, e) in &rep.errors {
            eprintln!("nornir-monitor: {}::{m} fetch error: {e}", rep.workspace);
        }

        // Never built (empty `current_snapshot`) → rebuild even without a git
        // change. A registry read failure here counts as "already built".
        let needs_build = reg
            .get(&ws.name)
            .ok()
            .flatten()
            .map(|r| r.current_snapshot.trim().is_empty())
            .unwrap_or(false);
        // Incremental scoping (F1): the changed members scope the rebuild to
        // `changed ∪ dependents`. On the "empty warehouse" path there is no
        // change-set, so `None` forces a full rebuild (first-publish fallback).
        let changed: Option<&[String]> = rep.changed_any().then_some(rep.changed.as_slice());
        if changed.is_none() && !needs_build {
            continue;
        }

        let why = match changed {
            Some(members) => format!("changed [{}]", members.join(", ")),
            None => "empty warehouse".to_string(),
        };
        eprintln!("nornir-monitor: {} {why} → republish", rep.workspace);
        match republish(reg, root, &ws.name, changed) {
            Ok(snap) => eprintln!(
                "nornir-monitor: {} republished → snapshot {snap}",
                ws.name
            ),
            Err(e) => eprintln!("nornir-monitor: {} republish failed: {e:#}", ws.name),
        }
    }
    Ok(())
}

/// Rebuild + republish a workspace's warehouse from its freshly-fetched
/// sources: knowledge scan + member-scoped code index + a per-member snapshot,
/// all under `<root>/<ws>/builds/` (the workspace's own Iceberg warehouse).
/// Returns the latest snapshot id; records it on the registry row.
///
/// Operates on the materialized clones under `<root>/<ws>/git/<member>/` — only
/// members with a real checkout (HTTPS today; SSH pack-fetch pending).
///
/// **Incremental (F1):** `changed` carries the members whose HEAD moved this
/// sweep. When present, the index rebuild is scoped to the *affected set* —
/// `changed ∪ transitive dependents` from the cross-repo dep graph — so an
/// unaffected member is not re-walked. `changed = None` (first publish / empty
/// warehouse) falls back to a full rebuild over all members, as does a missing
/// dep graph.
///
/// `pub` so the eager `nornir workspace add` (fat mode) can build inline right
/// after fetch — the same build the poll loop runs on a change. Server and fat
/// thus do the *same* thing; the poll loop is just the automated re-run.
pub fn republish(
    reg: &Registry,
    root: &Path,
    ws_name: &str,
    changed: Option<&[String]>,
) -> Result<String> {
    use crate::warehouse::iceberg::IcebergWarehouse;

    let ws_root = root.join(ws_name);
    let git_dir = ws_root.join("git");
    let builds = ws_root.join("builds");
    std::fs::create_dir_all(&builds)?;

    let mut record = reg
        .get(ws_name)?
        .ok_or_else(|| anyhow!("workspace `{ws_name}` vanished from the registry"))?;
    let members: Vec<String> = record.members.iter().map(|m| m.name.clone()).collect();

    let index_dir = builds.join("cache").join("index");
    let wh = IcebergWarehouse::open(&builds)
        .with_context(|| format!("open warehouse {}", builds.display()))?;
    let idx = crate::index::Index::open_at(&git_dir, &index_dir)?
        .with_repo_scope(members.clone());

    // Deep-scan is opted into per-workspace in the descriptor ([workspace]
    // deep_scan = true). Best-effort read — a missing/unparsable descriptor
    // simply means no deep-scan.
    let deep_scan = crate::workspace::WorkspaceDescriptor::load(Path::new(&record.descriptor))
        .map(|d| d.workspace.deep_scan)
        .unwrap_or(false);

    let snap = republish_with(
        &wh, &idx, &git_dir, &index_dir, &members, ws_name, deep_scan, changed,
    )?;

    record.current_snapshot = snap.clone();
    record.updated_at = chrono::Utc::now().to_rfc3339();
    reg.upsert(&record)?;
    Ok(snap)
}

/// Republish a monitored workspace **through caller-owned handles** — the warehouse
/// + Tantivy index the `nornir-server` already holds open (redb is single-writer,
/// so the server must be the sole opener; the poll loop writes through these same
/// handles rather than opening its own). Runs the producer pipeline over the
/// materialized member checkouts under `git_dir` and returns the latest snapshot
/// id. Members without a `.git` checkout (e.g. SSH-only, not yet clonable) are
/// skipped. Errors when none are present.
///
/// **Incremental (F1):** when `changed` names the members that moved, the dep
/// graph is built *up front* and the index rebuild is scoped to
/// [`affected_set`] (`changed ∪ transitive dependents`); members outside that
/// set are not re-walked. `changed = None` or an unbuildable graph fall back to
/// a full rebuild over every present member.
///
/// Pipeline, in order:
/// 0. Build the cross-repo dep graph (best-effort) and derive the index scope.
/// 1. Knowledge scan (symbols + git-heat) per present member, skipped when the
///    member's HEAD SHA is already in the scan ledger *and* its
///    `test_inventory` is non-empty (see [`should_skip_member_scan`]).
/// 2. Rebuild the code index, scoped per step 0.
/// 3. Snapshot the index into Iceberg for **every** present member. The scope
///    does not reduce this step.
/// 4. Persist the step-0 dep graph (best-effort).
/// 5. If `deep_scan`, knowledge-scan the transitive dependency closure
///    (best-effort).
/// 6. Unless `NORNIR_POPULATE_SKIP_TEST` is set, log per-member test-inventory
///    counts to `release_events` (best-effort).
///
/// # Parameters
/// * `wh` / `idx` — the caller-owned warehouse and index handles.
/// * `git_dir` — directory with one checkout per member (`git_dir/<member>`).
/// * `index_dir` — on-disk index location, passed to every step-3 snapshot.
/// * `members` — all workspace member names; only those with a `.git` checkout
///   are processed.
/// * `ws_name` — workspace name, used in logs, the dep-graph build/persist and
///   each snapshot.
/// * `deep_scan` — enables step 5.
/// * `changed` — members whose HEAD moved this sweep, or `None` for a full
///   rebuild.
///
/// # Returns
/// The snapshot id produced in step 3 for the last member (in `members` order)
/// that has a checkout.
///
/// # Errors
/// Bails when no member has a checkout. It also propagates failures from:
/// * appending the symbol / git-heat scan rows (step 1);
/// * the index build (step 2);
/// * any per-member snapshot (step 3).
///
/// Every other failure is logged to stderr and skipped.
pub fn republish_with(
    wh: &crate::warehouse::iceberg::IcebergWarehouse,
    idx: &crate::index::Index,
    git_dir: &Path,
    index_dir: &Path,
    members: &[String],
    ws_name: &str,
    deep_scan: bool,
    changed: Option<&[String]>,
) -> Result<String> {
    // Only members with a materialized `.git` checkout take part in any step.
    let present: Vec<&String> = members
        .iter()
        .filter(|name| git_dir.join(name).join(".git").exists())
        .collect();
    if present.is_empty() {
        bail!(
            "no materialized member checkouts under {} (SSH-only members can't be \
             cloned yet)",
            git_dir.display()
        );
    }

    // Cross-repo dep graph (cheap, offline `cargo metadata --no-deps`). Built up
    // front so the affected-set scoping below can consult it; reused for the
    // step-4 persist. Best-effort — a graph hiccup just means a full rebuild.
    let graph = match build_member_graph(ws_name, git_dir, &present) {
        Ok(g) => Some(g),
        Err(e) => {
            eprintln!("nornir-monitor: {ws_name} dep-graph build skipped: {e:#}");
            None
        }
    };

    // Affected set = changed ∪ transitive dependents. `None` (first publish /
    // empty warehouse) or a missing graph → empty scope = full rebuild.
    // An `affected_set` that resolves to no present member is also empty, and
    // so likewise means a full rebuild.
    let scope: Vec<String> = match (changed, graph.as_ref()) {
        (Some(ch), Some(g)) if !ch.is_empty() => affected_set(g, ch, &present),
        _ => Vec::new(),
    };
    if scope.is_empty() {
        eprintln!("nornir-monitor: {ws_name} → full rebuild over all members");
    } else {
        eprintln!(
            "nornir-monitor: {ws_name} → incremental rebuild, affected [{}]",
            scope.join(", ")
        );
    }

    // 1. Knowledge scan (symbols + git-heat) per member → persist.
    //    **Incremental by git-SHA idempotency**: a member whose HEAD SHA was
    //    already scanned is skipped. Members are scanned serially — the scan
    //    itself is now internally all-cores (`symbols::scan_repo` parses every
    //    file in parallel), so per-repo speed makes cross-member parallelism
    //    unnecessary (and keeps the single-writer warehouse appends simple).
    //    Note: this step covers every present member, independent of `scope`.
    use std::time::Instant;
    let t_scan = Instant::now();
    let mut n_scan = 0usize;
    for name in &present {
        let repo_root = git_dir.join(name.as_str());
        // An unreadable HEAD becomes the placeholder SHA "unknown".
        // NOTE(review): that placeholder is also the ledger key below, so every
        // HEAD-read failure for this member maps to the same ledger entry.
        let (sha, _branch) = crate::gitio::head_sha_and_branch(&repo_root)
            .unwrap_or_else(|_| ("unknown".into(), "unknown".into()));
        // Skip-on-unchanged is keyed only on the git SHA ledger, so a member that
        // was scanned by an OLDER binary (before a new knowledge table existed) has
        // a ledger row but NO rows in that new table — and the skip would prevent
        // ever backfilling it. Guard the skip on the `test_inventory` (build-free
        // test surface) being present too: when the knowledge scan exists BUT the
        // inventory is empty, fall through and re-scan to backfill it. This is the
        // fix for the 🧪 Test pane showing "no tests discovered" for members last
        // scanned before `test_inventory` shipped. Read is cheap + best-effort.
        // NOTE(review): a member with no `#[test]` items presumably always has an
        // empty inventory, so it would be re-scanned on every republish. Confirm
        // that cost is acceptable.
        let scan_exists = wh.knowledge_scan_exists(name, &sha).unwrap_or(false);
        let inventory_present = scan_exists
            && crate::warehouse::test_inventory::query_test_inventory(wh, name.as_str())
                .map(|rows| !rows.is_empty())
                .unwrap_or(false);
        if should_skip_member_scan(scan_exists, inventory_present) {
            eprintln!("nornir-monitor: {ws_name}/{name} @ {} — knowledge unchanged, skip scan", &sha[..sha.len().min(8)]);
            continue;
        }
        if scan_exists {
            eprintln!(
                "nornir-monitor: {ws_name}/{name} @ {} — knowledge unchanged but test_inventory empty → re-scan to backfill",
                &sha[..sha.len().min(8)]
            );
        }
        n_scan += 1;
        // A failed scan skips this member only; the appends below are NOT
        // best-effort — their errors abort the whole republish via `?`.
        let res = match crate::knowledge::scan_all(&repo_root, name) {
            Ok(r) => r,
            Err(e) => {
                eprintln!("nornir-monitor: {ws_name}/{name} knowledge scan failed: {e:#}");
                continue;
            }
        };
        wh.append_symbol_scan(&res.symbols)?;
        wh.append_git_heat_scan(&res.git)?;
        if let Err(e) = wh.record_knowledge_scan(name, &sha, res.symbols.snapshot_id) {
            eprintln!("nornir-monitor: {ws_name}/{name} knowledge-scan ledger skipped: {e:#}");
        }
        if let Err(e) = crate::security::warm(wh, &repo_root, Some(git_dir)) {
            eprintln!("nornir-monitor: {ws_name}/{name} security warm skipped: {e:#}");
        }
    }
    eprintln!(
        "nornir-monitor: {ws_name} ⏱ knowledge-scan {n_scan} member(s) in {:.1}s",
        t_scan.elapsed().as_secs_f64()
    );

    // 2. (Re)build the workspace code index. Incrementally scoped to the
    //    affected members when `scope` is non-empty; full-workspace otherwise.
    let t_index = Instant::now();
    idx.build_scoped(&scope).context("build workspace index")?;
    eprintln!("nornir-monitor: {ws_name} ⏱ index-build in {:.1}s", t_index.elapsed().as_secs_f64());

    // 3. Snapshot the index into Iceberg, keyed to each member's HEAD.
    //    Runs for every present member regardless of `scope`; HEAD is re-read
    //    here (same "unknown" fallback as step 1).
    let t_snap = Instant::now();
    let mut last_snapshot = String::new();
    for name in &present {
        let repo_root = git_dir.join(name);
        let (sha, branch) = crate::gitio::head_sha_and_branch(&repo_root)
            .unwrap_or_else(|_| ("unknown".into(), "unknown".into()));
        let snap = crate::index::snapshot::snapshot_to_iceberg(
            wh, ws_name, name, &sha, &branch, index_dir,
        )
        .with_context(|| format!("snapshot {name}"))?;
        // Overwritten each iteration: the returned id is the last member's.
        last_snapshot = snap.snapshot_id.to_string();
    }
    eprintln!(
        "nornir-monitor: {ws_name} ⏱ snapshot {} member(s) in {:.1}s",
        present.len(),
        t_snap.elapsed().as_secs_f64()
    );

    // 4. Persist the cross-repo dependency graph built up front (step 0) so the
    //    viz Dep Graph tab reflects the monitored sources. Note: this graph is
    //    *cross-repo*, so a single-member workspace yields no edges; it lights up
    //    for multi-repo monitored workspaces.
    if let Some(graph) = graph.as_ref() {
        if let Err(e) = wh.record_dep_graph(ws_name, graph) {
            eprintln!("nornir-monitor: {ws_name} dep-graph persist skipped: {e:#}");
        }
    }

    // 5. Opt-in deep-scan: knowledge-scan the members' entire transitive dep
    //    closure (gatling-fanned) into this same warehouse under repo `deps`.
    //    Best-effort: needs `cargo` on PATH + network for the first fetch.
    if deep_scan {
        let t_deep = Instant::now();
        let dirs: Vec<std::path::PathBuf> =
            present.iter().map(|n| git_dir.join(n.as_str())).collect();
        match crate::deepscan::deep_scan(&dirs, wh) {
            Ok(rep) => {
                eprintln!(
                    "nornir-monitor: {ws_name} ⏱ deep-scan: {} dep crate(s) scanned, {} skipped, {} error(s) in {:.1}s",
                    rep.crates,
                    rep.skipped,
                    rep.errors.len(),
                    t_deep.elapsed().as_secs_f64(),
                );
                // Only the first five deep-scan errors are echoed to stderr.
                for e in rep.errors.iter().take(5) {
                    eprintln!("nornir-monitor:   deep-scan error: {e}");
                }
            }
            Err(e) => eprintln!("nornir-monitor: {ws_name} deep-scan skipped: {e:#}"),
        }
    }

    // 6. Test surface (populate): discovery is now BUILD-FREE — step 1's syn scan
    //    already wrote one `test_inventory` row per `#[test]` (no `cargo build`,
    //    no `cargo test --list`). The 🧪 Test pane shows EVERY discovered test as
    //    `X` (not-run) the instant the workspace is added; `ok`/`fail` fills in only
    //    once `nornir release` / `nornir test run` EXECUTES them (never here, never
    //    in the poll loop). This step just logs the per-member inventory count to
    //    `release_events`. Opt out of the LOGGING with NORNIR_POPULATE_SKIP_TEST=1
    //    (discovery itself still happens — it's free, part of the knowledge scan).
    //    Best-effort: never fails republish.
    if std::env::var_os("NORNIR_POPULATE_SKIP_TEST").is_none() {
        let to_scan = members_to_populate(&present, &scope);
        if let Err(e) = discover_test_surface(wh, git_dir, ws_name, &to_scan) {
            eprintln!("nornir-monitor: {ws_name} test discovery skipped: {e:#}");
        }
    }

    Ok(last_snapshot)
}

/// Decide whether a member's knowledge scan may be skipped.
///
/// Skipping is allowed only when both of these hold:
/// * `scan_exists` — the member's current git SHA is already in the scan ledger;
/// * `inventory_present` — that scan left rows in `test_inventory`.
///
/// The ledger alone is not enough. A member last scanned by an older binary,
/// one that predates a knowledge table such as `test_inventory`, has a ledger
/// row but no rows in that table. A ledger-only skip would therefore never
/// backfill it, and the 🧪 Test pane would keep showing "no tests discovered".
/// An empty inventory at a recorded SHA thus forces a re-scan.
fn should_skip_member_scan(scan_exists: bool, inventory_present: bool) -> bool {
    if !scan_exists {
        // Never scanned at this SHA: always scan.
        return false;
    }
    inventory_present
}

/// Select the members a populate step should cover.
///
/// An empty `scope` signals a full rebuild, so every `present` member is
/// returned. Otherwise only the present members named in `scope` are kept, in
/// `present` order. This is what keeps poll-loop populate work small: a tick
/// usually moves 0–1 members, and only those get re-discovered.
fn members_to_populate<'a>(present: &[&'a String], scope: &[String]) -> Vec<&'a String> {
    if scope.is_empty() {
        return present.to_vec();
    }
    let mut selected = Vec::new();
    for &member in present {
        if scope.contains(member) {
            selected.push(member);
        }
    }
    selected
}

/// Populate **step 6**: log the per-member test-discovery summary.
///
/// Discovery itself is **build-free**. It rode the step-1 syn knowledge scan,
/// which wrote one [`test_inventory`](crate::warehouse::test_inventory) row per
/// `#[test]` (no `cargo build`, no `cargo test --list`). This step reads that
/// inventory back per member and records the count to `release_events`
/// (`op = test_discover`).
///
/// As a result, a freshly populated workspace's 🧪 Test pane shows every test
/// as `X` (not-run) the instant it is added. The matrix join supplies the `X`
/// for any inventory test without a `test_results` row, so no `listed` rows are
/// seeded here. EXECUTION, which fills in ok/fail, stays gated to
/// `nornir release` / `nornir test run` and never happens on this path.
///
/// All rows of one call share a fresh run id and timestamp; `seq` orders them.
///
/// # Parameters
/// * `wh` — warehouse the inventory is read from and events are appended to.
/// * `_git_dir` — unused; kept for signature compatibility.
/// * `ws_name` — workspace name, written as the event `component`.
/// * `members` — members to summarize; an empty slice is a no-op.
///
/// # Errors
/// None in practice. A per-member inventory read error or a failed event append
/// is logged to stderr and skipped, never returned.
fn discover_test_surface(
    wh: &crate::warehouse::iceberg::IcebergWarehouse,
    _git_dir: &Path,
    ws_name: &str,
    members: &[&String],
) -> Result<()> {
    use crate::warehouse::release_events::{append_release_events, phase, status, ReleaseEventRow};
    use crate::warehouse::test_inventory::query_test_inventory;
    use crate::warehouse::test_results::new_run_id;

    if members.is_empty() {
        return Ok(());
    }
    let run_id = new_run_id();
    let ts = chrono::Utc::now().timestamp_micros();
    let mut seq: i64 = 0;
    let mut log = |repo: &str, st: &str, detail: String| {
        let row = ReleaseEventRow {
            run_id: run_id.clone(),
            seq,
            ts_micros: ts,
            component: ws_name.to_string(),
            repo: repo.to_string(),
            op: "test_discover".into(),
            phase: phase::END.into(),
            status: st.into(),
            detail,
            depends_on: None,
            elapsed_ms: None,
        };
        seq += 1;
        // Best-effort: a logging-row failure must not abort populate, but it
        // should not vanish silently either.
        if let Err(e) = wh.block_on(append_release_events(wh, std::slice::from_ref(&row))) {
            eprintln!("nornir-monitor: {ws_name}/{repo} release-event append skipped: {e:#}");
        }
    };

    for name in members {
        // The inventory was written by step 1's syn scan — read it back (NO build).
        match query_test_inventory(wh, name.as_str()) {
            Ok(inv) => {
                let heavy = inv.iter().filter(|t| t.is_heavy).count();
                // `: ` separates the member name from the count (they used to
                // run together, e.g. `ws/foo12 test(s)`).
                eprintln!(
                    "nornir-monitor: {ws_name}/{name}: {} test(s) inventoried via syn scan ({heavy} heavy) (NOT RUN, X)",
                    inv.len()
                );
                log(name, status::OK, format!("{} tests discovered ({heavy} heavy), build-free", inv.len()));
            }
            Err(e) => {
                eprintln!("nornir-monitor: {ws_name}/{name} test inventory read skipped: {e:#}");
                // Persist the full error chain, matching the stderr line above.
                log(name, status::WARN, format!("inventory read skipped: {e:#}"));
            }
        }
    }
    Ok(())
}

/// Compute the **affected set** for an incremental republish.
///
/// The set is the members whose HEAD moved (`changed`) plus their transitive
/// dependents, according to the cross-repo dep
/// [`graph`](crate::warehouse::dep_graph::WorkspaceGraph). The result is then
/// restricted to `present`, the materialized member checkouts. A changed member
/// without a checkout, or a graph node that is not a workspace member, is
/// dropped, so the index scope only ever names real member directories.
///
/// The result is empty only when nothing in `changed` resolves to a present
/// member. Callers treat an empty scope as "full rebuild", so this stays a safe
/// over-approximation rather than silently indexing nothing.
fn affected_set(
    graph: &crate::warehouse::dep_graph::WorkspaceGraph,
    changed: &[String],
    present: &[&String],
) -> Vec<String> {
    let materialized: std::collections::BTreeSet<&str> =
        present.iter().map(|name| name.as_str()).collect();
    let mut scope = Vec::new();
    for member in graph.affected_by_change(changed) {
        if materialized.contains(member.as_str()) {
            scope.push(member);
        }
    }
    scope
}

/// Build a cross-repo [`WorkspaceGraph`] over the materialized member checkouts.
///
/// A path-based descriptor is synthesized so that each member resolves to
/// `git_dir/<member>` rather than the git cache. Uses
/// `cargo metadata --no-deps`, so no network access is needed.
///
/// # Errors
/// Whatever [`WorkspaceGraph::build`] reports for the synthesized descriptor.
fn build_member_graph(
    ws_name: &str,
    git_dir: &Path,
    members: &[&String],
) -> Result<crate::warehouse::dep_graph::WorkspaceGraph> {
    use crate::workspace::descriptor::{RepoSpec, WorkspaceDescriptor, WorkspaceMeta};

    // Every member becomes a plain path repo rooted at its checkout.
    let repos: std::collections::BTreeMap<String, RepoSpec> = members
        .iter()
        .map(|&member| {
            let checkout = git_dir.join(member).to_string_lossy().into_owned();
            let spec = RepoSpec {
                path: Some(checkout),
                git: None,
                branch: None,
            };
            (member.clone(), spec)
        })
        .collect();

    let descriptor = WorkspaceDescriptor {
        workspace: WorkspaceMeta {
            name: ws_name.to_owned(),
            deep_scan: false,
        },
        repos,
        descriptor_dir: git_dir.to_owned(),
    };
    crate::warehouse::dep_graph::WorkspaceGraph::build(&descriptor)
}

/// Parse a poll interval: `"60s"`, `"5m"`, `"2h"`, or a bare count of seconds
/// (`"90"`).
///
/// Surrounding whitespace is ignored, as is whitespace between the number and
/// its unit (`"5 m"`). Only the single-letter suffixes `s`, `m` and `h` are
/// recognized, so e.g. `"100ms"` does not parse.
///
/// Returns `default` when `s` is empty, unparsable, or parses to zero. A zero
/// period is not a usable poll interval: `tokio::time::interval` panics on it,
/// which would kill the poll loop.
pub fn parse_interval(s: &str, default: Duration) -> Duration {
    let s = s.trim();
    if s.is_empty() {
        return default;
    }
    // Only ASCII suffixes are sliced off, so the byte slicing is char-safe.
    let (num, mult) = match s.chars().last() {
        Some('s') => (&s[..s.len() - 1], 1),
        Some('m') => (&s[..s.len() - 1], 60),
        Some('h') => (&s[..s.len() - 1], 3600),
        _ => (s, 1),
    };
    num.trim()
        .parse::<u64>()
        .ok()
        .filter(|&n| n > 0)
        .map(|n| Duration::from_secs(n.saturating_mul(mult)))
        .unwrap_or(default)
}

/// Spawn the background poll loop over the server's shared registry and return
/// its join handle immediately.
///
/// Each tick runs one [`sync_once`] sweep on a blocking thread
/// (`spawn_blocking`), so gix I/O never stalls the async runtime. Sweeps never
/// overlap: the next tick is awaited only after the current sweep returns.
/// Ticks missed while a sweep runs are skipped rather than replayed in a burst.
/// Sweep errors and panics are logged, and the loop has no exit of its own.
///
/// A zero `tick` is replaced by one second. `tokio::time::interval` panics on a
/// zero period, which would otherwise silently kill this task on its first poll.
///
/// Must be called from within a Tokio runtime (it uses `tokio::spawn`).
pub fn spawn_poll_loop(
    reg: Arc<Registry>,
    root: PathBuf,
    tick: Duration,
) -> tokio::task::JoinHandle<()> {
    let tick = if tick.is_zero() {
        eprintln!("nornir-monitor: zero poll interval; using 1s");
        Duration::from_secs(1)
    } else {
        tick
    };
    tokio::spawn(async move {
        let mut interval = tokio::time::interval(tick);
        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
        loop {
            interval.tick().await;
            let reg = Arc::clone(&reg);
            let root = root.clone();
            match tokio::task::spawn_blocking(move || sync_once(&reg, &root)).await {
                Ok(Ok(())) => {}
                Ok(Err(e)) => eprintln!("nornir-monitor: sweep error: {e:#}"),
                Err(e) => eprintln!("nornir-monitor: sweep task panicked: {e}"),
            }
        }
    })
}

// Unit tests for the monitor's change-scoping and skip decisions: affected-set
// computation over a dependency graph, populate scoping, the skip-on-unchanged
// rule (including backfill against a real on-disk warehouse), and interval parsing.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::warehouse::dep_graph::{CrossRepoEdge, RepoFacts, WorkspaceGraph};
    use std::collections::{BTreeMap, BTreeSet};

    /// Build a `RepoFacts` with the given produced/consumed names. The root is a
    /// fixed placeholder path (`/dev/null`); presumably the graph queries in these
    /// tests never read it — confirm if a test starts depending on the root.
    fn facts(name: &str, produces: &[&str], consumes: &[&str]) -> RepoFacts {
        RepoFacts {
            name: name.to_string(),
            root: std::path::PathBuf::from("/dev/null"),
            produces: produces.iter().map(|s| s.to_string()).collect(),
            consumes: consumes.iter().map(|s| s.to_string()).collect(),
        }
    }
    /// Build a `CrossRepoEdge` `from → to`, labelled with the names it flows `via`.
    fn edge(from: &str, to: &str, via: &[&str]) -> CrossRepoEdge {
        CrossRepoEdge {
            from: from.to_string(),
            to: to.to_string(),
            via: via.iter().map(|s| s.to_string()).collect(),
        }
    }

    /// Diamond dep graph: app → liba → util, app → libb → util.
    /// Edges point consumer → producer. A change to `util` should blast up to
    /// liba, libb, app; a change to `liba` should reach only app (not libb/util).
    fn diamond() -> WorkspaceGraph {
        let mut fmap = BTreeMap::new();
        for f in [
            facts("app", &["app_c"], &["a_c", "b_c"]),
            facts("liba", &["a_c"], &["util_c"]),
            facts("libb", &["b_c"], &["util_c"]),
            facts("util", &["util_c"], &[]),
        ] {
            fmap.insert(f.name.clone(), f);
        }
        WorkspaceGraph::from_query_parts(
            fmap,
            vec![
                edge("app", "liba", &["a_c"]),
                edge("app", "libb", &["b_c"]),
                edge("liba", "util", &["util_c"]),
                edge("libb", "util", &["util_c"]),
            ],
        )
    }

    /// F1 inject-assert: feed a fake change-set + a real dep graph, assert the
    /// affected set is exactly `changed ∪ transitive dependents`, and that an
    /// unaffected member (one that nothing changed depends on) is *skipped*.
    #[test]
    fn affected_set_is_changed_union_dependents_and_skips_unaffected() {
        let g = diamond();
        // All four members have a materialized checkout.
        let all = ["app".to_string(), "liba".to_string(), "libb".to_string(), "util".to_string()];
        let present: Vec<&String> = all.iter().collect();

        // A change to `liba` only. Affected = liba ∪ its dependents = {liba, app}.
        let affected = affected_set(&g, &["liba".to_string()], &present);
        let got: BTreeSet<&str> = affected.iter().map(|s| s.as_str()).collect();
        assert_eq!(
            got,
            ["liba", "app"].into_iter().collect::<BTreeSet<_>>(),
            "affected set must be changed ∪ transitive dependents"
        );
        // `libb` and `util` are unaffected by a change to `liba` → skipped.
        assert!(!got.contains("libb"), "sibling libb must be skipped — it does not depend on liba");
        assert!(!got.contains("util"), "dependency util must be skipped — liba depends on it, not vice-versa");
        // Build order: a dependency must precede its dependent (liba before app).
        let pos = |n: &str| affected.iter().position(|x| x == n).unwrap();
        assert!(pos("liba") < pos("app"), "affected set must come back in build order");

        // A change to the root `util` blasts up the whole diamond.
        let blast = affected_set(&g, &["util".to_string()], &present);
        let blast_set: BTreeSet<&str> = blast.iter().map(|s| s.as_str()).collect();
        assert_eq!(
            blast_set,
            ["app", "liba", "libb", "util"].into_iter().collect::<BTreeSet<_>>(),
            "a change to the shared leaf must rebuild every member"
        );

        // A changed member with no materialized checkout is dropped from the scope
        // (intersection with `present`), so the index never names a missing dir.
        let only_app = vec![&all[0]]; // present = {app}
        let scoped = affected_set(&g, &["liba".to_string()], &only_app);
        assert_eq!(
            scoped,
            vec!["app".to_string()],
            "affected set is intersected with present checkouts — absent `liba` is dropped"
        );
    }

    /// `members_to_populate`: an empty scope selects every present member, a
    /// non-empty scope selects only the present members it names (unknown names
    /// are ignored), and an empty `present` list yields nothing.
    #[test]
    fn populate_scope_is_all_on_full_rebuild_else_just_affected() {
        let a = "a".to_string();
        let b = "b".to_string();
        let c = "c".to_string();
        let present = vec![&a, &b, &c];
        // Full rebuild (empty scope) → every present member gets discovered/tested.
        assert_eq!(members_to_populate(&present, &[]), vec![&a, &b, &c]);
        // Incremental (a poll tick moved `b`) → only `b`; an absent name is dropped.
        let scope = vec!["b".to_string(), "ghost".to_string()];
        assert_eq!(members_to_populate(&present, &scope), vec![&b]);
        // No present members → nothing, never a panic.
        assert!(members_to_populate(&[], &scope).is_empty());
    }

    /// Inject-assert: the skip-on-unchanged decision is `scan_exists ∧ inventory_present`.
    /// The load-bearing case is the third one — a member scanned by an OLD binary has the
    /// SHA ledger row but an EMPTY `test_inventory`, and MUST NOT be skipped (else it never
    /// backfills and `nornir test list` shows "no tests discovered").
    #[test]
    fn skip_only_when_scanned_and_inventory_present() {
        // Fresh SHA (never scanned) → must scan.
        assert!(!should_skip_member_scan(false, false), "unseen SHA must be scanned");
        // SHA scanned by the CURRENT binary, inventory written → skip (the steady state).
        assert!(should_skip_member_scan(true, true), "scanned SHA with inventory → skip");
        // SHA scanned by an OLD binary: ledger row present but inventory EMPTY → re-scan
        // to backfill. This is the bug fix — the skip must NOT fire here.
        assert!(
            !should_skip_member_scan(true, false),
            "scanned SHA with EMPTY inventory must re-scan to backfill the new table"
        );
    }

    /// End-to-end inject-assert against a real warehouse: simulate the OLD-binary state
    /// (a `knowledge_scans` ledger row at a SHA, but NO `test_inventory` rows), assert the
    /// monitor decision falls through to re-scan, then write inventory (as the re-scan
    /// would) and assert the decision flips to skip. Proves the backfill actually populates.
    #[test]
    fn empty_inventory_at_recorded_sha_backfills_then_skips() {
        use crate::knowledge::symbols::{SymbolScan, TestDefRow};
        use crate::warehouse::iceberg::IcebergWarehouse;
        use crate::warehouse::test_inventory::query_test_inventory;

        // Fresh warehouse in a temp dir; removed when `dir` is dropped.
        let dir = tempfile::tempdir().unwrap();
        let wh = IcebergWarehouse::open(dir.path()).unwrap();
        let repo = "holger";
        let sha = "deadbeefcafef00d";

        // OLD-binary state: the SHA is in the knowledge-scan ledger, but the
        // test_inventory table has no rows for this repo (the table didn't exist
        // when this SHA was scanned).
        wh.record_knowledge_scan(repo, sha, uuid::Uuid::new_v4()).unwrap();
        let scan_exists = wh.knowledge_scan_exists(repo, sha).unwrap();
        let inv0 = query_test_inventory(&wh, repo).unwrap();
        assert!(scan_exists, "ledger row must be present (simulating an old scan)");
        assert!(inv0.is_empty(), "inventory must start empty (old binary wrote none)");
        assert!(
            !should_skip_member_scan(scan_exists, !inv0.is_empty()),
            "empty inventory at a recorded SHA must NOT skip — it must backfill"
        );

        // The re-scan (step 1) would append a SymbolScan carrying test rows. Emulate
        // that write, then assert the inventory is now non-empty and the decision flips.
        let mut scan = SymbolScan {
            snapshot_id: uuid::Uuid::new_v4(),
            ts: chrono::Utc::now(),
            repo: repo.to_string(),
            ..Default::default()
        };
        // A single discovered test row is enough to make the inventory non-empty.
        scan.tests.push(TestDefRow {
            crate_name: repo.to_string(),
            module_path: format!("{repo}::tests"),
            test_name: "it_works".into(),
            file: "src/lib.rs".into(),
            line: 42,
            is_heavy: false,
            is_async: false,
        });
        wh.append_symbol_scan(&scan).unwrap();

        let inv1 = query_test_inventory(&wh, repo).unwrap();
        assert_eq!(inv1.len(), 1, "backfill must write the discovered test row");
        assert_eq!(inv1[0].test_name, "it_works");
        assert!(
            should_skip_member_scan(scan_exists, !inv1.is_empty()),
            "once inventory is backfilled, the SHA is skipped again"
        );
    }

    /// `parse_interval`: `s`/`m`/`h` suffixes, bare seconds, and fallback to the
    /// default for empty or unparseable input.
    #[test]
    fn interval_parsing() {
        let d = Duration::from_secs(60);
        assert_eq!(parse_interval("", d), Duration::from_secs(60));
        assert_eq!(parse_interval("30s", d), Duration::from_secs(30));
        assert_eq!(parse_interval("5m", d), Duration::from_secs(300));
        assert_eq!(parse_interval("2h", d), Duration::from_secs(7200));
        assert_eq!(parse_interval("90", d), Duration::from_secs(90));
        assert_eq!(parse_interval("garbage", d), Duration::from_secs(60));
    }
}