use std::collections::BTreeMap;
use std::path::{Path, PathBuf};
use anyhow::{anyhow, Context, Result};
use znippy_zoomies::gatling::{self, Config, SlotFill, Split, TypedCodec, TypedSink};
use crate::knowledge::symbols::{self, SymbolScan};
use crate::warehouse::iceberg::IcebergWarehouse;
/// Repo tag handed to `symbols::scan_file` for every dependency source file.
pub const DEPS_REPO: &str = "deps";

/// Outcome of [`deep_scan`].
#[derive(Debug, Default)]
pub struct DeepScanReport {
    /// Number of dependency crates that were found on disk and fed to the scan.
    pub crates: usize,
    /// Number of resolved dependencies that were not present on disk.
    pub skipped: usize,
    /// Per-file scan failures, formatted with `{:#}` (full context chain).
    pub errors: Vec<String>,
}
/// Maps an extracted registry source directory to its cached `.crate` tarball.
///
/// `.../registry/src/<index>/<name-version>[/]` becomes
/// `.../registry/cache/<index>/<name-version>.crate`. Only the first
/// `/registry/src/` segment is rewritten. Returns `None` when `dir` does not
/// contain that segment at all. The returned path is not checked for existence.
fn crate_tarball_path(dir: &str) -> Option<PathBuf> {
    let trimmed = dir.trim_end_matches('/');
    let (before, after) = trimmed.split_once("/registry/src/")?;
    let tarball = format!("{before}/registry/cache/{after}.crate");
    Some(PathBuf::from(tarball))
}
/// Returns Cargo's home directory.
///
/// Lookup order:
/// 1. `$CARGO_HOME`
/// 2. `$HOME/.cargo`
/// 3. a relative `.cargo`
///
/// Variables that are set but empty are ignored, matching how Cargo's `home`
/// crate treats an empty `CARGO_HOME`. Otherwise an empty value would resolve
/// the registry relative to the current working directory.
fn cargo_home() -> PathBuf {
    let non_empty = |key: &str| std::env::var_os(key).filter(|v| !v.is_empty());
    if let Some(h) = non_empty("CARGO_HOME") {
        return PathBuf::from(h);
    }
    if let Some(h) = non_empty("HOME") {
        return PathBuf::from(h).join(".cargo");
    }
    PathBuf::from(".cargo")
}
/// Lists the registry index directory names (e.g. `index.crates.io-<hash>`).
///
/// Names come from directories under `<cargo_home>/registry/cache` and
/// `<cargo_home>/registry/src`. The result is the sorted, de-duplicated union.
/// Unreadable directories and non-UTF-8 names are skipped.
fn registry_index_dirs(cargo_home: &Path) -> Vec<String> {
    let registry = cargo_home.join("registry");
    let mut unique = std::collections::BTreeSet::new();
    let entries = ["cache", "src"]
        .iter()
        .filter_map(|sub| std::fs::read_dir(registry.join(sub)).ok())
        .flat_map(|listing| listing.flatten());
    for entry in entries {
        if !entry.path().is_dir() {
            continue;
        }
        if let Some(name) = entry.file_name().to_str() {
            unique.insert(name.to_owned());
        }
    }
    unique.into_iter().collect()
}
/// Finds the nearest `Cargo.lock` file at or above `dir`.
///
/// Walks `dir` and each of its ancestors (ending with the empty relative path,
/// i.e. the current directory, when `dir` is relative). Returns the first
/// candidate that is a regular file.
fn find_cargo_lock(dir: &Path) -> Option<PathBuf> {
    dir.ancestors()
        .map(|ancestor| ancestor.join("Cargo.lock"))
        .find(|candidate| candidate.is_file())
}
/// Resolves the registry dependency closure of `member_dirs` from their `Cargo.lock`s.
///
/// For each member, the nearest `Cargo.lock` at or above its directory is read
/// and parsed in-process, with no `cargo` subprocess.
///
/// Every `[[package]]` entry that has a `source` key becomes a
/// `("name-version", dir)` pair. Entries without `source` (path / workspace
/// members) are skipped.
///
/// `dir` is `$CARGO_HOME/registry/src/<index>/<name-version>`, under the first
/// index directory that holds either that extracted tree or the matching cached
/// `.crate` tarball. When no index does, the path under the first known index
/// (or `index.crates.io`) is returned, so the caller can treat it as missing.
///
/// Members that share a lock file (the usual workspace layout) cause it to be
/// parsed only once. Lock files that cannot be read or parsed are skipped
/// silently, so this currently never returns `Err`. The output is sorted by
/// label and contains each label once.
pub fn resolve_dep_closure(member_dirs: &[PathBuf]) -> Result<Vec<(String, PathBuf)>> {
    let cargo_home = cargo_home();
    let index_dirs = registry_index_dirs(&cargo_home);
    let src_root = cargo_home.join("registry").join("src");
    let mut parsed_locks = std::collections::BTreeSet::new();
    let mut out: BTreeMap<String, PathBuf> = BTreeMap::new();
    for dir in member_dirs {
        let Some(lock_path) = find_cargo_lock(dir) else { continue };
        // Workspace members resolve to the same lock file; re-parsing it adds nothing.
        if !parsed_locks.insert(lock_path.clone()) {
            continue;
        }
        let Ok(text) = std::fs::read_to_string(&lock_path) else { continue };
        let doc: toml::Value = match toml::from_str(&text) {
            Ok(d) => d,
            Err(_) => continue,
        };
        let Some(pkgs) = doc.get("package").and_then(|p| p.as_array()) else { continue };
        for pkg in pkgs {
            // Path / workspace members carry no `source` and never live in the registry.
            if pkg.get("source").and_then(|s| s.as_str()).is_none() {
                continue;
            }
            let (Some(name), Some(version)) = (
                pkg.get("name").and_then(|v| v.as_str()),
                pkg.get("version").and_then(|v| v.as_str()),
            ) else {
                continue;
            };
            // The registry directory name is `<name>-<version>`, which doubles as the label.
            let label = format!("{name}-{version}");
            if out.contains_key(&label) {
                continue;
            }
            let chosen = index_dirs.iter().find_map(|idx| {
                let cand = src_root.join(idx).join(&label);
                let has_src = cand.is_dir();
                let has_crate = crate_tarball_path(&cand.display().to_string())
                    .map(|p| p.is_file())
                    .unwrap_or(false);
                (has_src || has_crate).then_some(cand)
            });
            let path = chosen.unwrap_or_else(|| {
                let idx = index_dirs.first().map(String::as_str).unwrap_or("index.crates.io");
                src_root.join(idx).join(&label)
            });
            out.insert(label, path);
        }
    }
    Ok(out.into_iter().collect())
}
/// One UTF-8 `.rs` source file extracted from a dependency crate.
struct CrateFile {
    /// `name-version` label of the crate the file belongs to.
    crate_name: String,
    /// Path of the file inside the crate tarball. For the directory fallback it
    /// is synthesised as `{label}/{subdir}/{relative path}`.
    tar_path: String,
    /// Full source text of the file.
    content: String,
}
/// Collects every UTF-8 `.rs` source file of one dependency crate.
///
/// `label` is the `name-version` tag stored on each [`CrateFile`]. `dir` is the
/// crate's extracted source directory.
///
/// The cached `.crate` tarball derived from `dir` is used when it can be read
/// and decompressed (even if it yields no files). Otherwise the `src`, `tests`,
/// `benches` and `examples` subtrees of `dir` are walked instead. Files that
/// are not valid UTF-8 are dropped on both paths.
fn decompress_crate(label: &str, dir: &str) -> Vec<CrateFile> {
    match sources_from_tarball(label, dir) {
        Some(files) => files,
        None => sources_from_dir(label, dir),
    }
}

