use anyhow::{anyhow, Result};
use arrow::array::{Array, Int32Array, RecordBatch, StringArray, TimestampMicrosecondArray};
use futures::TryStreamExt;
use iceberg::expr::Reference;
use iceberg::spec::Datum;
use iceberg::Catalog;
use super::symbols::{CallEdgeRow, SymbolRow};
use crate::warehouse::iceberg::{IcebergWarehouse, TABLE_CALL_EDGES, TABLE_SYMBOL_FACTS};
/// Queryable in-memory set of symbol facts and call edges.
///
/// `load_latest` fills it with the rows of a single snapshot of one repository; the query
/// methods on this type only look at the two vectors below.
pub struct KnowledgeView {
    /// Symbol-fact rows, in the order they were loaded.
    pub symbols: Vec<SymbolRow>,
    /// Call-edge rows, in the order they were loaded.
    pub calls: Vec<CallEdgeRow>,
}
/// Fetches column `name` from `batch`, downcast to the concrete Arrow array type `T`.
///
/// # Errors
/// Returns an error if `batch` has no column called `name`, or if that column's array is
/// not a `T`.
fn col<'a, T: 'static>(batch: &'a RecordBatch, name: &str) -> Result<&'a T> {
    let Some(array) = batch.column_by_name(name) else {
        return Err(anyhow!("missing column `{name}`"));
    };
    match array.as_any().downcast_ref::<T>() {
        Some(typed) => Ok(typed),
        None => Err(anyhow!("column `{name}` has unexpected type")),
    }
}
pub fn load_latest(wh: &IcebergWarehouse, repo: &str) -> Result<KnowledgeView> {
wh.block_on(async {
let s_table = wh.catalog().load_table(&wh.table_ident(TABLE_SYMBOL_FACTS)).await?;
let scan = s_table
.scan()
.with_filter(Reference::new("repo").equal_to(Datum::string(repo)))
.build()?;
let s_batches: Vec<RecordBatch> = scan.to_arrow().await?.try_collect().await?;
let mut latest: Option<(String, i64)> = None;
for b in &s_batches {
let snaps = col::<StringArray>(b, "snapshot_id")?;
let repos = col::<StringArray>(b, "repo")?;
let ts = col::<TimestampMicrosecondArray>(b, "ts_micros")?;
for i in 0..b.num_rows() {
if repos.value(i) != repo {
continue;
}
let t = ts.value(i);
if latest.as_ref().map(|(_, lt)| t > *lt).unwrap_or(true) {
latest = Some((snaps.value(i).to_string(), t));
}
}
}
let Some((snap, _)) = latest else {
return Ok(KnowledgeView { symbols: vec![], calls: vec![] });
};
let mut symbols = Vec::new();
for b in &s_batches {
let snaps = col::<StringArray>(b, "snapshot_id")?;
let crate_name = col::<StringArray>(b, "crate_name")?;
let module_path = col::<StringArray>(b, "module_path")?;
let item_kind = col::<StringArray>(b, "item_kind")?;
let item_name = col::<StringArray>(b, "item_name")?;
let visibility = col::<StringArray>(b, "visibility")?;
let file = col::<StringArray>(b, "file")?;
let line = col::<Int32Array>(b, "line")?;
let doc_lines = col::<Int32Array>(b, "doc_lines")?;
let signature = col::<StringArray>(b, "signature")?;
for i in 0..b.num_rows() {
if snaps.value(i) != snap {
continue;
}
let sig = signature.value(i);
symbols.push(SymbolRow {
crate_name: crate_name.value(i).to_string(),
module_path: module_path.value(i).to_string(),
item_kind: item_kind.value(i).to_string(),
item_name: item_name.value(i).to_string(),
visibility: visibility.value(i).to_string(),
file: file.value(i).to_string(),
line: line.value(i).max(0) as u32,
doc_lines: doc_lines.value(i).max(0) as u32,
signature: if sig.is_empty() { None } else { Some(sig.to_string()) },
});
}
}
let c_table = wh.catalog().load_table(&wh.table_ident(TABLE_CALL_EDGES)).await?;
let scan = c_table
.scan()
.with_filter(Reference::new("repo").equal_to(Datum::string(repo)))
.build()?;
let c_batches: Vec<RecordBatch> = scan.to_arrow().await?.try_collect().await?;
let mut calls = Vec::new();
for b in &c_batches {
let snaps = col::<StringArray>(b, "snapshot_id")?;
let crate_name = col::<StringArray>(b, "crate_name")?;
let caller = col::<StringArray>(b, "caller_path")?;
let callee = col::<StringArray>(b, "callee_ident")?;
let kind = col::<StringArray>(b, "call_kind")?;
let file = col::<StringArray>(b, "file")?;
let line = col::<Int32Array>(b, "line")?;
for i in 0..b.num_rows() {
if snaps.value(i) != snap {
continue;
}
calls.push(CallEdgeRow {
crate_name: crate_name.value(i).to_string(),
caller_path: caller.value(i).to_string(),
callee_ident: callee.value(i).to_string(),
call_kind: kind.value(i).to_string(),
file: file.value(i).to_string(),
line: line.value(i).max(0) as u32,
});
}
}
Ok(KnowledgeView { symbols, calls })
})
}
impl KnowledgeView {
    /// Returns up to `limit` symbols whose `item_name` contains `pattern`, compared
    /// case-insensitively (via `to_lowercase`), in stored order.
    pub fn symbol_lookup(&self, pattern: &str, limit: usize) -> Vec<&SymbolRow> {
        let p = pattern.to_lowercase();
        self.symbols
            .iter()
            .filter(|s| s.item_name.to_lowercase().contains(&p))
            .take(limit)
            .collect()
    }

    /// Returns every symbol whose `file` ends with `suffix` (plain string suffix match).
    pub fn defined_in(&self, suffix: &str) -> Vec<&SymbolRow> {
        self.symbols.iter().filter(|s| s.file.ends_with(suffix)).collect()
    }

    /// Returns the call edges whose `callee_ident` equals `name` or ends with `::name`.
    ///
    /// Matching is on whole `::` segments: `new` matches `new` and `Arc::new` but not `renew`.
    pub fn callers_of(&self, name: &str) -> Vec<&CallEdgeRow> {
        let suffix = format!("::{name}");
        self.calls
            .iter()
            .filter(|c| c.callee_ident == name || c.callee_ident.ends_with(&suffix))
            .collect()
    }

    /// Returns the call edges whose `caller_path` equals `name` or ends with `::name`.
    ///
    /// Matching is on whole `::` segments: `run` matches `run` and `a::run` but not `a::rerun`.
    pub fn callees_of(&self, name: &str) -> Vec<&CallEdgeRow> {
        // Built once rather than once per edge inside the filter closure.
        let suffix = format!("::{name}");
        self.calls
            .iter()
            .filter(|c| c.caller_path == name || c.caller_path.ends_with(&suffix))
            .collect()
    }

    /// Finds a shortest call chain from `from` to `to` by breadth-first search over the
    /// call edges.
    ///
    /// `from`, `to`, and both ends of every edge are reduced to their last `::` segment, so
    /// `a::run` and `b::run` are the same node. Returns the segment names along the chain
    /// (both endpoints included). Returns `Some(vec![from])` when both reduce to the same
    /// name and that name occurs in some edge. Returns `None` when no chain exists.
    pub fn call_path(&self, from: &str, to: &str) -> Option<Vec<String>> {
        use std::collections::{BTreeMap, BTreeSet, VecDeque};

        /// Last `::`-separated segment of `s` (all of `s` when it has no `::`).
        fn last_seg(s: &str) -> &str {
            s.rsplit("::").next().unwrap_or(s)
        }

        let from = last_seg(from);
        let to = last_seg(to);

        let mut adj: BTreeMap<&str, Vec<&str>> = BTreeMap::new();
        let mut nodes: BTreeSet<&str> = BTreeSet::new();
        for e in &self.calls {
            let f = last_seg(&e.caller_path);
            let t = last_seg(&e.callee_ident);
            adj.entry(f).or_default().push(t);
            nodes.insert(f);
            nodes.insert(t);
        }

        if from == to {
            return nodes.contains(from).then(|| vec![from.to_string()]);
        }
        if !nodes.contains(from) {
            return None;
        }

        // `parent` records the node each name was first reached from. Under BFS the first
        // time `to` is reached, the recorded chain back to `from` is a shortest one.
        let mut parent: BTreeMap<&str, &str> = BTreeMap::new();
        let mut seen: BTreeSet<&str> = BTreeSet::from([from]);
        let mut queue: VecDeque<&str> = VecDeque::from([from]);
        while let Some(cur) = queue.pop_front() {
            let Some(callees) = adj.get(cur) else { continue };
            for &c in callees {
                if !seen.insert(c) {
                    continue;
                }
                parent.insert(c, cur);
                if c == to {
                    let mut path = vec![to];
                    let mut node = to;
                    while let Some(&p) = parent.get(node) {
                        path.push(p);
                        node = p;
                    }
                    path.reverse();
                    return Some(path.into_iter().map(str::to_owned).collect());
                }
                queue.push_back(c);
            }
        }
        None
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Call edge from `caller` to `callee` with fixed crate/kind/file/line metadata.
    fn edge_from(caller: &str, callee: &str) -> CallEdgeRow {
        CallEdgeRow {
            crate_name: "demo".into(),
            caller_path: caller.into(),
            callee_ident: callee.into(),
            call_kind: "call".into(),
            file: "src/lib.rs".into(),
            line: 1,
        }
    }

    /// Call edge from the fixed caller `demo::f` to `callee`.
    fn edge(callee: &str) -> CallEdgeRow {
        edge_from("demo::f", callee)
    }

    /// Symbol fact named `name` declared in `file`; every other field is a fixed placeholder.
    fn symbol(name: &str, file: &str) -> SymbolRow {
        SymbolRow {
            crate_name: "demo".into(),
            module_path: "demo".into(),
            item_kind: "fn".into(),
            item_name: name.into(),
            visibility: "pub".into(),
            file: file.into(),
            line: 1,
            doc_lines: 0,
            signature: None,
        }
    }

    #[test]
    fn callers_of_matches_last_segment_and_bare() {
        let view = KnowledgeView {
            symbols: vec![],
            calls: vec![
                edge("new"),
                edge("Arc::new"),
                edge("Foo::new"),
                edge("renew"),
                edge("Foo::make"),
            ],
        };
        let hits: Vec<&str> =
            view.callers_of("new").iter().map(|c| c.callee_ident.as_str()).collect();
        assert!(hits.contains(&"new"));
        assert!(hits.contains(&"Arc::new"));
        assert!(hits.contains(&"Foo::new"));
        assert!(!hits.contains(&"renew"), "{hits:?}");
        assert!(!hits.contains(&"Foo::make"));
        assert_eq!(hits.len(), 3);
        let exact: Vec<&str> =
            view.callers_of("Arc::new").iter().map(|c| c.callee_ident.as_str()).collect();
        assert_eq!(exact, vec!["Arc::new"]);
    }

    #[test]
    fn callees_of_matches_last_segment_and_bare() {
        let view = KnowledgeView {
            symbols: vec![],
            calls: vec![
                edge_from("a::run", "x"),
                edge_from("run", "y"),
                edge_from("a::rerun", "z"),
                edge_from("a::other", "w"),
            ],
        };
        let hits: Vec<&str> =
            view.callees_of("run").iter().map(|c| c.callee_ident.as_str()).collect();
        assert_eq!(hits, vec!["x", "y"]);
    }

    #[test]
    fn symbol_lookup_is_case_insensitive_and_limited() {
        let view = KnowledgeView {
            symbols: vec![
                symbol("load_Latest", "src/a.rs"),
                symbol("LATEST", "src/b.rs"),
                symbol("other", "src/c.rs"),
            ],
            calls: vec![],
        };
        let names: Vec<&str> =
            view.symbol_lookup("latest", 10).iter().map(|s| s.item_name.as_str()).collect();
        assert_eq!(names, vec!["load_Latest", "LATEST"]);
        assert_eq!(view.symbol_lookup("latest", 1).len(), 1);
        assert!(view.symbol_lookup("missing", 10).is_empty());
    }

    #[test]
    fn defined_in_matches_file_suffix() {
        let view = KnowledgeView {
            symbols: vec![
                symbol("a", "src/lib.rs"),
                symbol("b", "src/bin/main.rs"),
                symbol("c", "src/lib.rs.bak"),
            ],
            calls: vec![],
        };
        let names: Vec<&str> =
            view.defined_in("lib.rs").iter().map(|s| s.item_name.as_str()).collect();
        assert_eq!(names, vec!["a"]);
    }

    #[test]
    fn call_path_bfs_over_call_edges() {
        let view = KnowledgeView {
            symbols: vec![],
            calls: vec![
                edge_from("a::run", "step"),
                edge_from("b::step", "Repo::commit"),
                edge_from("a::run", "noop"),
            ],
        };
        let p = view.call_path("run", "commit").expect("path exists");
        assert_eq!(p, vec!["run", "step", "commit"]);
        let p2 = view.call_path("a::run", "Repo::commit").expect("path exists");
        assert_eq!(p2, vec!["run", "step", "commit"]);
        assert_eq!(view.call_path("step", "step"), Some(vec!["step".to_string()]));
        assert_eq!(view.call_path("commit", "run"), None);
        assert_eq!(view.call_path("ghost", "run"), None);
    }
}