use std::collections::{BTreeMap, HashSet};
use std::path::{Path, PathBuf};
use anyhow::{anyhow, Context, Result};
use znippy_zoomies::gatling::{self, Config, SlotFill, Split, TypedCodec, TypedSink};
use crate::knowledge::symbols::{self, SymbolScan};
use crate::warehouse::iceberg::IcebergWarehouse;
/// Repository name under which dependency symbol scans are recorded; passed to
/// `symbols::scan_repo` for every scanned dependency crate.
pub const DEPS_REPO: &str = "deps";

/// Summary of a [`deep_scan`] run.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct DeepScanReport {
    /// Number of dependency crates whose scan was appended to the warehouse.
    pub crates: usize,
    /// Number of dependencies skipped because their source directory was not
    /// found on disk.
    pub skipped: usize,
    /// Human-readable descriptions of per-dependency failures; these are
    /// collected rather than aborting the run.
    pub errors: Vec<String>,
}
/// Resolves the non-workspace dependency closure of the given member directories.
///
/// For every directory that contains a `Cargo.toml`, this runs `cargo fetch`
/// (best-effort; its outcome is deliberately ignored) and then `cargo metadata`.
/// Directories without a manifest are ignored. Every resolved package that is
/// not a member of that manifest's workspace is returned as a
/// `(name-version, source_dir)` pair. `source_dir` is the directory holding the
/// package's `Cargo.toml`; it is an empty path if the manifest path has no parent.
///
/// The result is sorted by label and deduplicated. When the same label is
/// resolved from several member directories, the first source directory seen wins.
///
/// # Errors
///
/// Returns an error if `cargo metadata` fails for any member directory.
pub fn resolve_dep_closure(member_dirs: &[PathBuf]) -> Result<Vec<(String, PathBuf)>> {
    let mut closure: BTreeMap<String, PathBuf> = BTreeMap::new();
    let manifests = member_dirs
        .iter()
        .map(|dir| (dir, dir.join("Cargo.toml")))
        .filter(|(_, manifest)| manifest.exists());
    for (dir, manifest) in manifests {
        // Best-effort pre-download; the exit status is intentionally ignored and
        // presumably any real resolution problem surfaces from `cargo metadata`.
        let _ = std::process::Command::new("cargo")
            .arg("fetch")
            .arg("--manifest-path")
            .arg(&manifest)
            .status();
        let metadata = cargo_metadata::MetadataCommand::new()
            .manifest_path(&manifest)
            .exec()
            .with_context(|| format!("cargo metadata for {}", dir.display()))?;
        let workspace: HashSet<_> = metadata.workspace_members.iter().cloned().collect();
        for pkg in metadata.packages.iter().filter(|pkg| !workspace.contains(&pkg.id)) {
            let source_dir = match pkg.manifest_path.parent() {
                Some(parent) => parent.as_std_path().to_path_buf(),
                None => PathBuf::new(),
            };
            closure
                .entry(format!("{}-{}", pkg.name, pkg.version))
                .or_insert(source_dir);
        }
    }
    Ok(closure.into_iter().collect())
}
/// Gatling codec for deep-scan input: one `label\tdir` record per
/// `\n`-terminated line, each scanned independently.
struct DepCodec;

impl TypedCodec for DepCodec {
    /// Byte range `[start, end)` of one line, excluding its newline.
    type Seg = (usize, usize);
    /// Outcome of scanning a single dependency; failures are per-record.
    type Output = Result<SymbolScan>;

