use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;
use anyhow::{anyhow, bail, Context, Result};
use crate::registry::{Mode, Registry};
/// Outcome of one [`fetch_workspace`] pass over a workspace's members.
#[derive(Debug, Default)]
pub struct FetchReport {
    /// Name of the workspace the pass ran against.
    pub workspace: String,
    /// Number of members with a non-empty remote for which a fetch was attempted
    /// (failed attempts are counted too).
    pub fetched: usize,
    /// Members whose fetched SHA differs from the previously recorded one.
    pub changed: Vec<String>,
    /// `(member, error message)` for every member whose fetch failed.
    pub errors: Vec<(String, String)>,
}

impl FetchReport {
    /// `true` when at least one member was recorded as changed by this pass.
    pub fn changed_any(&self) -> bool {
        self.changed.first().is_some()
    }
}
/// Fetch every remote-backed member of workspace `name` and persist the new sync state.
///
/// Checkouts live under `<root>/<workspace>/git/<member>`; `<root>/<workspace>/builds`
/// is created as well. Members with an empty `remote` are skipped entirely: they are
/// not counted in [`FetchReport::fetched`] and their registry state is left untouched.
///
/// For each other member, `crate::gitio::clone_or_fetch` is called:
/// - on success, the returned SHA is stored in `last_seen_sha`, `last_synced` is set to
///   this pass's timestamp and `sync_state` becomes `"ok"`. The member is reported as
///   changed when the SHA differs from the previous `last_seen_sha`;
/// - on failure, `sync_state` becomes `"error: <message>"` and the error is added to
///   [`FetchReport::errors`]. `last_seen_sha`/`last_synced` keep their old values.
///
/// The workspace's `updated_at` is bumped and the record is upserted even when some
/// (or all) member fetches failed.
///
/// # Errors
/// Fails if the workspace is not in the registry, if a directory cannot be created,
/// or if reading/writing the registry fails. Per-member fetch failures are *not*
/// errors; they are reported in the returned [`FetchReport`].
pub fn fetch_workspace(reg: &Registry, root: &Path, name: &str) -> Result<FetchReport> {
    let mut ws = match reg.get(name)? {
        Some(record) => record,
        None => return Err(anyhow!("no workspace `{name}` in the registry")),
    };

    let ws_dir = root.join(&ws.name);
    let checkouts = ws_dir.join("git");
    std::fs::create_dir_all(&checkouts)?;
    std::fs::create_dir_all(ws_dir.join("builds"))?;

    // One timestamp for the whole pass, so every member synced here agrees with
    // the workspace's `updated_at`.
    let stamp = chrono::Utc::now().to_rfc3339();
    let mut report = FetchReport {
        workspace: ws.name.clone(),
        ..Default::default()
    };

    for member in ws.members.iter_mut().filter(|m| !m.remote.is_empty()) {
        report.fetched += 1;
        let target = checkouts.join(&member.name);
        match crate::gitio::clone_or_fetch(&member.remote, &target, None) {
            Err(err) => {
                let detail = format!("{err:#}");
                member.sync_state = format!("error: {detail}");
                report.errors.push((member.name.clone(), detail));
            }
            Ok(head) => {
                if head != member.last_seen_sha {
                    report.changed.push(member.name.clone());
                }
                member.last_seen_sha = head;
                member.last_synced = stamp.clone();
                member.sync_state = "ok".into();
            }
        }
    }

    ws.updated_at = stamp;
    reg.upsert(&ws)?;
    Ok(report)
}
/// Run one monitoring sweep over every [`Mode::Monitored`] workspace in the registry.
///
/// For each monitored workspace this:
/// 1. fetches all members via [`fetch_workspace`] and logs per-member fetch errors;
/// 2. republishes via [`republish`] when some member changed, or when the workspace has
///    no published snapshot yet (empty `current_snapshot`).
///
/// The rebuild is scoped to the changed members only when a snapshot already exists.
/// With an empty warehouse, a full rebuild is forced: there is no previous build for an
/// incremental one to extend.
///
/// Per-workspace failures (fetch or republish) are logged to stderr and do not abort
/// the sweep.
///
/// # Errors
/// Returns an error only if listing the registry fails.
pub fn sync_once(reg: &Registry, root: &Path) -> Result<()> {
    for ws in reg.list()? {
        if ws.mode != Mode::Monitored {
            continue;
        }
        let rep = match fetch_workspace(reg, root, &ws.name) {
            Ok(rep) => rep,
            Err(e) => {
                eprintln!("nornir-monitor: {} fetch failed: {e:#}", ws.name);
                continue;
            }
        };
        for (m, e) in &rep.errors {
            eprintln!("nornir-monitor: {}::{m} fetch error: {e}", rep.workspace);
        }

        // An empty `current_snapshot` means nothing has been published yet. A registry
        // read failure here is treated as "already built" (best effort, as before).
        let needs_build = reg
            .get(&ws.name)
            .ok()
            .flatten()
            .map(|r| r.current_snapshot.trim().is_empty())
            .unwrap_or(false);
        let changed_any = rep.changed_any();
        if !changed_any && !needs_build {
            continue;
        }

        let why = if changed_any {
            format!("changed [{}]", rep.changed.join(", "))
        } else {
            "empty warehouse".to_string()
        };
        eprintln!("nornir-monitor: {} {why} → republish", rep.workspace);

        // Only hand the changed set to `republish` when a previous publish exists.
        // Otherwise pass `None`, which makes `republish_with` rebuild every member.
        let changed: Option<&[String]> = if changed_any && !needs_build {
            Some(&rep.changed)
        } else {
            None
        };
        match republish(reg, root, &ws.name, changed) {
            Ok(snap) => eprintln!(
                "nornir-monitor: {} republished → snapshot {snap}",
                ws.name
            ),
            Err(e) => eprintln!("nornir-monitor: {} republish failed: {e:#}", ws.name),
        }
    }
    Ok(())
}
/// Rebuild and publish workspace `ws_name`, then record the new snapshot in the registry.
///
/// Builds live under `<root>/<ws_name>/builds` (created if missing). The steps are:
/// - open the Iceberg warehouse there;
/// - open the index cache at `builds/cache/index`, scoped to the workspace's members
///   as listed in the registry;
/// - read `deep_scan` from the workspace descriptor (`false` if it cannot be loaded);
/// - delegate to [`republish_with`].
///
/// On success the returned snapshot id is stored as `current_snapshot`, `updated_at`
/// is bumped, and the record is upserted.
///
/// `changed` is forwarded unchanged: `Some(members)` requests an incremental rebuild
/// around those members, `None` a full rebuild.
///
/// # Errors
/// Fails if the build directory cannot be created, the workspace is missing from the
/// registry, the warehouse or index cannot be opened, [`republish_with`] fails, or the
/// registry write fails.
pub fn republish(
    reg: &Registry,
    root: &Path,
    ws_name: &str,
    changed: Option<&[String]>,
) -> Result<String> {
    use crate::warehouse::iceberg::IcebergWarehouse;

    let workspace_dir = root.join(ws_name);
    let checkouts = workspace_dir.join("git");
    let build_dir = workspace_dir.join("builds");
    std::fs::create_dir_all(&build_dir)?;

    let mut record = match reg.get(ws_name)? {
        Some(found) => found,
        None => return Err(anyhow!("workspace `{ws_name}` vanished from the registry")),
    };
    let member_names: Vec<String> = record
        .members
        .iter()
        .map(|member| member.name.clone())
        .collect();

    let index_dir = build_dir.join("cache").join("index");
    let warehouse = IcebergWarehouse::open(&build_dir)
        .with_context(|| format!("open warehouse {}", build_dir.display()))?;
    let index = crate::index::Index::open_at(&checkouts, &index_dir)?
        .with_repo_scope(member_names.clone());

    // Deep scanning is opt-in per descriptor; an unreadable descriptor disables it.
    let descriptor_path = Path::new(&record.descriptor);
    let deep_scan = crate::workspace::WorkspaceDescriptor::load(descriptor_path)
        .map(|desc| desc.workspace.deep_scan)
        .unwrap_or(false);

    let snapshot = republish_with(
        &warehouse,
        &index,
        &checkouts,
        &index_dir,
        &member_names,
        ws_name,
        deep_scan,
        changed,
    )?;

    record.current_snapshot = snapshot.clone();
    record.updated_at = chrono::Utc::now().to_rfc3339();
    reg.upsert(&record)?;
    Ok(snapshot)
}
/// Rebuild and publish a workspace's warehouse data from its local checkouts.
///
/// Only members that have a `<git_dir>/<member>/.git` directory take part. For those:
/// 1. the cross-repo dependency graph is built (best effort: on failure it is skipped);
/// 2. the rebuild scope is derived. When `changed` is non-empty and the graph built,
///    the scope is the changed members plus their dependents, restricted to present
///    checkouts. Otherwise the scope is empty, which is logged as a full rebuild;
/// 3. each member gets a knowledge scan and a security warm-up. The scan is skipped when
///    the knowledge-scan ledger already holds the member's current HEAD;
/// 4. the workspace index is built over the scope and every member is snapshotted to
///    Iceberg at its HEAD;
/// 5. the dependency graph is persisted and, if `deep_scan`, dependency crates are scanned.
///
/// If a checkout's HEAD cannot be resolved, the placeholder revision `"unknown"` is used.
/// The ledger is never consulted or written for it. Otherwise every unreadable checkout
/// would share one ledger entry and be skipped forever after its first scan.
///
/// Ledger writes, security warm-up, graph persistence and deep-scan failures are logged
/// to stderr and ignored.
///
/// # Returns
/// The snapshot id of the last member snapshotted.
///
/// # Errors
/// Fails if no member checkout is present, or if a knowledge scan, scan append, index
/// build or member snapshot fails.
pub fn republish_with(
    wh: &crate::warehouse::iceberg::IcebergWarehouse,
    idx: &crate::index::Index,
    git_dir: &Path,
    index_dir: &Path,
    members: &[String],
    ws_name: &str,
    deep_scan: bool,
    changed: Option<&[String]>,
) -> Result<String> {
    // Revision placeholder used when a checkout's HEAD cannot be resolved.
    const UNKNOWN_REV: &str = "unknown";

    let present: Vec<&String> = members
        .iter()
        .filter(|name| git_dir.join(name).join(".git").exists())
        .collect();
    if present.is_empty() {
        bail!(
            "no materialized member checkouts under {} (SSH-only members can't be \
             cloned yet)",
            git_dir.display()
        );
    }

    let graph = match build_member_graph(ws_name, git_dir, &present) {
        Ok(g) => Some(g),
        Err(e) => {
            eprintln!("nornir-monitor: {ws_name} dep-graph build skipped: {e:#}");
            None
        }
    };
    let scope: Vec<String> = match (changed, graph.as_ref()) {
        (Some(ch), Some(g)) if !ch.is_empty() => affected_set(g, ch, &present),
        _ => Vec::new(),
    };
    if scope.is_empty() {
        eprintln!("nornir-monitor: {ws_name} → full rebuild over all members");
    } else {
        eprintln!(
            "nornir-monitor: {ws_name} → incremental rebuild, affected [{}]",
            scope.join(", ")
        );
    }

    // Resolve each checkout's HEAD once, so the knowledge ledger and the Iceberg
    // snapshot below describe the same revision.
    let heads: Vec<_> = present
        .iter()
        .map(|name| {
            crate::gitio::head_sha_and_branch(&git_dir.join(name))
                .unwrap_or_else(|_| (UNKNOWN_REV.into(), UNKNOWN_REV.into()))
        })
        .collect();

    for (name, (sha, _branch)) in present.iter().zip(&heads) {
        let repo_root = git_dir.join(name);
        // A placeholder revision must never key the ledger (see the doc comment).
        let rev_known = sha != UNKNOWN_REV;
        if rev_known && wh.knowledge_scan_exists(name, sha).unwrap_or(false) {
            eprintln!(
                "nornir-monitor: {ws_name}/{name} @ {} — knowledge unchanged, skip scan",
                &sha[..sha.len().min(8)]
            );
            continue;
        }
        let res = crate::knowledge::scan_all(&repo_root, name)
            .with_context(|| format!("knowledge scan {name}"))?;
        wh.append_symbol_scan(&res.symbols)?;
        wh.append_git_heat_scan(&res.git)?;
        if rev_known {
            if let Err(e) = wh.record_knowledge_scan(name, sha, res.symbols.snapshot_id) {
                eprintln!("nornir-monitor: {ws_name}/{name} knowledge-scan ledger skipped: {e:#}");
            }
        }
        if let Err(e) = crate::security::warm(wh, &repo_root, Some(git_dir)) {
            eprintln!("nornir-monitor: {ws_name}/{name} security warm skipped: {e:#}");
        }
    }

    idx.build_scoped(&scope).context("build workspace index")?;

    let mut last_snapshot = String::new();
    for (name, (sha, branch)) in present.iter().zip(&heads) {
        let snap = crate::index::snapshot::snapshot_to_iceberg(
            wh, ws_name, name, sha, branch, index_dir,
        )
        .with_context(|| format!("snapshot {name}"))?;
        last_snapshot = snap.snapshot_id.to_string();
    }

    if let Some(graph) = graph.as_ref() {
        if let Err(e) = wh.record_dep_graph(ws_name, graph) {
            eprintln!("nornir-monitor: {ws_name} dep-graph persist skipped: {e:#}");
        }
    }

    if deep_scan {
        let dirs: Vec<std::path::PathBuf> =
            present.iter().map(|n| git_dir.join(n.as_str())).collect();
        match crate::deepscan::deep_scan(&dirs, wh) {
            Ok(rep) => {
                eprintln!(
                    "nornir-monitor: {ws_name} deep-scan: {} dep crate(s) scanned, {} skipped, {} error(s)",
                    rep.crates,
                    rep.skipped,
                    rep.errors.len()
                );
                // Cap the noise: only the first few deep-scan errors are printed.
                for e in rep.errors.iter().take(5) {
                    eprintln!("nornir-monitor: deep-scan error: {e}");
                }
            }
            Err(e) => eprintln!("nornir-monitor: {ws_name} deep-scan skipped: {e:#}"),
        }
    }

    Ok(last_snapshot)
}
/// Members that must be rebuilt after `changed` moved, restricted to present checkouts.
///
/// The candidate set comes from `WorkspaceGraph::affected_by_change` (the unit tests
/// pin it as the changed members plus their transitive dependents, in build order).
/// Candidates whose checkout is not in `present` are dropped. The graph's order is
/// preserved.
fn affected_set(
    graph: &crate::warehouse::dep_graph::WorkspaceGraph,
    changed: &[String],
    present: &[&String],
) -> Vec<String> {
    let on_disk: std::collections::HashSet<&str> =
        present.iter().map(|name| name.as_str()).collect();
    let mut scoped = Vec::new();
    for member in graph.affected_by_change(changed) {
        if on_disk.contains(member.as_str()) {
            scoped.push(member);
        }
    }
    scoped
}
/// Build the cross-repo dependency graph for `members` from their local checkouts.
///
/// A synthetic in-memory `WorkspaceDescriptor` is assembled. In it, every member points
/// at `<git_dir>/<member>` as a local `path`, with no `git` remote or `branch`, and
/// `deep_scan` is disabled. The descriptor is then handed to `WorkspaceGraph::build`.
/// Callers pass only members whose checkout exists.
fn build_member_graph(
    ws_name: &str,
    git_dir: &Path,
    members: &[&String],
) -> Result<crate::warehouse::dep_graph::WorkspaceGraph> {
    use crate::workspace::descriptor::{RepoSpec, WorkspaceDescriptor, WorkspaceMeta};

    let repos: std::collections::BTreeMap<_, _> = members
        .iter()
        .map(|&name| {
            let checkout = git_dir.join(name).to_string_lossy().into_owned();
            let spec = RepoSpec {
                path: Some(checkout),
                git: None,
                branch: None,
            };
            (name.clone(), spec)
        })
        .collect();

    let descriptor = WorkspaceDescriptor {
        workspace: WorkspaceMeta {
            name: ws_name.to_string(),
            deep_scan: false,
        },
        repos,
        descriptor_dir: git_dir.to_path_buf(),
    };
    crate::warehouse::dep_graph::WorkspaceGraph::build(&descriptor)
}
/// Parse a polling interval such as `"30s"`, `"5m"`, `"2h"` or a bare `"90"` (seconds).
///
/// Surrounding whitespace is ignored, and so is whitespace between the number and the
/// unit. Only a single lowercase `s`/`m`/`h` suffix is recognised, so e.g. `"500ms"` or
/// `"1d"` are rejected. Empty or unparsable input yields `default`. Multiplication
/// saturates at `u64::MAX` seconds.
///
/// Note: `"0"`/`"0s"` parse to a zero duration; callers feeding this into a timer must
/// guard against that.
pub fn parse_interval(s: &str, default: Duration) -> Duration {
    let text = s.trim();
    if text.is_empty() {
        return default;
    }
    let (digits, seconds_per_unit): (&str, u64) = if let Some(rest) = text.strip_suffix('s') {
        (rest, 1)
    } else if let Some(rest) = text.strip_suffix('m') {
        (rest, 60)
    } else if let Some(rest) = text.strip_suffix('h') {
        (rest, 3600)
    } else {
        (text, 1)
    };
    match digits.trim().parse::<u64>() {
        Ok(count) => Duration::from_secs(count.saturating_mul(seconds_per_unit)),
        Err(_) => default,
    }
}
/// Spawn a background task that runs [`sync_once`] every `tick`.
///
/// Each sweep runs on tokio's blocking pool via `spawn_blocking`. The loop awaits the
/// sweep before waiting for the next tick, so sweeps never overlap. Ticks missed while
/// a sweep runs are skipped (`MissedTickBehavior::Skip`) rather than bursting. Sweep
/// errors and panics are logged to stderr and the loop keeps going.
///
/// The first sweep starts right away, because a tokio interval's first tick completes
/// immediately. A zero `tick` is replaced by one second: `tokio::time::interval` panics
/// on a zero period, which `parse_interval("0", ..)` can produce.
///
/// Like any `tokio::spawn`, this must be called from within a tokio runtime.
pub fn spawn_poll_loop(
    reg: Arc<Registry>,
    root: PathBuf,
    tick: Duration,
) -> tokio::task::JoinHandle<()> {
    let tick = if tick.is_zero() {
        Duration::from_secs(1)
    } else {
        tick
    };
    tokio::spawn(async move {
        let mut interval = tokio::time::interval(tick);
        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
        loop {
            interval.tick().await;
            let reg = Arc::clone(&reg);
            let root = root.clone();
            match tokio::task::spawn_blocking(move || sync_once(&reg, &root)).await {
                Ok(Ok(())) => {}
                Ok(Err(e)) => eprintln!("nornir-monitor: sweep error: {e:#}"),
                Err(e) => eprintln!("nornir-monitor: sweep task panicked: {e}"),
            }
        }
    })
}
// Unit tests for the pure helpers in this module: `affected_set` (incremental-rebuild
// scoping over a hand-built dependency graph) and `parse_interval`.
#[cfg(test)]
mod tests {
use super::*;
use crate::warehouse::dep_graph::{CrossRepoEdge, RepoFacts, WorkspaceGraph};
use std::collections::{BTreeMap, BTreeSet};
// Synthetic `RepoFacts` for one member. `root` is a placeholder path; these tests
// assemble the graph from in-memory parts via `WorkspaceGraph::from_query_parts`.
fn facts(name: &str, produces: &[&str], consumes: &[&str]) -> RepoFacts {
RepoFacts {
name: name.to_string(),
root: std::path::PathBuf::from("/dev/null"),
produces: produces.iter().map(|s| s.to_string()).collect(),
consumes: consumes.iter().map(|s| s.to_string()).collect(),
}
}
// A cross-repo edge `from` -> `to`, labelled with the names listed in `via`.
fn edge(from: &str, to: &str, via: &[&str]) -> CrossRepoEdge {
CrossRepoEdge {
from: from.to_string(),
to: to.to_string(),
via: via.iter().map(|s| s.to_string()).collect(),
}
}
// Diamond-shaped workspace: `app` consumes what `liba` and `libb` produce, and both
// libraries consume what `util` produces. Edges point from consumer to producer.
fn diamond() -> WorkspaceGraph {
let mut fmap = BTreeMap::new();
for f in [
facts("app", &["app_c"], &["a_c", "b_c"]),
facts("liba", &["a_c"], &["util_c"]),
facts("libb", &["b_c"], &["util_c"]),
facts("util", &["util_c"], &[]),
] {
fmap.insert(f.name.clone(), f);
}
WorkspaceGraph::from_query_parts(
fmap,
vec![
edge("app", "liba", &["a_c"]),
edge("app", "libb", &["b_c"]),
edge("liba", "util", &["util_c"]),
edge("libb", "util", &["util_c"]),
],
)
}
// Checks, on the diamond:
// - a change to `liba` selects exactly {liba, app}, with `liba` first (build order);
// - a change to the shared leaf `util` selects every member;
// - members whose checkout is absent are filtered out of the result.
#[test]
fn affected_set_is_changed_union_dependents_and_skips_unaffected() {
let g = diamond();
let all = ["app".to_string(), "liba".to_string(), "libb".to_string(), "util".to_string()];
let present: Vec<&String> = all.iter().collect();
let affected = affected_set(&g, &["liba".to_string()], &present);
let got: BTreeSet<&str> = affected.iter().map(|s| s.as_str()).collect();
assert_eq!(
got,
["liba", "app"].into_iter().collect::<BTreeSet<_>>(),
"affected set must be changed ∪ transitive dependents"
);
assert!(!got.contains("libb"), "sibling libb must be skipped — it does not depend on liba");
assert!(!got.contains("util"), "dependency util must be skipped — liba depends on it, not vice-versa");
let pos = |n: &str| affected.iter().position(|x| x == n).unwrap();
assert!(pos("liba") < pos("app"), "affected set must come back in build order");
let blast = affected_set(&g, &["util".to_string()], &present);
let blast_set: BTreeSet<&str> = blast.iter().map(|s| s.as_str()).collect();
assert_eq!(
blast_set,
["app", "liba", "libb", "util"].into_iter().collect::<BTreeSet<_>>(),
"a change to the shared leaf must rebuild every member"
);
// Only `app` is checked out: `liba` is affected but absent, so it is dropped.
let only_app = vec![&all[0]]; let scoped = affected_set(&g, &["liba".to_string()], &only_app);
assert_eq!(
scoped,
vec!["app".to_string()],
"affected set is intersected with present checkouts — absent `liba` is dropped"
);
}
// Suffix handling (s/m/h), bare seconds, and fallback to the default for empty or
// unparsable input.
#[test]
fn interval_parsing() {
let d = Duration::from_secs(60);
assert_eq!(parse_interval("", d), Duration::from_secs(60));
assert_eq!(parse_interval("30s", d), Duration::from_secs(30));
assert_eq!(parse_interval("5m", d), Duration::from_secs(300));
assert_eq!(parse_interval("2h", d), Duration::from_secs(7200));
assert_eq!(parse_interval("90", d), Duration::from_secs(90));
assert_eq!(parse_interval("garbage", d), Duration::from_secs(60));
}
}