nornir 0.4.13

Companion to cargo: dependency tracking, release gating, deploy, benchmarks, and documentation assembly. Project-agnostic.
Documentation
//! Server-monitored self-sync (server-user `nornir`): the poll loop and the
//! per-workspace fetch step.
//!
//! The loop runs inside `nornir-server`: every tick it fetches each *monitored*
//! workspace's git members (pure-Rust HTTPS via [`crate::gitio`]), records sync
//! state in the [`crate::registry`], and — when any member's HEAD moved, or the
//! workspace has never been built (no `current_snapshot`) — invokes the
//! republish hook. [`fetch_workspace`] is shared with the CLI's
//! `nornir workspace fetch`.

use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;

use anyhow::{anyhow, bail, Context, Result};

use crate::registry::{Mode, Registry};

/// Result of fetching the git members of a single workspace.
#[derive(Debug, Default)]
pub struct FetchReport {
    /// Registry name of the workspace that was fetched.
    pub workspace: String,
    /// Number of members with a remote for which a clone/fetch was attempted
    /// (failed attempts are counted too, and also appear in `errors`).
    pub fetched: usize,
    /// Names of members whose fetched HEAD differs from the SHA recorded at the
    /// previous sync.
    pub changed: Vec<String>,
    /// `(member, error)` pairs for members whose clone/fetch failed.
    pub errors: Vec<(String, String)>,
}

impl FetchReport {
    /// `true` when at least one member's HEAD moved during this fetch.
    pub fn changed_any(&self) -> bool {
        self.changed.first().is_some()
    }
}

/// Clone/fetch every git member of `name` into `<root>/<name>/git/<member>`,
/// write back each member's sync state, and return what changed. Shared by the
/// CLI verb and the poll loop. HTTPS only (SSH refused; see [`crate::gitio`]).
pub fn fetch_workspace(reg: &Registry, root: &Path, name: &str) -> Result<FetchReport> {
    let mut ws = reg
        .get(name)?
        .ok_or_else(|| anyhow!("no workspace `{name}` in the registry"))?;
    let ws_root = root.join(&ws.name);
    let git_dir = ws_root.join("git");
    std::fs::create_dir_all(&git_dir)?;
    std::fs::create_dir_all(ws_root.join("builds"))?;

    let now = chrono::Utc::now().to_rfc3339();
    let mut report = FetchReport {
        workspace: ws.name.clone(),
        ..Default::default()
    };
    for m in &mut ws.members {
        if m.remote.is_empty() {
            continue; // local/external member — source lives outside
        }
        report.fetched += 1;
        let dest = git_dir.join(&m.name);
        // SSH remotes authenticate with the server's deploy key (resolved inside
        // clone_or_fetch via NORNIR_SSH_DIR / the `nornir` user's home).
        match crate::gitio::clone_or_fetch(&m.remote, &dest, None) {
            Ok(sha) => {
                if sha != m.last_seen_sha {
                    report.changed.push(m.name.clone());
                }
                m.last_seen_sha = sha;
                m.last_synced = now.clone();
                m.sync_state = "ok".into();
            }
            Err(e) => {
                let msg = format!("{e:#}");
                m.sync_state = format!("error: {msg}");
                report.errors.push((m.name.clone(), msg));
            }
        }
    }
    ws.updated_at = now;
    reg.upsert(&ws)?;
    Ok(report)
}