/// Reads the `.rs` entries of the crate's cached tarball.
///
/// Returns `None` if `dir` has no tarball path, or if reading or decompressing
/// the tarball fails.
fn sources_from_tarball(label: &str, dir: &str) -> Option<Vec<CrateFile>> {
    let tarball = crate_tarball_path(dir)?;
    let raw_tarball = std::fs::read(&tarball).ok()?;
    let entries = lgz::decompress_tar_gz_filter(&raw_tarball, ".rs").ok()?;
    let mut files = Vec::new();
    for (tar_path, raw) in entries {
        if let Ok(content) = String::from_utf8(raw) {
            files.push(CrateFile { crate_name: label.to_string(), tar_path, content });
        }
    }
    Some(files)
}

/// Walks the conventional source subdirectories of `dir` and reads each `.rs` file.
fn sources_from_dir(label: &str, dir: &str) -> Vec<CrateFile> {
    let root = Path::new(dir);
    let mut files = Vec::new();
    for sub in &["src", "tests", "benches", "examples"] {
        let subdir = root.join(sub);
        if !subdir.is_dir() {
            continue;
        }
        let rust_files = walkdir::WalkDir::new(&subdir)
            .into_iter()
            .flatten()
            .filter(|e| e.file_type().is_file())
            .filter(|e| e.path().extension().and_then(|x| x.to_str()) == Some("rs"));
        for entry in rust_files {
            let Ok(content) = std::fs::read_to_string(entry.path()) else { continue };
            let rel = entry.path().strip_prefix(&subdir).unwrap_or(entry.path());
            files.push(CrateFile {
                crate_name: label.to_string(),
                tar_path: format!("{label}/{sub}/{}", rel.display()),
                content,
            });
        }
    }
    files
}
/// Splits `data` into `(start, end)` byte ranges of its non-empty lines.
///
/// - Empty lines are skipped, and the `\n` itself is never part of a range.
/// - `consumed` is the offset just past the last newline. When `is_last` is
///   true, any unterminated tail is emitted as a final line and everything is
///   consumed.
/// - When `is_last` is false and no non-empty complete line exists, returns
///   `None`, presumably so gatling supplies more input.
fn line_split(data: &[u8], is_last: bool) -> Option<Split<(usize, usize)>> {
    let mut segments = Vec::new();
    let mut line_start = 0usize;
    let newlines = data
        .iter()
        .enumerate()
        .filter(|&(_, &byte)| byte == b'\n')
        .map(|(pos, _)| pos);
    for nl in newlines {
        if nl > line_start {
            segments.push((line_start, nl));
        }
        line_start = nl + 1;
    }
    let consumed = if is_last && line_start < data.len() {
        segments.push((line_start, data.len()));
        data.len()
    } else {
        line_start
    };
    if !is_last && segments.is_empty() {
        None
    } else {
        Some(Split { segments, consumed })
    }
}
/// Stage-1 codec: each input line `label\tdir` is expanded into that crate's source files.
struct DecompressCodec;

