nornir 0.4.30

Companion to cargo: dependency tracking, release gating, deploy, benchmarks, and documentation assembly. Project-agnostic.
//! Deep-scan — knowledge-scan a workspace's **entire transitive dependency
//! closure** into the warehouse.
//!
//! `cargo fetch` materializes every dependency's source (deps of deps, the
//! whole tree), `cargo metadata` enumerates the closure, and the per-crate syn
//! symbol scans are fanned across all cores by **znippy's gatling engine**
//! (`znippy_zoomies::gatling::run_typed` — the no-barrier worker-pool streamer):
//! the crate list is streamed as newline-delimited `label\tdir` text, `split`
//! cuts one segment per crate, workers `transform` each into a [`SymbolScan`],
//! and the sink appends them to the warehouse **in stream order** on the
//! collector thread (single writer, so no lock contention).
//!
//! Rows land under `repo = "deps"`, so the viz 🕸 Call Graph picks them up as
//! repo `deps` with one crate entry per dependency, and the MCP
//! `knowledge_*` tools answer over the full closure.

use std::collections::{BTreeMap, HashSet};
use std::path::{Path, PathBuf};

use anyhow::{anyhow, Context, Result};
use znippy_zoomies::gatling::{self, Config, SlotFill, Split, TypedCodec, TypedSink};

use crate::knowledge::symbols::{self, SymbolScan};
use crate::warehouse::iceberg::IcebergWarehouse;

/// Repo name the dependency-closure rows are filed under.
///
/// Handed to `symbols::scan_repo` as the repo argument for every dependency
/// crate the deep-scan processes, so the whole closure shares this one name.
pub const DEPS_REPO: &str = "deps";

/// Outcome of one deep-scan sweep.
///
/// `skipped` is tallied by [`deep_scan`] while it assembles the crate list;
/// `crates` and `errors` are filled in by the sink as per-crate results arrive.
#[derive(Debug, Default)]
pub struct DeepScanReport {
    /// Crates successfully scanned + appended.
    ///
    /// Bumped only after the warehouse append for that crate has returned `Ok`.
    pub crates: usize,
    /// Closure entries whose source dir was absent (fetch failed / non-crates.io).
    ///
    /// Counted when the resolved source path is not an existing directory;
    /// such entries are never handed to the scanner.
    pub skipped: usize,
    /// Per-crate scan errors (isolated — one bad crate never stops the sweep).
    ///
    /// Each entry is the failed scan's error rendered with the alternate
    /// `{:#}` format. Warehouse append failures are NOT recorded here — the
    /// sink returns those to its caller as an `Err`.
    pub errors: Vec<String>,
}

/// Compute the transitive dependency closure for a set of workspace member
/// directories.
///
/// For each directory that contains a `Cargo.toml`:
/// 1. `cargo fetch --manifest-path <manifest>` is run so dependency sources
///    are on disk (its outcome is deliberately ignored — best effort);
/// 2. `cargo metadata` is run and every resolved package that is not one of
///    that workspace's members is recorded.
///
/// Directories without a `Cargo.toml` are silently passed over.
///
/// # Returns
/// `(name-version, src_dir)` pairs, deduplicated on the `name-version` label
/// (first occurrence wins) and ordered by label. `src_dir` is the directory
/// holding the package manifest, or an empty path if the manifest path has no
/// parent.
///
/// # Errors
/// Fails if `cargo metadata` fails for any manifest; the error is annotated
/// with the offending member directory.
pub fn resolve_dep_closure(member_dirs: &[PathBuf]) -> Result<Vec<(String, PathBuf)>> {
    let mut closure: BTreeMap<String, PathBuf> = BTreeMap::new();

    // Lazily pair each member dir with its manifest, dropping dirs that have none.
    let manifests = member_dirs
        .iter()
        .map(|dir| (dir, dir.join("Cargo.toml")))
        .filter(|(_, manifest)| manifest.exists());

    for (dir, manifest) in manifests {
        // Pull dependency sources onto disk before asking for metadata. The
        // exit status is intentionally discarded: a partial fetch still
        // leaves something worth scanning.
        let _ = std::process::Command::new("cargo")
            .arg("fetch")
            .arg("--manifest-path")
            .arg(&manifest)
            .status();

        let metadata = cargo_metadata::MetadataCommand::new()
            .manifest_path(&manifest)
            .exec()
            .with_context(|| format!("cargo metadata for {}", dir.display()))?;

        // Workspace members are the project's own crates, not dependencies.
        let own_crates: HashSet<_> = metadata.workspace_members.iter().cloned().collect();
        let dependencies = metadata
            .packages
            .iter()
            .filter(|pkg| !own_crates.contains(&pkg.id));

        for pkg in dependencies {
            let src_dir = match pkg.manifest_path.parent() {
                Some(parent) => parent.as_std_path().to_path_buf(),
                None => PathBuf::new(),
            };
            // Keep the first source dir seen for a given `name-version`.
            closure
                .entry(format!("{}-{}", pkg.name, pkg.version))
                .or_insert(src_dir);
        }
    }

    Ok(closure.into_iter().collect())
}