/// One sweep: fetch every monitored workspace; republish those that changed.
///
/// A workspace is republished when either:
/// - a member's HEAD moved during this fetch, or
/// - its registry row has an empty `current_snapshot` (never built, e.g. the
///   warehouse was wiped), so a clean warehouse rebuilds from git instead of
///   staying empty.
///
/// Per-member fetch errors, whole-workspace fetch failures and republish
/// failures are logged to stderr and never abort the sweep, so one bad remote
/// cannot stall the others.
///
/// Takes the server's shared registry handle (redb is single-writer — the poll
/// loop and the Workspaces RPC must share one `Registry`, never each open their
/// own).
///
/// NOTE(review): [`fetch_workspace`] records the new HEAD SHAs before the
/// republish runs. If a republish fails after a change, the next sweep sees no
/// change and does not retry it, unless the workspace has never been built.
///
/// # Errors
/// Only fails if listing the registry fails.
pub fn sync_once(reg: &Registry, root: &Path) -> Result<()> {
    for ws in reg.list()? {
        if ws.mode != Mode::Monitored {
            continue;
        }
        let rep = match fetch_workspace(reg, root, &ws.name) {
            Ok(rep) => rep,
            Err(e) => {
                eprintln!("nornir-monitor: {} fetch failed: {e:#}", ws.name);
                continue;
            }
        };
        for (m, e) in &rep.errors {
            eprintln!("nornir-monitor: {}::{m} fetch error: {e}", rep.workspace);
        }
        // A git change takes precedence. Only when nothing moved do we consult
        // the registry for the "never built" case. A missing row or a read
        // error counts as "built" (best-effort: no rebuild on a transient read
        // failure).
        let why = if rep.changed_any() {
            format!("changed [{}]", rep.changed.join(", "))
        } else if matches!(
            reg.get(&ws.name),
            Ok(Some(r)) if r.current_snapshot.trim().is_empty()
        ) {
            "empty warehouse".to_string()
        } else {
            continue;
        };
        eprintln!("nornir-monitor: {} {why} → republish", rep.workspace);
        match republish(reg, root, &ws.name) {
            Ok(snap) => eprintln!(
                "nornir-monitor: {} republished → snapshot {snap}",
                ws.name
            ),
            Err(e) => eprintln!("nornir-monitor: {} republish failed: {e:#}", ws.name),
        }
    }
    Ok(())
}

/// Rebuild + republish a workspace's warehouse from its freshly-fetched
/// sources: knowledge scan + member-scoped code index + a per-member snapshot,
/// all under `<root>/<ws>/builds/` (the workspace's own Iceberg warehouse).
/// Returns the latest snapshot id; records it on the registry row.
///
/// Operates on the materialized clones under `<root>/<ws>/git/<member>/` — only
/// members with a real checkout (HTTPS today; SSH pack-fetch pending). Mímir
/// `affected_by_change` scoping (rebuild only what moved) is the next refinement;
/// today it rebuilds the workspace index over all members each change.
fn republish(reg: &Registry, root: &Path, ws_name: &str) -> Result<String> {
    use crate::warehouse::iceberg::IcebergWarehouse;

    let ws_root = root.join(ws_name);
    let git_dir = ws_root.join("git");
    let builds = ws_root.join("builds");
    std::fs::create_dir_all(&builds)?;

    let mut record = reg
        .get(ws_name)?
        .ok_or_else(|| anyhow!("workspace `{ws_name}` vanished from the registry"))?;
    let members: Vec<String> = record.members.iter().map(|m| m.name.clone()).collect();

    let index_dir = builds.join("cache").join("index");
    let wh = IcebergWarehouse::open(&builds)
        .with_context(|| format!("open warehouse {}", builds.display()))?;
    let idx = crate::index::Index::open_at(&git_dir, &index_dir)?
        .with_repo_scope(members.clone());

    // Deep-scan is opted into per-workspace in the descriptor ([workspace]
    // deep_scan = true). Best-effort read — a missing/unparsable descriptor
    // simply means no deep-scan.
    let deep_scan = crate::workspace::WorkspaceDescriptor::load(Path::new(&record.descriptor))
        .map(|d| d.workspace.deep_scan)
        .unwrap_or(false);

    let snap = republish_with(&wh, &idx, &git_dir, &index_dir, &members, ws_name, deep_scan)?;

    record.current_snapshot = snap.clone();
    record.updated_at = chrono::Utc::now().to_rfc3339();
    reg.upsert(&record)?;
    Ok(snap)
}