    /// Splits `data` into one segment per non-empty complete line.
    ///
    /// Empty lines are skipped. An unterminated tail is left unconsumed (to be
    /// carried forward) unless `is_last` is set, in which case it becomes a final
    /// segment. Returns `None` when more input is expected and no segment could
    /// be produced yet.
    fn split(&self, data: &[u8], _n_workers: usize, is_last: bool) -> Option<Split<Self::Seg>> {
        let mut segments = Vec::new();
        let mut line_start = 0usize;
        let newlines = data
            .iter()
            .enumerate()
            .filter(|&(_, &byte)| byte == b'\n')
            .map(|(pos, _)| pos);
        for newline in newlines {
            if newline > line_start {
                segments.push((line_start, newline));
            }
            line_start = newline + 1;
        }
        let has_tail = line_start < data.len();
        let consumed = if is_last && has_tail {
            segments.push((line_start, data.len()));
            data.len()
        } else {
            line_start
        };
        if !is_last && segments.is_empty() {
            return None;
        }
        Some(Split { segments, consumed })
    }

    /// Parses one `label\tdir` record and symbol-scans `dir` under [`DEPS_REPO`],
    /// tagged with a fresh v4 UUID and the current UTC time.
    fn transform(&self, data: &[u8], seg: &Self::Seg) -> Self::Output {
        let (start, end) = *seg;
        let line = std::str::from_utf8(&data[start..end]).context("non-utf8 line")?;
        let (label, dir) = match line.split_once('\t') {
            Some(fields) => fields,
            None => return Err(anyhow!("malformed deepscan line `{line}`")),
        };
        let scan_id = uuid::Uuid::new_v4();
        let scanned_at = chrono::Utc::now();
        symbols::scan_repo(Path::new(dir), DEPS_REPO, scan_id, scanned_at)
            .with_context(|| format!("syn scan {label}"))
    }
}
/// Gatling sink that appends each successful dependency scan to the warehouse
/// and tallies outcomes in a [`DeepScanReport`].
struct WarehouseSink<'a> {
    /// Destination for successful scans.
    wh: &'a IcebergWarehouse,
    /// Running totals, read back by the caller once the run finishes.
    report: DeepScanReport,
}