impl TypedCodec for DecompressCodec {
    type Seg = (usize, usize);
    type Output = Vec<CrateFile>;

    fn split(&self, data: &[u8], _n: usize, is_last: bool) -> Option<Split<Self::Seg>> {
        line_split(data, is_last)
    }

    /// A line that is not UTF-8 or has no tab separator yields no files (not an error).
    fn transform(&self, data: &[u8], seg: &Self::Seg) -> Self::Output {
        let (from, to) = *seg;
        let fields = std::str::from_utf8(&data[from..to])
            .ok()
            .and_then(|line| line.split_once('\t'));
        fields.map_or_else(Vec::new, |(label, dir)| decompress_crate(label, dir))
    }
}
struct ParseCodec<'a> {
files: &'a [CrateFile],
snap: uuid::Uuid,
ts: chrono::DateTime<chrono::Utc>,
}
impl TypedCodec for ParseCodec<'_> {
type Seg = (usize, usize);
type Output = Result<SymbolScan>;
fn split(&self, data: &[u8], _n: usize, is_last: bool) -> Option<Split<Self::Seg>> {
line_split(data, is_last)
}
fn transform(&self, data: &[u8], seg: &Self::Seg) -> Self::Output {
let idx: usize = std::str::from_utf8(&data[seg.0..seg.1])
.context("non-utf8 index")?
.trim()
.parse()
.context("bad file index")?;
let f = &self.files[idx];
Ok(symbols::scan_file(
&f.crate_name,
&f.tar_path,
&f.content,
DEPS_REPO,
self.snap,
self.ts,
))
}
}
/// Sink for the decompress stage: appends each worker's files to one list.
struct CollectSink {
    /// All files gathered so far, in the order gatling delivered them.
    files: Vec<CrateFile>,
}