/// gatling codec for the deep-scan stream.
///
/// The stream is newline-delimited text, one `label\tdir` record per line.
/// `split` emits one segment per complete, non-empty line; `transform` runs
/// the syn symbol scan over the directory named on that line.
struct DepCodec;

impl TypedCodec for DepCodec {
    /// Half-open byte range `(start, end)` of one line inside the chunk,
    /// excluding the terminating newline.
    type Seg = (usize, usize);
    /// Result of scanning one crate. Failures are returned as values rather
    /// than raised, so the sink can record them individually.
    type Output = Result<SymbolScan>;

    /// Cut `data` into line segments.
    ///
    /// Empty lines yield no segment. An unterminated tail is left unconsumed
    /// unless `is_last` is set, in which case it becomes a final segment.
    /// Returns `None` when no segment was found and more input may follow.
    fn split(&self, data: &[u8], _n_workers: usize, is_last: bool) -> Option<Split<Self::Seg>> {
        let mut segments = Vec::new();
        let mut cursor = 0usize;

        // Hop from newline to newline; `cursor` always sits at a line start.
        while let Some(offset) = data[cursor..].iter().position(|&byte| byte == b'\n') {
            let newline_at = cursor + offset;
            if offset != 0 {
                segments.push((cursor, newline_at));
            }
            cursor = newline_at + 1;
        }

        if is_last {
            // End of stream: whatever trails the last newline is a full record.
            if cursor < data.len() {
                segments.push((cursor, data.len()));
                cursor = data.len();
            }
        } else if segments.is_empty() {
            // No complete line in this chunk yet — let the carry grow.
            return None;
        }

        Some(Split { segments, consumed: cursor })
    }

    /// Parse one `label\tdir` line and scan `dir`.
    ///
    /// Yields an `Err` output for a non-UTF-8 line, for a line without a tab
    /// separator, or when the scan itself fails (annotated with the label).
    fn transform(&self, data: &[u8], seg: &Self::Seg) -> Self::Output {
        let &(from, to) = seg;
        let line = std::str::from_utf8(&data[from..to]).context("non-utf8 line")?;

        let (label, dir) = match line.split_once('\t') {
            Some(fields) => fields,
            None => return Err(anyhow!("malformed deepscan line `{line}`")),
        };

        // Every dependency is filed under the shared deps repo name; each scan
        // gets a fresh v4 UUID and the current UTC time.
        let scanned =
            symbols::scan_repo(Path::new(dir), DEPS_REPO, uuid::Uuid::new_v4(), chrono::Utc::now());
        scanned.with_context(|| format!("syn scan {label}"))
    }
}

/// gatling sink that writes scan results into the warehouse.
///
/// Holds a borrowed warehouse handle plus the report being accumulated for
/// the current sweep.
struct WarehouseSink<'a> {
    /// Warehouse the successful scans are appended to.
    wh: &'a IcebergWarehouse,
    /// Running tally for this sweep.
    report: DeepScanReport,
}

impl TypedSink<Result<SymbolScan>> for WarehouseSink<'_> {
    /// Consume one per-crate result.
    ///
    /// A failed scan is recorded in `report.errors` and processing continues.
    /// A successful scan is appended to the warehouse and counted; if that
    /// append fails the error is returned to the caller instead of being
    /// recorded in the report.
    fn process(&mut self, output: Result<SymbolScan>, _is_last: bool) -> Result<()> {
        let scan = match output {
            Ok(scan) => scan,
            Err(err) => {
                self.report.errors.push(format!("{err:#}"));
                return Ok(());
            }
        };

        self.wh.append_symbol_scan(&scan)?;
        self.report.crates += 1;
        Ok(())
    }
}