impl TypedSink<Result<SymbolScan>> for WarehouseSink<'_> {
    /// Records one scan outcome.
    ///
    /// A failed scan is stored in `report.errors`, alternate-formatted (`{:#}`)
    /// so the full context chain is kept, and does not return an error. A
    /// successful scan is appended to the warehouse and counted. An append
    /// failure is returned to the caller, which presumably aborts the run.
    fn process(&mut self, output: Result<SymbolScan>, _is_last: bool) -> Result<()> {
        let scan = match output {
            Ok(scan) => scan,
            Err(e) => {
                self.report.errors.push(format!("{e:#}"));
                return Ok(());
            }
        };
        self.wh.append_symbol_scan(&scan)?;
        self.report.crates += 1;
        Ok(())
    }
}
pub fn deep_scan(member_dirs: &[PathBuf], wh: &IcebergWarehouse) -> Result<DeepScanReport> {
let closure = resolve_dep_closure(member_dirs)?;
let mut input = String::new();
let mut skipped = 0usize;
for (label, dir) in &closure {
if dir.is_dir() {
input.push_str(&format!("{label}\t{}\n", dir.display()));
} else {
skipped += 1;
}
}
if input.is_empty() {
return Ok(DeepScanReport { skipped, ..Default::default() });
}
let workers = std::thread::available_parallelism().map(|n| n.get()).unwrap_or(1);
let bytes = input.into_bytes();
let cfg = Config {
chunk_size: bytes.len().max(1 << 16),
carry_headroom: 1 << 16,
ring_slots: 2,
initial_carry: Vec::new(),
slot_fill: SlotFill::Incremental,
};
let mut sink = WarehouseSink { wh, report: DeepScanReport { skipped, ..Default::default() } };
gatling::run_typed(std::io::Cursor::new(bytes), DepCodec, &mut sink, workers, cfg)
.context("gatling deep-scan run")?;
Ok(sink.report)
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Removes a temporary directory on drop, so fixtures are cleaned up even
    /// when an assertion panics mid-test.
    struct TempDirGuard(std::path::PathBuf);

    impl Drop for TempDirGuard {
        fn drop(&mut self) {
            let _ = std::fs::remove_dir_all(&self.0);
        }
    }

    #[test]
    fn split_one_segment_per_complete_line() {
        let data = b"a\t/x\nb\t/y\npartial";
        let s = DepCodec.split(data, 8, false).expect("two complete lines");
        assert_eq!(s.segments, vec![(0, 4), (5, 9)]);
        assert_eq!(s.consumed, 10, "partial tail becomes carry");
        let s = DepCodec.split(data, 8, true).expect("flush");
        assert_eq!(s.segments, vec![(0, 4), (5, 9), (10, data.len())]);
        assert_eq!(s.consumed, data.len());
    }

    #[test]
    fn split_skips_blank_lines() {
        let data = b"\na\t/x\n\n\nb\t/y\n";
        let s = DepCodec.split(data, 8, false).expect("two non-empty lines");
        assert_eq!(s.segments, vec![(1, 5), (8, 12)]);
        assert_eq!(s.consumed, data.len());
    }

    #[test]
    fn split_waits_for_a_complete_line() {
        assert!(DepCodec.split(b"no-newline-yet", 8, false).is_none());
    }

    #[test]
    fn split_flushes_empty_input() {
        let s = DepCodec.split(b"", 8, true).expect("last call always yields a split");
        assert!(s.segments.is_empty());
        assert_eq!(s.consumed, 0);
    }

    #[test]
    fn transform_isolates_bad_lines() {
        let data = b"missing-tab-separator";
        let out = DepCodec.transform(data, &(0, data.len()));
        assert!(out.is_err(), "malformed line is an Err output, not a panic");
    }

    #[test]
    fn transform_rejects_non_utf8_lines() {
        let data = b"\xff\t/x";
        let out = DepCodec.transform(data, &(0, data.len()));
        assert!(out.is_err(), "non-utf8 line is an Err output, not a panic");
    }

    #[test]
    fn deep_scan_gatling_end_to_end_into_warehouse() {
        let tmp = std::env::temp_dir().join(format!("nornir-deepscan-test-{}", std::process::id()));
        let _ = std::fs::remove_dir_all(&tmp);
        // Declared before the warehouse so it is dropped after it, removing the
        // fixture directory even if an assertion below panics.
        let _cleanup = TempDirGuard(tmp.clone());
        for (krate, body) in [("alpha", "pub fn alpha_one() {}"), ("beta", "pub fn beta_one() {}")] {
            let dir = tmp.join("src_crates").join(krate);
            std::fs::create_dir_all(dir.join("src")).unwrap();
            std::fs::write(
                dir.join("Cargo.toml"),
                format!("[package]\nname = \"{krate}\"\nversion = \"0.1.0\"\nedition = \"2021\"\n"),
            )
            .unwrap();
            std::fs::write(dir.join("src/lib.rs"), body).unwrap();
        }
        let wh_dir = tmp.join("warehouse");
        std::fs::create_dir_all(&wh_dir).unwrap();
        let wh = IcebergWarehouse::open(&wh_dir).unwrap();
        let input = format!(
            "alpha-0.1.0\t{}\nbeta-0.1.0\t{}\n",
            tmp.join("src_crates/alpha").display(),
            tmp.join("src_crates/beta").display()
        );
        let bytes = input.into_bytes();
        let cfg = Config {
            chunk_size: bytes.len().max(1 << 16),
            carry_headroom: 1 << 16,
            ring_slots: 2,
            initial_carry: Vec::new(),
            slot_fill: SlotFill::Incremental,
        };
        let mut sink = WarehouseSink { wh: &wh, report: DeepScanReport::default() };
        gatling::run_typed(std::io::Cursor::new(bytes), DepCodec, &mut sink, 4, cfg).unwrap();
        assert_eq!(sink.report.crates, 2, "both crates scanned: {:?}", sink.report.errors);
        assert!(sink.report.errors.is_empty());
    }
}