impl TypedSink<Vec<CrateFile>> for CollectSink {
    fn process(&mut self, mut batch: Vec<CrateFile>, _is_last: bool) -> Result<()> {
        self.files.append(&mut batch);
        Ok(())
    }
}
/// Sink for the parse stage: buffers scans for bulk append and accumulates the report.
struct WarehouseSink<'a> {
    /// Destination warehouse for buffered scans.
    wh: &'a IcebergWarehouse,
    /// Running report; per-file errors are appended here.
    report: DeepScanReport,
    /// Scans waiting for the next [`WarehouseSink::flush`].
    pending: Vec<SymbolScan>,
}

impl WarehouseSink<'_> {
    /// Bulk-appends the buffered scans to the warehouse and empties the buffer.
    ///
    /// If `NORNIR_DEEPSCAN_NOSAVE` is set (to any value), the write is skipped
    /// but the buffer is still cleared. If the write fails, the error is
    /// returned and the buffer is left intact.
    fn flush(&mut self) -> Result<()> {
        if !self.pending.is_empty() {
            let skip_save = std::env::var_os("NORNIR_DEEPSCAN_NOSAVE").is_some();
            if !skip_save {
                self.wh.ingest_symbol_scans(&self.pending)?;
            }
            self.pending.clear();
        }
        Ok(())
    }
}
impl TypedSink<Result<SymbolScan>> for WarehouseSink<'_> {
    /// Buffers scans that found anything and drops scans with no symbols,
    /// calls or features.
    ///
    /// A failed scan is recorded in `report.errors` and never propagated, so one
    /// bad file does not abort the run.
    fn process(&mut self, output: Result<SymbolScan>, _is_last: bool) -> Result<()> {
        let scan = match output {
            Ok(scan) => scan,
            Err(e) => {
                self.report.errors.push(format!("{e:#}"));
                return Ok(());
            }
        };
        let found_nothing =
            scan.symbols.is_empty() && scan.calls.is_empty() && scan.features.is_empty();
        if !found_nothing {
            self.pending.push(scan);
        }
        Ok(())
    }
}
pub fn deep_scan(member_dirs: &[PathBuf], wh: &IcebergWarehouse) -> Result<DeepScanReport> {
let t_resolve = std::time::Instant::now();
let closure = resolve_dep_closure(member_dirs)?;
eprintln!(
"nornir-deepscan: ⏱ resolve_dep_closure (Cargo.lock, in-process, no shell) {} crate(s) in {:.1}s",
closure.len(),
t_resolve.elapsed().as_secs_f64()
);
let present: Vec<(String, String)> = closure
.iter()
.filter(|(_, dir)| dir.is_dir())
.map(|(l, dir)| (l.clone(), dir.display().to_string()))
.collect();
let skipped = closure.len() - present.len();
if present.is_empty() {
return Ok(DeepScanReport { skipped, ..Default::default() });
}
let workers = std::thread::available_parallelism().map(|n| n.get()).unwrap_or(1);
let snap = uuid::Uuid::new_v4(); let ts = chrono::Utc::now();
let t_scan = std::time::Instant::now();
const BATCH: usize = 1_000_000;
let mut sink = WarehouseSink {
wh,
report: DeepScanReport { skipped, ..Default::default() },
pending: Vec::new(),
};
let (mut d_dec, mut d_parse, mut d_flush) = (
std::time::Duration::ZERO,
std::time::Duration::ZERO,
std::time::Duration::ZERO,
);
let mut n_files = 0usize;
for batch in present.chunks(BATCH) {
let crate_input: String = batch.iter().map(|(l, d)| format!("{l}\t{d}\n")).collect();
let mut collect = CollectSink { files: Vec::new() };
let t = std::time::Instant::now();
gatling::run_typed(
std::io::Cursor::new(crate_input.into_bytes()),
DecompressCodec,
&mut collect,
workers,
scan_cfg(),
)
.context("gatling decompress stage")?;
d_dec += t.elapsed();
let files = collect.files;
if files.is_empty() {
continue;
}
n_files += files.len();
let idx_input: String = (0..files.len()).map(|i| format!("{i}\n")).collect();
let codec = ParseCodec { files: &files, snap, ts };
let t = std::time::Instant::now();
gatling::run_typed(
std::io::Cursor::new(idx_input.into_bytes()),
codec,
&mut sink,
workers,
scan_cfg(),
)
.context("gatling parse stage")?;
d_parse += t.elapsed();
let t = std::time::Instant::now();
sink.flush().context("deep-scan batch bulk append")?;
d_flush += t.elapsed();
}
let t = std::time::Instant::now();
sink.flush().context("deep-scan final bulk append")?;
d_flush += t.elapsed();
let mut report = sink.report;
report.crates = present.len();
eprintln!(
"nornir-deepscan: ⏱ decompress {:.1}s | parse {:.1}s | flush {:.1}s | total {:.1}s ({} files, {} crates)",
d_dec.as_secs_f64(),
d_parse.as_secs_f64(),
d_flush.as_secs_f64(),
t_scan.elapsed().as_secs_f64(),
n_files,
present.len(),
);
Ok(report)
}
/// Gatling configuration shared by both deep-scan stages.
///
/// Uses 8 KiB chunks, 64 KiB carry headroom, 8 ring slots, an empty initial
/// carry and incremental slot filling.
fn scan_cfg() -> Config {
    Config {
        ring_slots: 8,
        chunk_size: 8 << 10,       // 8 KiB
        carry_headroom: 64 * 1024, // 64 KiB
        slot_fill: SlotFill::Incremental,
        initial_carry: Vec::new(),
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn crate_tarball_path_maps_registry_src_to_cache_tarball() {
        assert_eq!(
            crate_tarball_path("/h/.cargo/registry/src/index.crates.io-abc/serde-1.0.0/"),
            Some(PathBuf::from(
                "/h/.cargo/registry/cache/index.crates.io-abc/serde-1.0.0.crate"
            )),
            "trailing slash trimmed, src -> cache, .crate appended"
        );
        assert_eq!(
            crate_tarball_path("/tmp/plain/serde-1.0.0"),
            None,
            "paths outside registry/src have no tarball"
        );
    }

    #[test]
    fn resolve_dep_closure_reads_cargo_lock_in_process_excludes_path_members() {
        let tmp = tempfile::tempdir().unwrap();
        let home = tmp.path().join("cargo_home");
        let idx = "index.crates.io-1234567890abcdef";
        let serde_src = home.join("registry").join("src").join(idx).join("serde-1.0.0");
        std::fs::create_dir_all(&serde_src).unwrap();
        let member = tmp.path().join("member");
        std::fs::create_dir_all(&member).unwrap();
        std::fs::write(
            member.join("Cargo.lock"),
            "[[package]]\nname = \"member\"\nversion = \"0.1.0\"\n\n\
             [[package]]\nname = \"serde\"\nversion = \"1.0.0\"\n\
             source = \"registry+https://github.com/rust-lang/crates.io-index\"\n",
        )
        .unwrap();
        std::env::set_var("CARGO_HOME", &home);
        // Two members resolving to the same lock file must not duplicate entries.
        let closure = resolve_dep_closure(&[member.clone(), member.join("nested")]).unwrap();
        std::env::remove_var("CARGO_HOME");
        let labels: Vec<&str> = closure.iter().map(|(l, _)| l.as_str()).collect();
        assert!(labels.contains(&"serde-1.0.0"), "registry dep included: {labels:?}");
        assert_eq!(
            labels.iter().filter(|l| **l == "serde-1.0.0").count(),
            1,
            "each label appears once: {labels:?}"
        );
        assert!(
            !labels.iter().any(|l| l.starts_with("member")),
            "path/workspace member (no `source`) excluded: {labels:?}"
        );
        let (_, dir) = closure.iter().find(|(l, _)| l == "serde-1.0.0").unwrap();
        assert_eq!(dir, &serde_src, "registry dep maps to its CARGO_HOME src dir");
    }

    #[test]
    fn line_split_one_segment_per_complete_line() {
        let data = b"a\t/x\nb\t/y\npartial";
        let s = line_split(data, false).expect("two complete lines");
        assert_eq!(s.segments, vec![(0, 4), (5, 9)]);
        assert_eq!(s.consumed, 10, "partial tail becomes carry");
        let s = line_split(data, true).expect("flush");
        assert_eq!(s.segments.len(), 3);
        assert_eq!(s.consumed, data.len());
    }

    #[test]
    fn line_split_waits_for_a_complete_line() {
        assert!(line_split(b"no-newline-yet", false).is_none());
    }

    #[test]
    fn decompress_transform_isolates_bad_lines() {
        let data = b"missing-tab-separator";
        let out = DecompressCodec.transform(data, &(0, data.len()));
        assert!(out.is_empty());
    }

    #[test]
    fn decompress_then_scan_file_finds_symbols() {
        // `tempfile` removes the directory even when an assertion below panics.
        let tmp = tempfile::tempdir().unwrap();
        let dir = tmp.path().join("alpha");
        std::fs::create_dir_all(dir.join("src")).unwrap();
        std::fs::write(dir.join("src/lib.rs"), "pub fn alpha_one() {}").unwrap();
        let files = decompress_crate("alpha-0.1.0", &dir.display().to_string());
        assert_eq!(files.len(), 1, "one .rs file");
        assert!(files[0].tar_path.ends_with("src/lib.rs"), "tar path: {}", files[0].tar_path);
        let f = &files[0];
        let scan = symbols::scan_file(
            &f.crate_name,
            &f.tar_path,
            &f.content,
            DEPS_REPO,
            uuid::Uuid::new_v4(),
            chrono::Utc::now(),
        );
        assert!(
            scan.symbols.iter().any(|s| s.item_name == "alpha_one"),
            "scan_file finds the fn: {:?}",
            scan.symbols.iter().map(|s| &s.item_name).collect::<Vec<_>>()
        );
    }

    #[test]
    fn deep_scan_two_stage_end_to_end_into_warehouse() {
        // `tempfile` removes the directory even when an assertion below panics.
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path();
        for (krate, body) in [("alpha", "pub fn alpha_one() {}"), ("beta", "pub fn beta_one() {}")] {
            let dir = root.join("src_crates").join(krate);
            std::fs::create_dir_all(dir.join("src")).unwrap();
            std::fs::write(dir.join("src/lib.rs"), body).unwrap();
        }
        let wh_dir = root.join("warehouse");
        std::fs::create_dir_all(&wh_dir).unwrap();
        let wh = IcebergWarehouse::open(&wh_dir).unwrap();
        let crate_input = format!(
            "alpha-0.1.0\t{}\nbeta-0.1.0\t{}\n",
            root.join("src_crates/alpha").display(),
            root.join("src_crates/beta").display()
        );
        let mut collect = CollectSink { files: Vec::new() };
        gatling::run_typed(
            std::io::Cursor::new(crate_input.into_bytes()),
            DecompressCodec,
            &mut collect,
            4,
            scan_cfg(),
        )
        .unwrap();
        assert_eq!(collect.files.len(), 2, "both crates' lib.rs decompressed");
        let files = collect.files;
        let idx_input: String = (0..files.len()).map(|i| format!("{i}\n")).collect();
        let codec = ParseCodec { files: &files, snap: uuid::Uuid::new_v4(), ts: chrono::Utc::now() };
        let mut sink =
            WarehouseSink { wh: &wh, report: DeepScanReport::default(), pending: Vec::new() };
        gatling::run_typed(std::io::Cursor::new(idx_input.into_bytes()), codec, &mut sink, 4, scan_cfg())
            .unwrap();
        sink.flush().unwrap();
        assert!(sink.pending.is_empty(), "flush drains the buffer");
        assert!(sink.report.errors.is_empty(), "no parse errors: {:?}", sink.report.errors);
    }
}