use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;
use anyhow::{anyhow, bail, Context, Result};
use crate::registry::{Mode, Registry};
#[derive(Debug, Default)]
pub struct FetchReport {
pub workspace: String,
pub fetched: usize,
pub changed: Vec<String>,
pub errors: Vec<(String, String)>,
}
impl FetchReport {
pub fn changed_any(&self) -> bool {
!self.changed.is_empty()
}
}
pub fn fetch_workspace(reg: &Registry, root: &Path, name: &str) -> Result<FetchReport> {
let mut ws = reg
.get(name)?
.ok_or_else(|| anyhow!("no workspace `{name}` in the registry"))?;
let ws_root = root.join(&ws.name);
let git_dir = ws_root.join("git");
std::fs::create_dir_all(&git_dir)?;
std::fs::create_dir_all(ws_root.join("builds"))?;
let now = chrono::Utc::now().to_rfc3339();
let mut report = FetchReport {
workspace: ws.name.clone(),
..Default::default()
};
for m in &mut ws.members {
if m.remote.is_empty() {
continue; }
report.fetched += 1;
let dest = git_dir.join(&m.name);
match crate::gitio::clone_or_fetch(&m.remote, &dest) {
Ok(sha) => {
if sha != m.last_seen_sha {
report.changed.push(m.name.clone());
}
m.last_seen_sha = sha;
m.last_synced = now.clone();
m.sync_state = "ok".into();
}
Err(e) => {
let msg = format!("{e:#}");
m.sync_state = format!("error: {msg}");
report.errors.push((m.name.clone(), msg));
}
}
}
ws.updated_at = now;
reg.upsert(&ws)?;
Ok(report)
}
pub fn sync_once(reg: &Registry, root: &Path) -> Result<()> {
for ws in reg.list()? {
if ws.mode != Mode::Monitored {
continue;
}
match fetch_workspace(®, root, &ws.name) {
Ok(rep) => {
for (m, e) in &rep.errors {
eprintln!("nornir-monitor: {}::{m} fetch error: {e}", rep.workspace);
}
if rep.changed_any() {
eprintln!(
"nornir-monitor: {} changed [{}] → republish",
rep.workspace,
rep.changed.join(", ")
);
match republish(reg, root, &ws.name) {
Ok(snap) => eprintln!(
"nornir-monitor: {} republished → snapshot {snap}",
ws.name
),
Err(e) => {
eprintln!("nornir-monitor: {} republish failed: {e:#}", ws.name)
}
}
}
}
Err(e) => eprintln!("nornir-monitor: {} fetch failed: {e:#}", ws.name),
}
}
Ok(())
}
fn republish(reg: &Registry, root: &Path, ws_name: &str) -> Result<String> {
use crate::warehouse::iceberg::IcebergWarehouse;
let ws_root = root.join(ws_name);
let git_dir = ws_root.join("git");
let builds = ws_root.join("builds");
std::fs::create_dir_all(&builds)?;
let mut record = reg
.get(ws_name)?
.ok_or_else(|| anyhow!("workspace `{ws_name}` vanished from the registry"))?;
let members: Vec<String> = record.members.iter().map(|m| m.name.clone()).collect();
let index_dir = builds.join("cache").join("index");
let wh = IcebergWarehouse::open(&builds)
.with_context(|| format!("open warehouse {}", builds.display()))?;
let idx = crate::index::Index::open_at(&git_dir, &index_dir)?
.with_repo_scope(members.clone());
let snap = republish_with(&wh, &idx, &git_dir, &index_dir, &members, ws_name)?;
record.current_snapshot = snap.clone();
record.updated_at = chrono::Utc::now().to_rfc3339();
reg.upsert(&record)?;
Ok(snap)
}
pub fn republish_with(
wh: &crate::warehouse::iceberg::IcebergWarehouse,
idx: &crate::index::Index,
git_dir: &Path,
index_dir: &Path,
members: &[String],
ws_name: &str,
) -> Result<String> {
let present: Vec<&String> = members
.iter()
.filter(|name| git_dir.join(name).join(".git").exists())
.collect();
if present.is_empty() {
bail!(
"no materialized member checkouts under {} (SSH-only members can't be \
cloned yet)",
git_dir.display()
);
}
for name in &present {
let repo_root = git_dir.join(name);
let res = crate::knowledge::scan_all(&repo_root, name)
.with_context(|| format!("knowledge scan {name}"))?;
wh.append_symbol_scan(&res.symbols)?;
wh.append_git_heat_scan(&res.git)?;
}
idx.build().context("build workspace index")?;
let mut last_snapshot = String::new();
for name in &present {
let repo_root = git_dir.join(name);
let (sha, branch) = crate::gitio::head_sha_and_branch(&repo_root)
.unwrap_or_else(|_| ("unknown".into(), "unknown".into()));
let snap = crate::index::snapshot::snapshot_to_iceberg(
wh, ws_name, name, &sha, &branch, index_dir,
)
.with_context(|| format!("snapshot {name}"))?;
last_snapshot = snap.snapshot_id.to_string();
}
match build_member_graph(ws_name, git_dir, &present) {
Ok(graph) => {
if let Err(e) = wh.record_dep_graph(ws_name, &graph) {
eprintln!("nornir-monitor: {ws_name} dep-graph persist skipped: {e:#}");
}
}
Err(e) => eprintln!("nornir-monitor: {ws_name} dep-graph build skipped: {e:#}"),
}
Ok(last_snapshot)
}
fn build_member_graph(
ws_name: &str,
git_dir: &Path,
members: &[&String],
) -> Result<crate::warehouse::dep_graph::WorkspaceGraph> {
use crate::workspace::descriptor::{RepoSpec, WorkspaceDescriptor, WorkspaceMeta};
let mut repos = std::collections::BTreeMap::new();
for name in members {
repos.insert(
(*name).clone(),
RepoSpec {
path: Some(git_dir.join(name).to_string_lossy().into_owned()),
git: None,
branch: None,
},
);
}
let desc = WorkspaceDescriptor {
workspace: WorkspaceMeta { name: ws_name.to_string() },
repos,
descriptor_dir: git_dir.to_path_buf(),
};
crate::warehouse::dep_graph::WorkspaceGraph::build(&desc)
}
pub fn parse_interval(s: &str, default: Duration) -> Duration {
let s = s.trim();
if s.is_empty() {
return default;
}
let (num, mult) = match s.chars().last() {
Some('s') => (&s[..s.len() - 1], 1),
Some('m') => (&s[..s.len() - 1], 60),
Some('h') => (&s[..s.len() - 1], 3600),
_ => (s, 1),
};
num.trim()
.parse::<u64>()
.ok()
.map(|n| Duration::from_secs(n.saturating_mul(mult)))
.unwrap_or(default)
}
pub fn spawn_poll_loop(
reg: Arc<Registry>,
root: PathBuf,
tick: Duration,
) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move {
let mut interval = tokio::time::interval(tick);
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
loop {
interval.tick().await;
let reg = reg.clone();
let root = root.clone();
match tokio::task::spawn_blocking(move || sync_once(®, &root)).await {
Ok(Ok(())) => {}
Ok(Err(e)) => eprintln!("nornir-monitor: sweep error: {e:#}"),
Err(e) => eprintln!("nornir-monitor: sweep task panicked: {e}"),
}
}
})
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn interval_parsing() {
let d = Duration::from_secs(60);
assert_eq!(parse_interval("", d), Duration::from_secs(60));
assert_eq!(parse_interval("30s", d), Duration::from_secs(30));
assert_eq!(parse_interval("5m", d), Duration::from_secs(300));
assert_eq!(parse_interval("2h", d), Duration::from_secs(7200));
assert_eq!(parse_interval("90", d), Duration::from_secs(90));
assert_eq!(parse_interval("garbage", d), Duration::from_secs(60));
}
}