/// Republish a monitored workspace **through caller-owned handles**.
///
/// The handles are the warehouse + Tantivy index that `nornir-server` already
/// holds open. redb is single-writer, so the server must be the sole opener;
/// the poll loop writes through these same handles rather than opening its
/// own.
///
/// Runs the producer pipeline over the materialized member checkouts under
/// `git_dir`:
/// 1. incremental knowledge scan per member;
/// 2. rebuild of the workspace code index;
/// 3. per-member Iceberg snapshot keyed to HEAD;
/// 4. best-effort cross-repo dependency graph;
/// 5. opt-in best-effort deep scan.
///
/// Returns the id of the last snapshot taken. Members without a `.git`
/// checkout (e.g. SSH-only, not yet clonable) are skipped.
///
/// # Errors
/// Fails when no member has a `.git` checkout, or when a knowledge scan, scan
/// append, index build or snapshot fails. Ledger, security-warm, dep-graph and
/// deep-scan failures are only logged.
pub fn republish_with(
    wh: &crate::warehouse::iceberg::IcebergWarehouse,
    idx: &crate::index::Index,
    git_dir: &Path,
    index_dir: &Path,
    members: &[String],
    ws_name: &str,
    deep_scan: bool,
) -> Result<String> {
    let present: Vec<&String> = members
        .iter()
        .filter(|name| git_dir.join(name).join(".git").exists())
        .collect();
    if present.is_empty() {
        bail!(
            "no materialized member checkouts under {} (SSH-only members can't be \
             cloned yet)",
            git_dir.display()
        );
    }

    // 1. Knowledge scan (symbols + git-heat) per member → persist.
    //    **Incremental by git-SHA idempotency**: a member whose HEAD SHA was
    //    already scanned is skipped. This is the "#4 rebuild only what moved"
    //    refinement, done safely without affected_by_change graph-scoping, so
    //    unchanged members are a no-op even when another member moved.
    for name in &present {
        scan_member_knowledge(wh, git_dir, ws_name, name.as_str())?;
    }

    // 2. (Re)build the workspace code index over the member checkouts.
    idx.build().context("build workspace index")?;

    // 3. Snapshot the index into Iceberg, keyed to each member's HEAD.
    let mut last_snapshot = String::new();
    for name in &present {
        let repo_root = git_dir.join(name);
        let (sha, branch) = crate::gitio::head_sha_and_branch(&repo_root)
            .unwrap_or_else(|_| ("unknown".into(), "unknown".into()));
        let snap = crate::index::snapshot::snapshot_to_iceberg(
            wh, ws_name, name, &sha, &branch, index_dir,
        )
        .with_context(|| format!("snapshot {name}"))?;
        last_snapshot = snap.snapshot_id.to_string();
    }

    // 4. Cross-repo dependency graph → persist, so the viz Dep Graph tab
    //    reflects the monitored sources. Cheap and offline
    //    (`cargo metadata --no-deps`). Best-effort: a metadata hiccup must not
    //    fail the republish. The graph is *cross-repo*, so a single-member
    //    workspace yields no edges.
    match build_member_graph(ws_name, git_dir, &present) {
        Ok(graph) => {
            if let Err(e) = wh.record_dep_graph(ws_name, &graph) {
                eprintln!("nornir-monitor: {ws_name} dep-graph persist skipped: {e:#}");
            }
        }
        Err(e) => eprintln!("nornir-monitor: {ws_name} dep-graph build skipped: {e:#}"),
    }

    // 5. Opt-in deep-scan: knowledge-scan the members' entire transitive dep
    //    closure (gatling-fanned) into this same warehouse under repo `deps`.
    //    Best-effort: needs `cargo` on PATH + network for the first fetch.
    if deep_scan {
        let dirs: Vec<std::path::PathBuf> =
            present.iter().map(|n| git_dir.join(n.as_str())).collect();
        match crate::deepscan::deep_scan(&dirs, wh) {
            Ok(rep) => {
                eprintln!(
                    "nornir-monitor: {ws_name} deep-scan: {} dep crate(s) scanned, {} skipped, {} error(s)",
                    rep.crates,
                    rep.skipped,
                    rep.errors.len()
                );
                for e in rep.errors.iter().take(5) {
                    eprintln!("nornir-monitor:   deep-scan error: {e}");
                }
            }
            Err(e) => eprintln!("nornir-monitor: {ws_name} deep-scan skipped: {e:#}"),
        }
    }

    Ok(last_snapshot)
}