/// Scan the full dependency closure of `member_dirs` into `wh`.
///
/// The closure is resolved with [`resolve_dep_closure`], entries whose source
/// directory is missing are counted as skipped, and the remainder is encoded
/// as `label\tdir` lines and pushed through gatling with one worker per
/// available core (falling back to a single worker).
///
/// # Returns
/// A [`DeepScanReport`] with the scanned / skipped counts and the per-crate
/// scan errors. If nothing is on disk to scan, the report carries only the
/// skipped count and gatling is never started.
///
/// # Errors
/// Fails if the closure cannot be resolved or if the gatling run reports an
/// error.
pub fn deep_scan(member_dirs: &[PathBuf], wh: &IcebergWarehouse) -> Result<DeepScanReport> {
    /// Lower bound for the chunk size, also used as the carry headroom.
    const SIXTY_FOUR_KIB: usize = 1 << 16;

    let closure = resolve_dep_closure(member_dirs)?;

    // Separate the crates whose sources exist from those that do not.
    let (on_disk, missing): (Vec<_>, Vec<_>) =
        closure.iter().partition(|entry| entry.1.is_dir());
    let report = DeepScanReport { skipped: missing.len(), ..Default::default() };

    // One `label\tdir` line per scannable crate.
    let payload: String = on_disk
        .into_iter()
        .map(|(label, dir)| format!("{label}\t{}\n", dir.display()))
        .collect();
    if payload.is_empty() {
        return Ok(report);
    }

    let bytes = payload.into_bytes();
    let workers = std::thread::available_parallelism().map_or(1, |n| n.get());
    let cfg = Config {
        // Large enough to hold the whole crate list in a single chunk.
        chunk_size: std::cmp::max(bytes.len(), SIXTY_FOUR_KIB),
        carry_headroom: SIXTY_FOUR_KIB,
        ring_slots: 2,
        initial_carry: Vec::new(),
        slot_fill: SlotFill::Incremental,
    };

    let mut sink = WarehouseSink { wh, report };
    gatling::run_typed(std::io::Cursor::new(bytes), DepCodec, &mut sink, workers, cfg)
        .context("gatling deep-scan run")?;
    Ok(sink.report)
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Complete lines become segments; an unterminated tail is carried over
    /// mid-stream and flushed as a segment on the final chunk.
    #[test]
    fn split_one_segment_per_complete_line() {
        let data = b"a\t/x\nb\t/y\npartial";

        let mid_stream = DepCodec.split(data, 8, false).expect("two complete lines");
        assert_eq!(mid_stream.segments, vec![(0, 4), (5, 9)]);
        assert_eq!(mid_stream.consumed, 10, "partial tail becomes carry");

        // is_last flushes the tail as a final segment.
        let final_chunk = DepCodec.split(data, 8, true).expect("flush");
        assert_eq!(final_chunk.segments.len(), 3);
        assert_eq!(final_chunk.consumed, data.len());
    }

    /// Without any newline and with more input expected, nothing is emitted.
    #[test]
    fn split_waits_for_a_complete_line() {
        let outcome = DepCodec.split(b"no-newline-yet", 8, false);
        assert!(outcome.is_none());
    }

    /// A line lacking the tab separator surfaces as an `Err` output.
    #[test]
    fn transform_isolates_bad_lines() {
        let data = b"missing-tab-separator";
        let whole_line = (0, data.len());
        let out = DepCodec.transform(data, &whole_line);
        assert!(out.is_err(), "malformed line is an Err output, not a panic");
    }

    /// Runs two tiny synthetic "dep" crates through the real gatling pipeline
    /// into a real temporary warehouse.
    #[test]
    fn deep_scan_gatling_end_to_end_into_warehouse() {
        let scratch_name = format!("nornir-deepscan-test-{}", std::process::id());
        let tmp = std::env::temp_dir().join(scratch_name);
        // Start from a clean slate in case an earlier run left debris behind.
        let _ = std::fs::remove_dir_all(&tmp);

        let fixtures = [("alpha", "pub fn alpha_one() {}"), ("beta", "pub fn beta_one() {}")];
        for (krate, body) in fixtures {
            let crate_dir = tmp.join("src_crates").join(krate);
            std::fs::create_dir_all(crate_dir.join("src")).unwrap();
            let manifest = format!(
                "[package]\nname = \"{krate}\"\nversion = \"0.1.0\"\nedition = \"2021\"\n"
            );
            std::fs::write(crate_dir.join("Cargo.toml"), manifest).unwrap();
            std::fs::write(crate_dir.join("src/lib.rs"), body).unwrap();
        }

        let wh_dir = tmp.join("warehouse");
        std::fs::create_dir_all(&wh_dir).unwrap();
        let wh = IcebergWarehouse::open(&wh_dir).unwrap();

        // Drive the gatling pipeline directly over a hand-built closure list.
        let alpha_dir = tmp.join("src_crates/alpha");
        let beta_dir = tmp.join("src_crates/beta");
        let bytes = format!(
            "alpha-0.1.0\t{}\nbeta-0.1.0\t{}\n",
            alpha_dir.display(),
            beta_dir.display()
        )
        .into_bytes();

        let cfg = Config {
            chunk_size: bytes.len().max(1 << 16),
            carry_headroom: 1 << 16,
            ring_slots: 2,
            initial_carry: Vec::new(),
            slot_fill: SlotFill::Incremental,
        };
        let mut sink = WarehouseSink { wh: &wh, report: DeepScanReport::default() };
        gatling::run_typed(std::io::Cursor::new(bytes), DepCodec, &mut sink, 4, cfg).unwrap();

        assert_eq!(sink.report.crates, 2, "both crates scanned: {:?}", sink.report.errors);
        assert!(sink.report.errors.is_empty());

        let _ = std::fs::remove_dir_all(&tmp);
    }
}