use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;
use anyhow::{anyhow, bail, Context, Result};
use crate::registry::{Mode, Registry};
/// Result of one clone/fetch attempt against a single workspace member.
///
/// One value is appended to [`FetchReport::outcomes`] per member that has a
/// non-empty remote, whether the attempt succeeded or failed.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MemberOutcome {
/// Name of the workspace member the attempt was made for.
pub member: String,
/// Remote the member was cloned or fetched from.
pub remote: String,
/// Operation label; `fetch_workspace` always records `"clone-fetch"`.
pub op: String,
/// `"ok"` on success, `"error"` on failure.
pub status: String,
/// The fetched SHA on success, or the formatted error chain on failure.
pub detail: String,
/// Wall-clock duration of the clone/fetch call, in milliseconds.
pub elapsed_ms: i64,
}
/// Summary of one `fetch_workspace` pass over a workspace's members.
#[derive(Debug, Default)]
pub struct FetchReport {
/// Name of the workspace that was fetched.
pub workspace: String,
/// Number of members with a non-empty remote for which a clone/fetch was attempted.
pub fetched: usize,
/// Names of members whose fetched SHA differs from the previously recorded one.
pub changed: Vec<String>,
/// `(member name, error message)` pairs for members whose clone/fetch failed.
pub errors: Vec<(String, String)>,
/// Per-member outcome rows, successes and failures alike, in iteration order.
pub outcomes: Vec<MemberOutcome>,
}
impl FetchReport {
    /// Returns `true` when at least one member name was recorded in `changed`.
    pub fn changed_any(&self) -> bool {
        self.changed.first().is_some()
    }
}
/// Clones or fetches every member of workspace `name` that has a remote and
/// records the result in the registry.
///
/// Checkouts live under `<root>/<workspace>/git/<member>`; the sibling
/// `builds` directory is created as well. For each member the fetched SHA,
/// sync timestamp, sync state and (best-effort) worktree freshness are written
/// back to the registry record, which is upserted once at the end.
///
/// # Errors
///
/// Fails when the registry lookup fails, the workspace is not registered, the
/// directories cannot be created, or the final upsert fails. A failing
/// clone/fetch of an individual member is *not* an error: it is recorded in
/// [`FetchReport::errors`] and [`FetchReport::outcomes`] and the loop moves on.
pub fn fetch_workspace(reg: &Registry, root: &Path, name: &str) -> Result<FetchReport> {
    let mut record = match reg.get(name)? {
        Some(found) => found,
        None => return Err(anyhow!("no workspace `{name}` in the registry")),
    };

    let workspace_dir = root.join(&record.name);
    let checkouts = workspace_dir.join("git");
    std::fs::create_dir_all(&checkouts)?;
    std::fs::create_dir_all(workspace_dir.join("builds"))?;

    // One timestamp for the whole pass so every member and the record agree.
    let stamp = chrono::Utc::now().to_rfc3339();
    let mut report = FetchReport {
        workspace: record.name.clone(),
        ..Default::default()
    };

    for member in &mut record.members {
        // Members without a remote have nothing to fetch and are not counted.
        if member.remote.is_empty() {
            continue;
        }
        report.fetched += 1;

        let checkout = checkouts.join(&member.name);
        let started = std::time::Instant::now();
        let fetched = crate::gitio::clone_or_fetch(&member.remote, &checkout, None);
        let elapsed_ms = started.elapsed().as_millis() as i64;

        // `detail` carries the SHA on success and the error chain on failure.
        let succeeded = fetched.is_ok();
        let detail = match fetched {
            Ok(sha) => sha,
            Err(e) => format!("{e:#}"),
        };
        let status = if succeeded { "ok" } else { "error" };
        report.outcomes.push(MemberOutcome {
            member: member.name.clone(),
            remote: member.remote.clone(),
            op: "clone-fetch".into(),
            status: status.into(),
            detail: detail.clone(),
            elapsed_ms,
        });

        if !succeeded {
            member.sync_state = format!("error: {detail}");
            report.errors.push((member.name.clone(), detail));
            continue;
        }

        let sha = detail;
        if sha != member.last_seen_sha {
            report.changed.push(member.name.clone());
        }
        member.last_seen_sha = sha;
        member.last_synced = stamp.clone();
        member.sync_state = "ok".into();

        // Freshness is informational only; a failure leaves the old values in place.
        match crate::gitio::worktree_freshness(&checkout) {
            Ok(fresh) => {
                member.worktree_digest = fresh.digest;
                member.worktree_dirty = fresh.dirty;
            }
            Err(e) => eprintln!(
                "nornir-monitor: {}::{} worktree freshness skipped: {e:#}",
                report.workspace, member.name
            ),
        }
    }

    record.updated_at = stamp;
    reg.upsert(&record)?;
    Ok(report)
}
/// Runs one sweep over every registered workspace whose mode is
/// [`Mode::Monitored`]: fetch its members, then republish when something
/// changed or the workspace has no snapshot yet.
///
/// Per-workspace problems (fetch failure, republish failure, individual member
/// fetch errors) are logged to stderr and do not stop the sweep.
///
/// # Errors
///
/// Only a failure to list the registry is returned to the caller.
pub fn sync_once(reg: &Registry, root: &Path) -> Result<()> {
    for ws in reg.list()? {
        if ws.mode != Mode::Monitored {
            continue;
        }
        match fetch_workspace(reg, root, &ws.name) {
            Ok(rep) => {
                for (m, e) in &rep.errors {
                    eprintln!("nornir-monitor: {}::{m} fetch error: {e}", rep.workspace);
                }
                // Re-read the record after the fetch: a workspace that has never
                // been published (blank snapshot) is built even when no member
                // changed. A failed or empty lookup counts as "no build needed".
                let needs_build = reg
                    .get(&ws.name)
                    .ok()
                    .flatten()
                    .map(|r| r.current_snapshot.trim().is_empty())
                    .unwrap_or(false);
                if rep.changed_any() || needs_build {
                    let why = if rep.changed_any() {
                        format!("changed [{}]", rep.changed.join(", "))
                    } else {
                        "empty warehouse".to_string()
                    };
                    eprintln!("nornir-monitor: {} {why} → republish", rep.workspace);
                    // Only pass a change list when there is one; `None` asks
                    // `republish` for a full rebuild.
                    let changed: Option<&[String]> =
                        if rep.changed_any() { Some(&rep.changed) } else { None };
                    match republish(reg, root, &ws.name, changed) {
                        Ok(snap) => eprintln!(
                            "nornir-monitor: {} republished → snapshot {snap}",
                            ws.name
                        ),
                        Err(e) => {
                            eprintln!("nornir-monitor: {} republish failed: {e:#}", ws.name)
                        }
                    }
                }
            }
            Err(e) => eprintln!("nornir-monitor: {} fetch failed: {e:#}", ws.name),
        }
    }
    Ok(())
}
/// Rebuilds the warehouse contents for workspace `ws_name` and stores the
/// resulting snapshot id on its registry record.
///
/// Opens the warehouse under `<root>/<ws_name>/builds` and the index under
/// `builds/cache/index`, delegates the actual work to [`republish_with`], then
/// updates `current_snapshot` and `updated_at` on the record.
///
/// `changed` is forwarded untouched; `None` requests a full rebuild.
///
/// # Errors
///
/// Fails when the builds directory cannot be created, the workspace is missing
/// from the registry, the warehouse or index cannot be opened, the rebuild
/// itself fails, or the final upsert fails. An unreadable workspace descriptor
/// is not an error — deep scanning is simply treated as disabled.
pub fn republish(
    reg: &Registry,
    root: &Path,
    ws_name: &str,
    changed: Option<&[String]>,
) -> Result<String> {
    use crate::warehouse::iceberg::IcebergWarehouse;

    let workspace_dir = root.join(ws_name);
    let checkouts = workspace_dir.join("git");
    let build_dir = workspace_dir.join("builds");
    std::fs::create_dir_all(&build_dir)?;

    let mut entry = match reg.get(ws_name)? {
        Some(found) => found,
        None => return Err(anyhow!("workspace `{ws_name}` vanished from the registry")),
    };

    let mut member_names: Vec<String> = Vec::new();
    for member in entry.members.iter() {
        member_names.push(member.name.clone());
    }

    let index_dir = build_dir.join("cache").join("index");
    let warehouse = IcebergWarehouse::open(&build_dir)
        .with_context(|| format!("open warehouse {}", build_dir.display()))?;
    let index = crate::index::Index::open_at(&checkouts, &index_dir)?
        .with_repo_scope(member_names.clone());

    // A descriptor that cannot be loaded just means "no deep scan".
    let descriptor_path = Path::new(&entry.descriptor);
    let deep_scan = crate::workspace::WorkspaceDescriptor::load(descriptor_path)
        .map_or(false, |d| d.workspace.deep_scan);

    let snapshot_id = republish_with(
        &warehouse,
        &index,
        &checkouts,
        &index_dir,
        &member_names,
        ws_name,
        deep_scan,
        changed,
    )?;

    entry.current_snapshot = snapshot_id.clone();
    entry.updated_at = chrono::Utc::now().to_rfc3339();
    reg.upsert(&entry)?;
    Ok(snapshot_id)
}
/// Rebuilds knowledge scans, the workspace index and per-member snapshots for
/// the member checkouts found under `git_dir`, using an already-open warehouse
/// and index.
///
/// Returns the snapshot id produced for the *last* present member, as a string.
///
/// `changed`, when it is `Some` and non-empty and the dependency graph could be
/// built, narrows the index build and test discovery to the affected members;
/// otherwise everything is rebuilt.
///
/// # Errors
///
/// Fails when no member has a `.git` directory under `git_dir`, when appending
/// a symbol or git-heat scan fails, when the index build fails, or when
/// snapshotting any member fails. Dep-graph build/persist, the knowledge-scan
/// ledger, security warm-up, deep scan and test discovery are best-effort:
/// their failures are logged to stderr and skipped.
pub fn republish_with(
wh: &crate::warehouse::iceberg::IcebergWarehouse,
idx: &crate::index::Index,
git_dir: &Path,
index_dir: &Path,
members: &[String],
ws_name: &str,
deep_scan: bool,
changed: Option<&[String]>,
) -> Result<String> {
// Only members that actually have a checkout (a `.git` entry) take part.
let present: Vec<&String> = members
.iter()
.filter(|name| git_dir.join(name).join(".git").exists())
.collect();
if present.is_empty() {
bail!(
"no materialized member checkouts under {} (SSH-only members can't be \
cloned yet)",
git_dir.display()
);
}
// The dependency graph is optional: without it there is no incremental scope.
let graph = match build_member_graph(ws_name, git_dir, &present) {
Ok(g) => Some(g),
Err(e) => {
eprintln!("nornir-monitor: {ws_name} dep-graph build skipped: {e:#}");
None
}
};
// An empty `scope` means "full rebuild"; it is non-empty only when both a
// change list and a graph are available.
let scope: Vec<String> = match (changed, graph.as_ref()) {
(Some(ch), Some(g)) if !ch.is_empty() => affected_set(g, ch, &present),
_ => Vec::new(),
};
if scope.is_empty() {
eprintln!("nornir-monitor: {ws_name} → full rebuild over all members");
} else {
eprintln!(
"nornir-monitor: {ws_name} → incremental rebuild, affected [{}]",
scope.join(", ")
);
}
use std::time::Instant;
// Phase 1: knowledge scan, per present member, skipped when already recorded
// for the member's current HEAD and the test inventory is populated.
let t_scan = Instant::now();
let mut n_scan = 0usize;
for name in &present {
let repo_root = git_dir.join(name.as_str());
// A checkout whose HEAD cannot be read is processed under the placeholder "unknown".
let (sha, _branch) = crate::gitio::head_sha_and_branch(&repo_root)
.unwrap_or_else(|_| ("unknown".into(), "unknown".into()));
// Lookup errors are treated the same as "not scanned" / "no inventory".
let scan_exists = wh.knowledge_scan_exists(name, &sha).unwrap_or(false);
let inventory_present = scan_exists
&& crate::warehouse::test_inventory::query_test_inventory(wh, name.as_str())
.map(|rows| !rows.is_empty())
.unwrap_or(false);
if should_skip_member_scan(scan_exists, inventory_present) {
eprintln!("nornir-monitor: {ws_name}/{name} @ {} — knowledge unchanged, skip scan", &sha[..sha.len().min(8)]);
continue;
}
// Reaching here with `scan_exists` set means the inventory was empty.
if scan_exists {
eprintln!(
"nornir-monitor: {ws_name}/{name} @ {} — knowledge unchanged but test_inventory empty → re-scan to backfill",
&sha[..sha.len().min(8)]
);
}
n_scan += 1;
// A failed scan skips this member only; the rebuild carries on.
let res = match crate::knowledge::scan_all(&repo_root, name) {
Ok(r) => r,
Err(e) => {
eprintln!("nornir-monitor: {ws_name}/{name} knowledge scan failed: {e:#}");
continue;
}
};
// Unlike the scan itself, a failed append aborts the whole republish.
wh.append_symbol_scan(&res.symbols)?;
wh.append_git_heat_scan(&res.git)?;
if let Err(e) = wh.record_knowledge_scan(name, &sha, res.symbols.snapshot_id) {
eprintln!("nornir-monitor: {ws_name}/{name} knowledge-scan ledger skipped: {e:#}");
}
if let Err(e) = crate::security::warm(wh, &repo_root, Some(git_dir)) {
eprintln!("nornir-monitor: {ws_name}/{name} security warm skipped: {e:#}");
}
}
eprintln!(
"nornir-monitor: {ws_name} ⏱ knowledge-scan {n_scan} member(s) in {:.1}s",
t_scan.elapsed().as_secs_f64()
);
// Phase 2: build the workspace index, restricted to `scope` (empty = everything).
let t_index = Instant::now();
idx.build_scoped(&scope).context("build workspace index")?;
eprintln!("nornir-monitor: {ws_name} ⏱ index-build in {:.1}s", t_index.elapsed().as_secs_f64());
// Phase 3: snapshot every present member (not just the affected ones).
let t_snap = Instant::now();
let mut last_snapshot = String::new();
for name in &present {
let repo_root = git_dir.join(name);
let (sha, branch) = crate::gitio::head_sha_and_branch(&repo_root)
.unwrap_or_else(|_| ("unknown".into(), "unknown".into()));
let snap = crate::index::snapshot::snapshot_to_iceberg(
wh, ws_name, name, &sha, &branch, index_dir,
)
.with_context(|| format!("snapshot {name}"))?;
// Overwritten each iteration: the returned id is the last member's.
last_snapshot = snap.snapshot_id.to_string();
}
eprintln!(
"nornir-monitor: {ws_name} ⏱ snapshot {} member(s) in {:.1}s",
present.len(),
t_snap.elapsed().as_secs_f64()
);
// Persist the dependency graph when one was built; failure is non-fatal.
if let Some(graph) = graph.as_ref() {
if let Err(e) = wh.record_dep_graph(ws_name, graph) {
eprintln!("nornir-monitor: {ws_name} dep-graph persist skipped: {e:#}");
}
}
// Optional deep scan over all present checkouts; at most five errors are printed.
if deep_scan {
let t_deep = Instant::now();
let dirs: Vec<std::path::PathBuf> =
present.iter().map(|n| git_dir.join(n.as_str())).collect();
match crate::deepscan::deep_scan(&dirs, wh) {
Ok(rep) => {
eprintln!(
"nornir-monitor: {ws_name} ⏱ deep-scan: {} dep crate(s) scanned, {} skipped, {} error(s) in {:.1}s",
rep.crates,
rep.skipped,
rep.errors.len(),
t_deep.elapsed().as_secs_f64(),
);
for e in rep.errors.iter().take(5) {
eprintln!("nornir-monitor: deep-scan error: {e}");
}
}
Err(e) => eprintln!("nornir-monitor: {ws_name} deep-scan skipped: {e:#}"),
}
}
// Test discovery runs unless the NORNIR_POPULATE_SKIP_TEST variable is set
// (to any value, including empty).
if std::env::var_os("NORNIR_POPULATE_SKIP_TEST").is_none() {
let to_scan = members_to_populate(&present, &scope);
if let Err(e) = discover_test_surface(wh, git_dir, ws_name, &to_scan) {
eprintln!("nornir-monitor: {ws_name} test discovery skipped: {e:#}");
}
}
Ok(last_snapshot)
}
/// Decides whether a member's knowledge scan can be skipped.
///
/// The scan is skipped only when a scan is already recorded *and* the test
/// inventory is populated; a recorded scan with an empty inventory is re-run.
fn should_skip_member_scan(scan_exists: bool, inventory_present: bool) -> bool {
    matches!((scan_exists, inventory_present), (true, true))
}
/// Selects the members to populate.
///
/// An empty `scope` means a full rebuild, so every present member is returned.
/// Otherwise only the present members named in `scope` are kept, in the order
/// they appear in `present`; scope entries that are not present are ignored.
fn members_to_populate<'a>(present: &[&'a String], scope: &[String]) -> Vec<&'a String> {
    if scope.is_empty() {
        return present.to_vec();
    }

    let mut selected = Vec::new();
    for &candidate in present {
        if scope.contains(candidate) {
            selected.push(candidate);
        }
    }
    selected
}
/// Reports the already-inventoried tests of each given member and logs one
/// `test_discover` release event per member.
///
/// Nothing is built or executed here: the inventory is read back from the
/// warehouse. All events of one call share a run id and timestamp and carry a
/// sequence number starting at zero. Appending an event is best-effort — its
/// result is discarded.
///
/// Returns `Ok(())` in every case; an unreadable inventory is logged with a
/// warning status instead of being propagated.
fn discover_test_surface(
    wh: &crate::warehouse::iceberg::IcebergWarehouse,
    _git_dir: &Path,
    ws_name: &str,
    members: &[&String],
) -> Result<()> {
    use crate::warehouse::release_events::{append_release_events, phase, status, ReleaseEventRow};
    use crate::warehouse::test_inventory::query_test_inventory;
    use crate::warehouse::test_results::new_run_id;

    if members.is_empty() {
        return Ok(());
    }

    let run_id = new_run_id();
    let stamp_micros = chrono::Utc::now().timestamp_micros();

    // Exactly one event is written per member, so the event sequence number is
    // simply the member's position.
    for (seq, name) in (0_i64..).zip(members.iter()) {
        let (outcome, detail) = match query_test_inventory(wh, name.as_str()) {
            Ok(inventory) => {
                let heavy = inventory.iter().filter(|t| t.is_heavy).count();
                eprintln!(
                    "nornir-monitor: {ws_name}/{name} — {} test(s) inventoried via syn scan ({heavy} heavy) (NOT RUN, X)",
                    inventory.len()
                );
                (
                    status::OK,
                    format!("{} tests discovered ({heavy} heavy), build-free", inventory.len()),
                )
            }
            Err(e) => {
                eprintln!("nornir-monitor: {ws_name}/{name} test inventory read skipped: {e:#}");
                (status::WARN, format!("inventory read skipped: {e}"))
            }
        };

        let event = ReleaseEventRow {
            run_id: run_id.clone(),
            seq,
            ts_micros: stamp_micros,
            component: ws_name.to_string(),
            repo: name.as_str().to_string(),
            op: "test_discover".into(),
            phase: phase::END.into(),
            status: outcome.into(),
            detail,
            depends_on: None,
            elapsed_ms: None,
        };
        let _ = wh.block_on(append_release_events(wh, std::slice::from_ref(&event)));
    }

    Ok(())
}
/// Returns the members affected by `changed`, restricted to those that are
/// actually checked out.
///
/// The graph supplies the affected members; any of them not listed in
/// `present` is dropped. The graph's ordering of the remaining entries is kept.
fn affected_set(
    graph: &crate::warehouse::dep_graph::WorkspaceGraph,
    changed: &[String],
    present: &[&String],
) -> Vec<String> {
    let mut checked_out: std::collections::BTreeSet<&str> = std::collections::BTreeSet::new();
    for name in present {
        checked_out.insert(name.as_str());
    }

    let mut kept: Vec<String> = Vec::new();
    for member in graph.affected_by_change(changed) {
        if checked_out.contains(member.as_str()) {
            kept.push(member);
        }
    }
    kept
}
/// Builds the cross-member dependency graph for the given checkouts.
///
/// A throwaway workspace descriptor is synthesised in which every member is a
/// path-only repo pointing at `<git_dir>/<member>` (no git URL, no branch,
/// deep scan off), and the graph is built from that descriptor.
///
/// # Errors
///
/// Returns whatever error the graph construction reports.
fn build_member_graph(
    ws_name: &str,
    git_dir: &Path,
    members: &[&String],
) -> Result<crate::warehouse::dep_graph::WorkspaceGraph> {
    use crate::workspace::descriptor::{RepoSpec, WorkspaceDescriptor, WorkspaceMeta};

    let repos: std::collections::BTreeMap<_, _> = members
        .iter()
        .map(|name| {
            let checkout = git_dir.join(name.as_str()).to_string_lossy().into_owned();
            let spec = RepoSpec {
                path: Some(checkout),
                git: None,
                branch: None,
            };
            ((*name).clone(), spec)
        })
        .collect();

    let meta = WorkspaceMeta {
        name: ws_name.to_string(),
        deep_scan: false,
    };
    let descriptor = WorkspaceDescriptor {
        workspace: meta,
        repos,
        descriptor_dir: git_dir.to_path_buf(),
    };
    crate::warehouse::dep_graph::WorkspaceGraph::build(&descriptor)
}
/// Parses a human-friendly interval such as `"30s"`, `"5m"`, `"2h"` or a bare
/// number of seconds (`"90"`).
///
/// Surrounding whitespace is ignored, as is whitespace between the number and
/// the unit. The multiplication saturates instead of overflowing.
///
/// Returns `default` when the input is blank or the numeric part is not a
/// valid `u64`. Note that `"0"` parses successfully to a zero duration.
pub fn parse_interval(s: &str, default: Duration) -> Duration {
    let text = s.trim();
    if text.is_empty() {
        return default;
    }

    // Peel off a lowercase unit suffix if there is one; otherwise the whole
    // string is taken as a number of seconds.
    let (digits, unit_secs): (&str, u64) = if let Some(rest) = text.strip_suffix('s') {
        (rest, 1)
    } else if let Some(rest) = text.strip_suffix('m') {
        (rest, 60)
    } else if let Some(rest) = text.strip_suffix('h') {
        (rest, 3600)
    } else {
        (text, 1)
    };

    match digits.trim().parse::<u64>() {
        Ok(count) => Duration::from_secs(count.saturating_mul(unit_secs)),
        Err(_) => default,
    }
}
/// Spawns a background task that runs [`sync_once`] every `tick`.
///
/// Each sweep runs on the blocking thread pool and is awaited before the next
/// tick is taken, so sweeps never overlap. Ticks missed while a sweep was
/// running are skipped rather than replayed. Sweep errors and panics are
/// logged to stderr and the loop keeps going.
///
/// A zero `tick` is raised to one second: `tokio::time::interval` panics on a
/// zero period, which would otherwise kill the loop inside the spawned task.
///
/// The returned handle never completes on its own; abort it to stop polling.
pub fn spawn_poll_loop(
    reg: Arc<Registry>,
    root: PathBuf,
    tick: Duration,
) -> tokio::task::JoinHandle<()> {
    // One second is the smallest non-zero interval `parse_interval` can yield.
    let tick = if tick.is_zero() {
        Duration::from_secs(1)
    } else {
        tick
    };
    tokio::spawn(async move {
        let mut interval = tokio::time::interval(tick);
        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
        loop {
            interval.tick().await;
            // The blocking closure must own its inputs, so hand it fresh handles.
            let reg = Arc::clone(&reg);
            let root = root.clone();
            match tokio::task::spawn_blocking(move || sync_once(&reg, &root)).await {
                Ok(Ok(())) => {}
                Ok(Err(e)) => eprintln!("nornir-monitor: sweep error: {e:#}"),
                Err(e) => eprintln!("nornir-monitor: sweep task panicked: {e}"),
            }
        }
    })
}
// Unit tests for the scoping helpers, the scan-skip rule and interval parsing.
#[cfg(test)]
mod tests {
use super::*;
use crate::warehouse::dep_graph::{CrossRepoEdge, RepoFacts, WorkspaceGraph};
use std::collections::{BTreeMap, BTreeSet};
// Builds a `RepoFacts` fixture with a dummy root path.
fn facts(name: &str, produces: &[&str], consumes: &[&str]) -> RepoFacts {
RepoFacts {
name: name.to_string(),
root: std::path::PathBuf::from("/dev/null"),
produces: produces.iter().map(|s| s.to_string()).collect(),
consumes: consumes.iter().map(|s| s.to_string()).collect(),
}
}
// Builds a `CrossRepoEdge` fixture from string slices.
fn edge(from: &str, to: &str, via: &[&str]) -> CrossRepoEdge {
CrossRepoEdge {
from: from.to_string(),
to: to.to_string(),
via: via.iter().map(|s| s.to_string()).collect(),
}
}
// Diamond-shaped fixture: `app` depends on `liba` and `libb`, which both
// depend on `util`.
fn diamond() -> WorkspaceGraph {
let mut fmap = BTreeMap::new();
for f in [
facts("app", &["app_c"], &["a_c", "b_c"]),
facts("liba", &["a_c"], &["util_c"]),
facts("libb", &["b_c"], &["util_c"]),
facts("util", &["util_c"], &[]),
] {
fmap.insert(f.name.clone(), f);
}
WorkspaceGraph::from_query_parts(
fmap,
vec![
edge("app", "liba", &["a_c"]),
edge("app", "libb", &["b_c"]),
edge("liba", "util", &["util_c"]),
edge("libb", "util", &["util_c"]),
],
)
}
// `affected_set` must return the changed member plus its transitive
// dependents, in build order, and only members that are present.
#[test]
fn affected_set_is_changed_union_dependents_and_skips_unaffected() {
let g = diamond();
let all = ["app".to_string(), "liba".to_string(), "libb".to_string(), "util".to_string()];
let present: Vec<&String> = all.iter().collect();
// Case 1: a mid-level change touches itself and the top-level consumer only.
let affected = affected_set(&g, &["liba".to_string()], &present);
let got: BTreeSet<&str> = affected.iter().map(|s| s.as_str()).collect();
assert_eq!(
got,
["liba", "app"].into_iter().collect::<BTreeSet<_>>(),
"affected set must be changed ∪ transitive dependents"
);
assert!(!got.contains("libb"), "sibling libb must be skipped — it does not depend on liba");
assert!(!got.contains("util"), "dependency util must be skipped — liba depends on it, not vice-versa");
let pos = |n: &str| affected.iter().position(|x| x == n).unwrap();
assert!(pos("liba") < pos("app"), "affected set must come back in build order");
// Case 2: a change to the shared leaf reaches every member.
let blast = affected_set(&g, &["util".to_string()], &present);
let blast_set: BTreeSet<&str> = blast.iter().map(|s| s.as_str()).collect();
assert_eq!(
blast_set,
["app", "liba", "libb", "util"].into_iter().collect::<BTreeSet<_>>(),
"a change to the shared leaf must rebuild every member"
);
// Case 3: with only `app` checked out, the changed-but-absent `liba` is filtered out.
let only_app = vec![&all[0]]; let scoped = affected_set(&g, &["liba".to_string()], &only_app);
assert_eq!(
scoped,
vec!["app".to_string()],
"affected set is intersected with present checkouts — absent `liba` is dropped"
);
}
// An empty scope selects every present member; a non-empty scope selects only
// present members named in it (unknown names such as "ghost" are ignored).
#[test]
fn populate_scope_is_all_on_full_rebuild_else_just_affected() {
let a = "a".to_string();
let b = "b".to_string();
let c = "c".to_string();
let present = vec![&a, &b, &c];
assert_eq!(members_to_populate(&present, &[]), vec![&a, &b, &c]);
let scope = vec!["b".to_string(), "ghost".to_string()];
assert_eq!(members_to_populate(&present, &scope), vec![&b]);
assert!(members_to_populate(&[], &scope).is_empty());
}
// Truth table for the scan-skip rule.
#[test]
fn skip_only_when_scanned_and_inventory_present() {
assert!(!should_skip_member_scan(false, false), "unseen SHA must be scanned");
assert!(should_skip_member_scan(true, true), "scanned SHA with inventory → skip");
assert!(
!should_skip_member_scan(true, false),
"scanned SHA with EMPTY inventory must re-scan to backfill the new table"
);
}
// End-to-end check against a real temporary warehouse: a recorded scan with
// an empty inventory is not skipped, and once a test row has been appended the
// same SHA is skipped again.
#[test]
fn empty_inventory_at_recorded_sha_backfills_then_skips() {
use crate::knowledge::symbols::{SymbolScan, TestDefRow};
use crate::warehouse::iceberg::IcebergWarehouse;
use crate::warehouse::test_inventory::query_test_inventory;
let dir = tempfile::tempdir().unwrap();
let wh = IcebergWarehouse::open(dir.path()).unwrap();
let repo = "holger";
let sha = "deadbeefcafef00d";
// Record only the ledger row, leaving the inventory empty.
wh.record_knowledge_scan(repo, sha, uuid::Uuid::new_v4()).unwrap();
let scan_exists = wh.knowledge_scan_exists(repo, sha).unwrap();
let inv0 = query_test_inventory(&wh, repo).unwrap();
assert!(scan_exists, "ledger row must be present (simulating an old scan)");
assert!(inv0.is_empty(), "inventory must start empty (old binary wrote none)");
assert!(
!should_skip_member_scan(scan_exists, !inv0.is_empty()),
"empty inventory at a recorded SHA must NOT skip — it must backfill"
);
// Simulate the backfill by appending a symbol scan that carries one test row.
let mut scan = SymbolScan {
snapshot_id: uuid::Uuid::new_v4(),
ts: chrono::Utc::now(),
repo: repo.to_string(),
..Default::default()
};
scan.tests.push(TestDefRow {
crate_name: repo.to_string(),
module_path: format!("{repo}::tests"),
test_name: "it_works".into(),
file: "src/lib.rs".into(),
line: 42,
is_heavy: false,
is_async: false,
});
wh.append_symbol_scan(&scan).unwrap();
let inv1 = query_test_inventory(&wh, repo).unwrap();
assert_eq!(inv1.len(), 1, "backfill must write the discovered test row");
assert_eq!(inv1[0].test_name, "it_works");
assert!(
should_skip_member_scan(scan_exists, !inv1.is_empty()),
"once inventory is backfilled, the SHA is skipped again"
);
}
// Covers blank input, each unit suffix, a bare number and unparsable input.
#[test]
fn interval_parsing() {
let d = Duration::from_secs(60);
assert_eq!(parse_interval("", d), Duration::from_secs(60));
assert_eq!(parse_interval("30s", d), Duration::from_secs(30));
assert_eq!(parse_interval("5m", d), Duration::from_secs(300));
assert_eq!(parse_interval("2h", d), Duration::from_secs(7200));
assert_eq!(parse_interval("90", d), Duration::from_secs(90));
assert_eq!(parse_interval("garbage", d), Duration::from_secs(60));
}
}