/// Knowledge-scan one materialized member (symbols + git-heat) and persist it,
/// unless the idempotency ledger already holds a scan for its current HEAD.
///
/// Also pre-warms the security SBOM cache. Ledger and security-warm failures
/// are logged, not returned.
fn scan_member_knowledge(
    wh: &crate::warehouse::iceberg::IcebergWarehouse,
    git_dir: &Path,
    ws_name: &str,
    name: &str,
) -> Result<()> {
    let repo_root = git_dir.join(name);
    // Key the ledger only on a *resolved* HEAD. A placeholder such as "unknown"
    // would be recorded once and then match every later republish, silently
    // skipping rescans of a member whose HEAD can't be read.
    let sha = crate::gitio::head_sha_and_branch(&repo_root)
        .ok()
        .map(|(sha, _branch)| sha);
    if let Some(sha) = &sha {
        if wh.knowledge_scan_exists(name, sha).unwrap_or(false) {
            eprintln!(
                "nornir-monitor: {ws_name}/{name} @ {} — knowledge unchanged, skip scan",
                &sha[..sha.len().min(8)]
            );
            return Ok(());
        }
    }

    let res = crate::knowledge::scan_all(&repo_root, name)
        .with_context(|| format!("knowledge scan {name}"))?;
    wh.append_symbol_scan(&res.symbols)?;
    wh.append_git_heat_scan(&res.git)?;

    // Record the scan (idempotency ledger) + pre-warm security; both best-effort.
    match &sha {
        Some(sha) => {
            if let Err(e) = wh.record_knowledge_scan(name, sha, res.symbols.snapshot_id) {
                eprintln!("nornir-monitor: {ws_name}/{name} knowledge-scan ledger skipped: {e:#}");
            }
        }
        None => eprintln!(
            "nornir-monitor: {ws_name}/{name} HEAD unresolved — knowledge-scan ledger skipped"
        ),
    }
    if let Err(e) = crate::security::warm(wh, &repo_root, Some(git_dir)) {
        eprintln!("nornir-monitor: {ws_name}/{name} security warm skipped: {e:#}");
    }
    Ok(())
}

/// Assemble a cross-repo [`WorkspaceGraph`] for the checked-out members.
///
/// A throwaway path-based descriptor is synthesized so that every repo resolves
/// to `git_dir/<member>` instead of the git cache. The graph is computed via
/// `cargo metadata --no-deps`, so no network access is needed.
fn build_member_graph(
    ws_name: &str,
    git_dir: &Path,
    members: &[&String],
) -> Result<crate::warehouse::dep_graph::WorkspaceGraph> {
    use crate::workspace::descriptor::{RepoSpec, WorkspaceDescriptor, WorkspaceMeta};

    let repos: std::collections::BTreeMap<_, _> = members
        .iter()
        .map(|member| {
            let checkout = git_dir.join(member.as_str());
            let spec = RepoSpec {
                path: Some(checkout.to_string_lossy().into_owned()),
                git: None,
                branch: None,
            };
            (member.to_string(), spec)
        })
        .collect();

    let meta = WorkspaceMeta {
        name: ws_name.to_owned(),
        deep_scan: false,
    };
    let desc = WorkspaceDescriptor {
        workspace: meta,
        repos,
        descriptor_dir: git_dir.to_path_buf(),
    };
    crate::warehouse::dep_graph::WorkspaceGraph::build(&desc)
}

/// Parse a poll interval such as `"60s"`, `"5m"`, `"2h"`, or a bare count of
/// seconds.
///
/// Whitespace around the value and before the unit is ignored; units are
/// case-sensitive. Empty or unparsable input yields `default`. Huge values
/// saturate at `u64::MAX` seconds. `"0"` parses as `Duration::ZERO`, so callers
/// that cannot accept zero must guard it.
pub fn parse_interval(s: &str, default: Duration) -> Duration {
    const UNITS: [(char, u64); 3] = [('s', 1), ('m', 60), ('h', 3600)];

    let text = s.trim();
    if text.is_empty() {
        return default;
    }
    let (digits, secs_per_unit) = UNITS
        .iter()
        .find_map(|&(suffix, secs)| text.strip_suffix(suffix).map(|rest| (rest, secs)))
        .unwrap_or((text, 1));
    match digits.trim().parse::<u64>() {
        Ok(count) => Duration::from_secs(count.saturating_mul(secs_per_unit)),
        Err(_) => default,
    }
}

