nornir 0.4.27

Companion to cargo: dependency tracking, release gating, deploy, benchmarks, and documentation assembly. Project-agnostic.
Documentation
//! Read-back queries over the persisted knowledge map (`symbol_facts`,
//! `call_edges`) in iceberg — the counterpart to [`super::scan_repo`]'s
//! writer. Lets an agent answer callers / callees / defined-in /
//! symbol-lookup / call-path queries over the **pure-Rust (syn) facts**,
//! with no compiled binary required (unlike DWARF `introspect`).
//!
//! Spike scope: always reads the *latest* snapshot for `repo` (max ts).
//! Uses predicate pushdown (`with_filter(repo == …)`) so the planner
//! skips other repos' data files instead of scanning the whole table.

use anyhow::{anyhow, Result};
use arrow::array::{Array, Int32Array, RecordBatch, StringArray, TimestampMicrosecondArray};
use futures::TryStreamExt;
use iceberg::expr::Reference;
use iceberg::spec::Datum;
use iceberg::Catalog;

use super::symbols::{CallEdgeRow, SymbolRow};
use crate::warehouse::iceberg::{IcebergWarehouse, TABLE_CALL_EDGES, TABLE_SYMBOL_FACTS};

/// Latest persisted symbols + calls for `repo`.
///
/// When produced by [`load_latest`], both vectors hold rows of the same
/// (most recent) snapshot and are empty if the repo has no persisted symbols.
/// The query methods on this type only filter these vectors in memory.
pub struct KnowledgeView {
    /// Rows read from the `symbol_facts` table.
    pub symbols: Vec<SymbolRow>,
    /// Rows read from the `call_edges` table.
    pub calls: Vec<CallEdgeRow>,
}