/// Spawn the background poll loop over the server's shared registry.
///
/// Every `tick`, one [`sync_once`] sweep runs on a blocking thread so gix I/O
/// never stalls the async runtime.
///
/// Scheduling:
/// - The first sweep starts immediately.
/// - Sweeps never overlap: the next tick is awaited only after the previous
///   sweep finishes.
/// - Ticks missed during a long sweep are skipped rather than replayed in a
///   burst.
///
/// Sweep errors and panics are logged and the loop keeps running. Returns the
/// join handle immediately; must be called from within a Tokio runtime.
///
/// A zero `tick` (e.g. from `parse_interval("0s", …)`) would make
/// `tokio::time::interval` panic inside the spawned task and silently stop
/// monitoring. It is therefore replaced by a one-second floor.
pub fn spawn_poll_loop(
    reg: Arc<Registry>,
    root: PathBuf,
    tick: Duration,
) -> tokio::task::JoinHandle<()> {
    const MIN_TICK: Duration = Duration::from_secs(1);
    let tick = if tick.is_zero() {
        eprintln!(
            "nornir-monitor: zero poll interval requested; using {:?}",
            MIN_TICK
        );
        MIN_TICK
    } else {
        tick
    };
    tokio::spawn(async move {
        let mut interval = tokio::time::interval(tick);
        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
        loop {
            interval.tick().await;
            let reg = Arc::clone(&reg);
            let root = root.clone();
            match tokio::task::spawn_blocking(move || sync_once(&reg, &root)).await {
                Ok(Ok(())) => {}
                Ok(Err(e)) => eprintln!("nornir-monitor: sweep error: {e:#}"),
                Err(e) if e.is_panic() => eprintln!("nornir-monitor: sweep task panicked: {e}"),
                // Cancellation (e.g. runtime shutdown) — not a panic.
                Err(e) => eprintln!("nornir-monitor: sweep task failed: {e}"),
            }
        }
    })
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn interval_parsing() {
        let d = Duration::from_secs(60);
        assert_eq!(parse_interval("", d), Duration::from_secs(60));
        assert_eq!(parse_interval("30s", d), Duration::from_secs(30));
        assert_eq!(parse_interval("5m", d), Duration::from_secs(300));
        assert_eq!(parse_interval("2h", d), Duration::from_secs(7200));
        assert_eq!(parse_interval("90", d), Duration::from_secs(90));
        assert_eq!(parse_interval("garbage", d), Duration::from_secs(60));
    }

    #[test]
    fn interval_parsing_edge_cases() {
        let d = Duration::from_secs(60);
        // Whitespace around the value and before the unit is ignored.
        assert_eq!(parse_interval("  5 m ", d), Duration::from_secs(300));
        // Units are case-sensitive; a missing, negative or fractional count
        // falls back to the default.
        assert_eq!(parse_interval("5M", d), d);
        assert_eq!(parse_interval("s", d), d);
        assert_eq!(parse_interval("-5s", d), d);
        assert_eq!(parse_interval("1.5h", d), d);
        // Zero is accepted as-is; `spawn_poll_loop` must guard it.
        assert_eq!(parse_interval("0", d), Duration::ZERO);
        // Huge values saturate instead of overflowing.
        assert_eq!(
            parse_interval("18446744073709551615h", d),
            Duration::from_secs(u64::MAX)
        );
    }

    #[test]
    fn fetch_report_changed_any() {
        assert!(!FetchReport::default().changed_any());
        let moved = FetchReport {
            changed: vec!["member".to_string()],
            ..Default::default()
        };
        assert!(moved.changed_any());
    }
}