/// Look up column `name` in `batch` and downcast it to the concrete Arrow
/// array type `T` (e.g. `StringArray`, `Int32Array`).
///
/// # Errors
/// Returns an error when `batch` has no column called `name`, or when that
/// column's array is not of type `T`.
fn col<'a, T: 'static>(batch: &'a RecordBatch, name: &str) -> Result<&'a T> {
    let Some(array) = batch.column_by_name(name) else {
        return Err(anyhow!("missing column `{name}`"));
    };
    match array.as_any().downcast_ref::<T>() {
        Some(typed) => Ok(typed),
        None => Err(anyhow!("column `{name}` has unexpected type")),
    }
}

/// Load the latest-snapshot symbols + calls for `repo` from iceberg.
pub fn load_latest(wh: &IcebergWarehouse, repo: &str) -> Result<KnowledgeView> {
    wh.block_on(async {
        // ── symbols ───────────────────────────────────────────────
        let s_table = wh.catalog().load_table(&wh.table_ident(TABLE_SYMBOL_FACTS)).await?;
        let scan = s_table
            .scan()
            .with_filter(Reference::new("repo").equal_to(Datum::string(repo)))
            .build()?;
        let s_batches: Vec<RecordBatch> = scan.to_arrow().await?.try_collect().await?;

        // Find the most-recent snapshot_id for this repo (max ts_micros).
        let mut latest: Option<(String, i64)> = None;
        for b in &s_batches {
            let snaps = col::<StringArray>(b, "snapshot_id")?;
            let repos = col::<StringArray>(b, "repo")?;
            let ts = col::<TimestampMicrosecondArray>(b, "ts_micros")?;
            for i in 0..b.num_rows() {
                if repos.value(i) != repo {
                    continue;
                }
                let t = ts.value(i);
                if latest.as_ref().map(|(_, lt)| t > *lt).unwrap_or(true) {
                    latest = Some((snaps.value(i).to_string(), t));
                }
            }
        }
        let Some((snap, _)) = latest else {
            return Ok(KnowledgeView { symbols: vec![], calls: vec![] });
        };

        let mut symbols = Vec::new();
        for b in &s_batches {
            let snaps = col::<StringArray>(b, "snapshot_id")?;
            let crate_name = col::<StringArray>(b, "crate_name")?;
            let module_path = col::<StringArray>(b, "module_path")?;
            let item_kind = col::<StringArray>(b, "item_kind")?;
            let item_name = col::<StringArray>(b, "item_name")?;
            let visibility = col::<StringArray>(b, "visibility")?;
            let file = col::<StringArray>(b, "file")?;
            let line = col::<Int32Array>(b, "line")?;
            let doc_lines = col::<Int32Array>(b, "doc_lines")?;
            let signature = col::<StringArray>(b, "signature")?;
            for i in 0..b.num_rows() {
                if snaps.value(i) != snap {
                    continue;
                }
                let sig = signature.value(i);
                symbols.push(SymbolRow {
                    crate_name: crate_name.value(i).to_string(),
                    module_path: module_path.value(i).to_string(),
                    item_kind: item_kind.value(i).to_string(),
                    item_name: item_name.value(i).to_string(),
                    visibility: visibility.value(i).to_string(),
                    file: file.value(i).to_string(),
                    line: line.value(i).max(0) as u32,
                    doc_lines: doc_lines.value(i).max(0) as u32,
                    signature: if sig.is_empty() { None } else { Some(sig.to_string()) },
                });
            }
        }

        // ── calls (same latest snapshot) ──────────────────────────
        let c_table = wh.catalog().load_table(&wh.table_ident(TABLE_CALL_EDGES)).await?;
        let scan = c_table
            .scan()
            .with_filter(Reference::new("repo").equal_to(Datum::string(repo)))
            .build()?;
        let c_batches: Vec<RecordBatch> = scan.to_arrow().await?.try_collect().await?;
        let mut calls = Vec::new();
        for b in &c_batches {
            let snaps = col::<StringArray>(b, "snapshot_id")?;
            let crate_name = col::<StringArray>(b, "crate_name")?;
            let caller = col::<StringArray>(b, "caller_path")?;
            let callee = col::<StringArray>(b, "callee_ident")?;
            let kind = col::<StringArray>(b, "call_kind")?;
            let file = col::<StringArray>(b, "file")?;
            let line = col::<Int32Array>(b, "line")?;
            for i in 0..b.num_rows() {
                if snaps.value(i) != snap {
                    continue;
                }
                calls.push(CallEdgeRow {
                    crate_name: crate_name.value(i).to_string(),
                    caller_path: caller.value(i).to_string(),
                    callee_ident: callee.value(i).to_string(),
                    call_kind: kind.value(i).to_string(),
                    file: file.value(i).to_string(),
                    line: line.value(i).max(0) as u32,
                });
            }
        }

        Ok(KnowledgeView { symbols, calls })
    })
}

impl KnowledgeView {
    /// Symbols whose `item_name` contains `pattern`, ignoring case.
    ///
    /// Both sides are folded with [`str::to_lowercase`]. At most `limit`
    /// matches are returned, in the order the symbols are stored. An empty
    /// `pattern` matches every symbol.
    pub fn symbol_lookup(&self, pattern: &str, limit: usize) -> Vec<&SymbolRow> {
        let needle = pattern.to_lowercase();
        self.symbols
            .iter()
            .filter(|s| s.item_name.to_lowercase().contains(&needle))
            .take(limit)
            .collect()
    }

    /// Symbols defined in a file whose path ends with `suffix`.
    ///
    /// This is a plain string-suffix test, not a path-component match:
    /// `lib.rs` also matches `src/mylib.rs`, and `""` matches every symbol.
    pub fn defined_in(&self, suffix: &str) -> Vec<&SymbolRow> {
        self.symbols.iter().filter(|s| s.file.ends_with(suffix)).collect()
    }

    /// Call edges that *invoke* `name`. Matches either an exact `callee_ident`
    /// (bare method calls like `.new()`) or a path-qualified callee whose last
    /// segment is `name` (`Arc::new`, `Foo::new` all match a query of `new`).
    /// The `::` separator is required, so `new` does not match `renew`.
    pub fn callers_of(&self, name: &str) -> Vec<&CallEdgeRow> {
        self.calls
            .iter()
            .filter(|c| Self::ident_matches(&c.callee_ident, name))
            .collect()
    }

    /// Call edges made *from* `name`. Matches a `caller_path` equal to `name`
    /// or ending in `::` followed by `name`, so `run` matches `app::run` but
    /// not `app::rerun`.
    pub fn callees_of(&self, name: &str) -> Vec<&CallEdgeRow> {
        self.calls
            .iter()
            .filter(|c| Self::ident_matches(&c.caller_path, name))
            .collect()
    }

    /// `true` when `path` is exactly `name`, or is `name` preceded by a `::`
    /// separator. Checking the boundary via `strip_suffix` avoids building a
    /// `format!`ed suffix for every edge that is tested.
    fn ident_matches(path: &str, name: &str) -> bool {
        path == name || matches!(path.strip_suffix(name), Some(rest) if rest.ends_with("::"))
    }

    /// Shortest call chain from `from` to `to` over the persisted call edges
    /// (BFS following caller → callee), at **identifier granularity**: each
    /// node is a function's last path segment, so a query of `build`/`new`
    /// matches `Index::build`/`Arc::new`. Returns the sequence of identifiers
    /// from `from` to `to` inclusive, or `None` when unreachable. When `from`
    /// and `to` name the same identifier, the result is that single identifier
    /// if it appears in any edge, else `None`.
    ///
    /// Approximate by construction: the syn facts record callees as
    /// identifiers (`Arc::new` is stored path-qualified, a bare `.new()` is
    /// not), never as fully-resolved defining paths, so distinct functions
    /// that share a name collapse to one node. Use it to surface *a* plausible
    /// call chain (like `dep_path` for repos), not a guaranteed-unique one.
    pub fn call_path(&self, from: &str, to: &str) -> Option<Vec<String>> {
        use std::collections::{BTreeMap, BTreeSet, VecDeque};

        fn last_seg(s: &str) -> &str {
            s.rsplit("::").next().unwrap_or(s)
        }

        let from = last_seg(from);
        let to = last_seg(to);

        // Adjacency (caller ident -> callee idents) plus every identifier seen
        // on either end of an edge. All borrowed: no per-node allocation.
        let mut adj: BTreeMap<&str, Vec<&str>> = BTreeMap::new();
        let mut nodes: BTreeSet<&str> = BTreeSet::new();
        for e in &self.calls {
            let caller = last_seg(&e.caller_path);
            let callee = last_seg(&e.callee_ident);
            adj.entry(caller).or_default().push(callee);
            nodes.insert(caller);
            nodes.insert(callee);
        }

        if !nodes.contains(from) {
            return None;
        }
        if from == to {
            return Some(vec![from.to_string()]);
        }

        // BFS; `parent` maps each newly discovered node to its predecessor.
        // `from` is seen before the loop, so it never gets a parent entry and
        // the walk-back below terminates there.
        let mut parent: BTreeMap<&str, &str> = BTreeMap::new();
        let mut seen: BTreeSet<&str> = BTreeSet::new();
        let mut queue: VecDeque<&str> = VecDeque::new();
        seen.insert(from);
        queue.push_back(from);
        while let Some(cur) = queue.pop_front() {
            let Some(callees) = adj.get(cur) else { continue };
            for &next in callees {
                if !seen.insert(next) {
                    continue;
                }
                parent.insert(next, cur);
                if next == to {
                    let mut path = vec![to.to_string()];
                    let mut node = to;
                    while let Some(&prev) = parent.get(node) {
                        path.push(prev.to_string());
                        node = prev;
                    }
                    path.reverse();
                    return Some(path);
                }
                queue.push_back(next);
            }
        }
        None
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::knowledge::symbols::{CallEdgeRow, SymbolRow};

    /// Edge from the fixed caller `demo::f` to `callee`.
    fn edge(callee: &str) -> CallEdgeRow {
        CallEdgeRow {
            crate_name: "demo".into(),
            caller_path: "demo::f".into(),
            callee_ident: callee.into(),
            call_kind: "call".into(),
            file: "src/lib.rs".into(),
            line: 1,
        }
    }

    /// Edge from `caller` to `callee`.
    fn edge_from(caller: &str, callee: &str) -> CallEdgeRow {
        CallEdgeRow {
            crate_name: "demo".into(),
            caller_path: caller.into(),
            callee_ident: callee.into(),
            call_kind: "call".into(),
            file: "src/lib.rs".into(),
            line: 1,
        }
    }

    /// Minimal symbol named `name` defined in `file`.
    fn sym(name: &str, file: &str) -> SymbolRow {
        SymbolRow {
            crate_name: "demo".into(),
            module_path: "demo".into(),
            item_kind: "fn".into(),
            item_name: name.into(),
            visibility: "pub".into(),
            file: file.into(),
            line: 1,
            doc_lines: 0,
            signature: None,
        }
    }

    #[test]
    fn callers_of_matches_last_segment_and_bare() {
        let view = KnowledgeView {
            symbols: vec![],
            calls: vec![
                edge("new"),       // bare method call
                edge("Arc::new"),  // path-qualified
                edge("Foo::new"),  // path-qualified
                edge("renew"),     // must NOT match (no `::` boundary)
                edge("Foo::make"), // unrelated
            ],
        };

        let hits: Vec<&str> = view.callers_of("new").iter().map(|c| c.callee_ident.as_str()).collect();
        assert!(hits.contains(&"new"));
        assert!(hits.contains(&"Arc::new"));
        assert!(hits.contains(&"Foo::new"));
        assert!(!hits.contains(&"renew"), "{hits:?}");
        assert!(!hits.contains(&"Foo::make"));
        assert_eq!(hits.len(), 3);

        // A fully-qualified query still matches exactly.
        let exact: Vec<&str> = view.callers_of("Arc::new").iter().map(|c| c.callee_ident.as_str()).collect();
        assert_eq!(exact, vec!["Arc::new"]);
    }

    #[test]
    fn callees_of_matches_last_segment_and_bare() {
        let view = KnowledgeView {
            symbols: vec![],
            calls: vec![
                edge_from("run", "a"),        // bare caller
                edge_from("app::run", "b"),   // path-qualified caller
                edge_from("app::rerun", "c"), // must NOT match (no `::` boundary)
            ],
        };

        let hits: Vec<&str> = view.callees_of("run").iter().map(|c| c.callee_ident.as_str()).collect();
        assert_eq!(hits, vec!["a", "b"]);
    }

    #[test]
    fn symbol_lookup_and_defined_in() {
        let view = KnowledgeView {
            symbols: vec![
                sym("Index", "src/index.rs"),
                sym("build_index", "src/lib.rs"),
                sym("other", "src/main.rs"),
            ],
            calls: vec![],
        };

        // Case-insensitive substring match, in stored order, capped by `limit`.
        let hits: Vec<&str> = view.symbol_lookup("INDEX", 10).iter().map(|s| s.item_name.as_str()).collect();
        assert_eq!(hits, vec!["Index", "build_index"]);
        assert_eq!(view.symbol_lookup("index", 1).len(), 1);

        let in_lib: Vec<&str> = view.defined_in("lib.rs").iter().map(|s| s.item_name.as_str()).collect();
        assert_eq!(in_lib, vec!["build_index"]);
    }

    #[test]
    fn call_path_bfs_over_call_edges() {
        // chain: a::run -> b::step -> c::commit ; plus a detour a::run -> z::noop
        let view = KnowledgeView {
            symbols: vec![],
            calls: vec![
                edge_from("a::run", "step"),
                edge_from("b::step", "Repo::commit"),
                edge_from("a::run", "noop"),
            ],
        };

        // last-segment identity: run -> step -> commit
        let p = view.call_path("run", "commit").expect("path exists");
        assert_eq!(p, vec!["run", "step", "commit"]);

        // fully-qualified inputs are normalised to their last segment.
        let p2 = view.call_path("a::run", "Repo::commit").expect("path exists");
        assert_eq!(p2, vec!["run", "step", "commit"]);

        // self-path when the node exists, none when it does not.
        assert_eq!(view.call_path("step", "step"), Some(vec!["step".to_string()]));
        assert_eq!(view.call_path("ghost", "ghost"), None);

        // unreachable + unknown source.
        assert_eq!(view.call_path("commit", "run"), None);
        assert_eq!(view.call_path("ghost", "run"), None);
    }
}