use anyhow::{anyhow, Result};
use arrow::array::{Array, Int32Array, RecordBatch, StringArray, TimestampMicrosecondArray};
use futures::TryStreamExt;
use iceberg::expr::Reference;
use iceberg::spec::Datum;
use iceberg::Catalog;
use super::symbols::{CallEdgeRow, SymbolRow};
use crate::warehouse::iceberg::{IcebergWarehouse, TABLE_CALL_EDGES, TABLE_SYMBOL_FACTS};
/// In-memory view of the symbol and call-edge rows loaded for one or more repos.
///
/// Produced by the loaders in this file and queried through the helper
/// methods on this type (`symbol_lookup`, `callers_of`, `call_path`, ...).
pub struct KnowledgeView {
/// Symbol definition rows (name, kind, file, line, ...).
pub symbols: Vec<SymbolRow>,
/// Call edges, each linking a `caller_path` to a `callee_ident`.
pub calls: Vec<CallEdgeRow>,
}
/// Fetches the column called `name` from `batch` and downcasts it to the
/// concrete Arrow array type `T`.
///
/// # Errors
///
/// Fails when the batch has no column with that name, or when the column
/// exists but is not a `T`.
fn col<'a, T: 'static>(batch: &'a RecordBatch, name: &str) -> Result<&'a T> {
    let Some(column) = batch.column_by_name(name) else {
        return Err(anyhow!("missing column `{name}`"));
    };
    match column.as_any().downcast_ref::<T>() {
        Some(typed) => Ok(typed),
        None => Err(anyhow!("column `{name}` has unexpected type")),
    }
}
/// Loads the symbol facts and call edges of the most recent snapshot of `repo`.
///
/// The symbol-facts table is scanned with a `repo` equality filter; the
/// snapshot whose row carries the greatest `ts_micros` wins (on a tie the
/// first one encountered is kept). Symbols and call edges are then restricted
/// to rows carrying that `snapshot_id`.
///
/// Returns an empty view when no symbol row for `repo` is found.
///
/// # Errors
///
/// Propagates catalog / scan failures and any missing or mistyped column.
pub fn load_latest(wh: &IcebergWarehouse, repo: &str) -> Result<KnowledgeView> {
    wh.block_on(async {
        let symbol_table = wh.catalog().load_table(&wh.table_ident(TABLE_SYMBOL_FACTS)).await?;
        let symbol_scan = symbol_table
            .scan()
            .with_filter(Reference::new("repo").equal_to(Datum::string(repo)))
            .build()?;
        let symbol_batches: Vec<RecordBatch> = symbol_scan.to_arrow().await?.try_collect().await?;

        // Pass 1: pick the snapshot id with the greatest timestamp for this repo.
        let mut newest: Option<(String, i64)> = None;
        for batch in &symbol_batches {
            let snapshot_ids = col::<StringArray>(batch, "snapshot_id")?;
            let repos = col::<StringArray>(batch, "repo")?;
            let stamps = col::<TimestampMicrosecondArray>(batch, "ts_micros")?;
            // Re-check the repo per row rather than relying on the scan filter alone.
            for row in (0..batch.num_rows()).filter(|&r| repos.value(r) == repo) {
                let stamp = stamps.value(row);
                let is_newer = match &newest {
                    Some((_, best)) => stamp > *best,
                    None => true,
                };
                if is_newer {
                    newest = Some((snapshot_ids.value(row).to_string(), stamp));
                }
            }
        }
        let snap = match newest {
            Some((id, _)) => id,
            None => return Ok(KnowledgeView { symbols: vec![], calls: vec![] }),
        };

        // Pass 2: materialise the symbol rows belonging to the chosen snapshot.
        let mut symbols = Vec::new();
        for batch in &symbol_batches {
            let snapshot_ids = col::<StringArray>(batch, "snapshot_id")?;
            let crate_name = col::<StringArray>(batch, "crate_name")?;
            let module_path = col::<StringArray>(batch, "module_path")?;
            let item_kind = col::<StringArray>(batch, "item_kind")?;
            let item_name = col::<StringArray>(batch, "item_name")?;
            let visibility = col::<StringArray>(batch, "visibility")?;
            let file = col::<StringArray>(batch, "file")?;
            let line = col::<Int32Array>(batch, "line")?;
            let doc_lines = col::<Int32Array>(batch, "doc_lines")?;
            let signature = col::<StringArray>(batch, "signature")?;
            symbols.extend(
                (0..batch.num_rows())
                    .filter(|&row| snapshot_ids.value(row) == snap)
                    .map(|row| {
                        let sig = signature.value(row);
                        SymbolRow {
                            crate_name: crate_name.value(row).to_string(),
                            module_path: module_path.value(row).to_string(),
                            item_kind: item_kind.value(row).to_string(),
                            item_name: item_name.value(row).to_string(),
                            visibility: visibility.value(row).to_string(),
                            file: file.value(row).to_string(),
                            // Negative values are clamped to zero before the cast.
                            line: line.value(row).max(0) as u32,
                            doc_lines: doc_lines.value(row).max(0) as u32,
                            // An empty signature string is surfaced as `None`.
                            signature: (!sig.is_empty()).then(|| sig.to_string()),
                        }
                    }),
            );
        }

        // Call edges come from their own table, filtered to the same snapshot.
        let call_table = wh.catalog().load_table(&wh.table_ident(TABLE_CALL_EDGES)).await?;
        let call_scan = call_table
            .scan()
            .with_filter(Reference::new("repo").equal_to(Datum::string(repo)))
            .build()?;
        let call_batches: Vec<RecordBatch> = call_scan.to_arrow().await?.try_collect().await?;

        let mut calls = Vec::new();
        for batch in &call_batches {
            let snapshot_ids = col::<StringArray>(batch, "snapshot_id")?;
            let crate_name = col::<StringArray>(batch, "crate_name")?;
            let caller = col::<StringArray>(batch, "caller_path")?;
            let callee = col::<StringArray>(batch, "callee_ident")?;
            let kind = col::<StringArray>(batch, "call_kind")?;
            let file = col::<StringArray>(batch, "file")?;
            let line = col::<Int32Array>(batch, "line")?;
            calls.extend(
                (0..batch.num_rows())
                    .filter(|&row| snapshot_ids.value(row) == snap)
                    .map(|row| CallEdgeRow {
                        crate_name: crate_name.value(row).to_string(),
                        caller_path: caller.value(row).to_string(),
                        callee_ident: callee.value(row).to_string(),
                        call_kind: kind.value(row).to_string(),
                        file: file.value(row).to_string(),
                        line: line.value(row).max(0) as u32,
                    }),
            );
        }

        Ok(KnowledgeView { symbols, calls })
    })
}
/// Builds a view from the latest SCIP scan stored for `repo`.
///
/// Returns `Ok(None)` when the scan has no rows. Otherwise the call edges come
/// from `scip_call_edges` and the symbols from the scan's definition rows.
#[cfg(feature = "scip")]
pub fn load_latest_scip(
    wh: &IcebergWarehouse,
    repo: &str,
) -> Result<Option<KnowledgeView>> {
    let latest = wh.load_latest_scip(repo)?;
    let view = (!latest.rows.is_empty()).then(|| KnowledgeView {
        calls: super::scip::scip_call_edges(&latest),
        symbols: latest
            .rows
            .iter()
            .filter(|row| row.is_definition)
            .map(scip_symbol_row)
            .collect(),
    });
    Ok(view)
}
/// Converts one SCIP row into a `SymbolRow`.
///
/// The SCIP symbol string is stored as `module_path`; `item_name` is the
/// display name, falling back to the symbol string when the display name is
/// empty. Crate name, visibility, doc-line count and signature are left blank.
#[cfg(feature = "scip")]
fn scip_symbol_row(r: &super::scip::ScipRow) -> SymbolRow {
    let item_name = match r.display_name.as_str() {
        "" => r.symbol.clone(),
        shown => shown.to_string(),
    };
    SymbolRow {
        crate_name: String::new(),
        module_path: r.symbol.clone(),
        item_kind: r.kind.clone(),
        item_name,
        visibility: String::new(),
        file: r.file.clone(),
        line: r.start_line,
        doc_lines: 0,
        signature: None,
    }
}
/// Loads one merged view covering every repo in `members`.
///
/// With the `scip` feature enabled, members that have SCIP rows are loaded
/// from SCIP and their call edges are computed against a symbol table shared
/// by all of those scans. Every remaining member is loaded through
/// [`load_latest`].
///
/// The returned label is `"resolved/scip"` if any member came from SCIP,
/// `"syn"` if only `load_latest` contributed rows, and `""` if nothing was
/// found at all.
pub fn load_preferred_merged(
    wh: &IcebergWarehouse,
    members: &[String],
) -> Result<(KnowledgeView, &'static str)> {
    let mut merged = KnowledgeView { symbols: Vec::new(), calls: Vec::new() };
    // Both stay untouched when the `scip` feature is compiled out.
    #[allow(unused_mut)]
    let mut used_scip = false;
    #[allow(unused_mut)]
    let mut scip_members: std::collections::HashSet<String> = std::collections::HashSet::new();

    #[cfg(feature = "scip")]
    {
        let mut scans = Vec::new();
        for member in members {
            let scan = wh.load_latest_scip(member)?;
            if scan.rows.is_empty() {
                continue;
            }
            scip_members.insert(member.clone());
            scans.push(scan);
        }
        if !scans.is_empty() {
            used_scip = true;
            let scan_refs: Vec<&super::scip::ScipScan> = scans.iter().collect();
            let globals = super::scip::global_symbol_table(&scan_refs);
            for scan in &scans {
                merged.calls.extend(super::scip::scip_call_edges_with(scan, &globals));
                merged
                    .symbols
                    .extend(scan.rows.iter().filter(|row| row.is_definition).map(scip_symbol_row));
            }
        }
    }

    // Members without SCIP data fall back to the `load_latest` tables.
    let mut used_syn = false;
    for member in members.iter().filter(|m| !scip_members.contains(*m)) {
        let KnowledgeView { symbols, calls } = load_latest(wh, member)?;
        if symbols.is_empty() && calls.is_empty() {
            continue;
        }
        used_syn = true;
        merged.symbols.extend(symbols);
        merged.calls.extend(calls);
    }

    let source = match (used_scip, used_syn) {
        (true, _) => "resolved/scip",
        (false, true) => "syn",
        (false, false) => "",
    };
    Ok((merged, source))
}
/// Loads the best available view for a single `repo`.
///
/// With the `scip` feature enabled, a non-empty SCIP view is returned with the
/// label `"resolved/scip"`. Otherwise the result of [`load_latest`] is
/// returned with the label `"syn"`.
pub fn load_preferred(wh: &IcebergWarehouse, repo: &str) -> Result<(KnowledgeView, &'static str)> {
    #[cfg(feature = "scip")]
    {
        if let Some(resolved) = load_latest_scip(wh, repo)? {
            return Ok((resolved, "resolved/scip"));
        }
    }
    let fallback = load_latest(wh, repo)?;
    Ok((fallback, "syn"))
}
impl KnowledgeView {
    /// Case-insensitive substring search over symbol names.
    ///
    /// Returns at most `limit` symbols, in stored order, whose `item_name`
    /// contains `pattern` when both are lowercased.
    pub fn symbol_lookup(&self, pattern: &str, limit: usize) -> Vec<&SymbolRow> {
        let needle = pattern.to_lowercase();
        self.symbols
            .iter()
            .filter(|s| s.item_name.to_lowercase().contains(&needle))
            .take(limit)
            .collect()
    }

    /// Returns every symbol whose `file` path ends with `suffix`.
    pub fn defined_in(&self, suffix: &str) -> Vec<&SymbolRow> {
        self.symbols.iter().filter(|s| s.file.ends_with(suffix)).collect()
    }

    /// Returns the call edges whose callee is `name`.
    ///
    /// An edge matches when its `callee_ident` equals `name` exactly or ends
    /// with `::name`, so `new` matches `Foo::new` but not `renew`.
    pub fn callers_of(&self, name: &str) -> Vec<&CallEdgeRow> {
        let suffix = format!("::{name}");
        self.calls
            .iter()
            .filter(|c| c.callee_ident == name || c.callee_ident.ends_with(&suffix))
            .collect()
    }

    /// Returns the call edges whose caller is `name`.
    ///
    /// An edge matches when its `caller_path` equals `name` exactly or ends
    /// with `::name`.
    pub fn callees_of(&self, name: &str) -> Vec<&CallEdgeRow> {
        // Build the suffix once instead of once per edge, as `callers_of` does.
        let suffix = format!("::{name}");
        self.calls
            .iter()
            .filter(|c| c.caller_path == name || c.caller_path.ends_with(&suffix))
            .collect()
    }

    /// Finds a shortest chain of calls leading from `from` to `to`.
    ///
    /// Both endpoints and every edge end are reduced to their last `::`
    /// segment before matching, so `a::run` and `run` name the same node. The
    /// returned path lists those last segments, starting with `from` and
    /// ending with `to`.
    ///
    /// Returns `None` when `from` does not appear in any edge or when no chain
    /// of edges reaches `to`. When both endpoints reduce to the same segment,
    /// the result is a single-element path if that node appears in an edge.
    pub fn call_path(&self, from: &str, to: &str) -> Option<Vec<String>> {
        use std::collections::{BTreeMap, BTreeSet, VecDeque};

        fn last_seg(s: &str) -> &str {
            s.rsplit("::").next().unwrap_or(s)
        }

        let from = last_seg(from);
        let to = last_seg(to);

        // Adjacency list keyed by caller segment; callees keep edge order so
        // the search result is deterministic.
        let mut adj: BTreeMap<&str, Vec<&str>> = BTreeMap::new();
        let mut nodes: BTreeSet<&str> = BTreeSet::new();
        for e in &self.calls {
            let f = last_seg(&e.caller_path);
            let t = last_seg(&e.callee_ident);
            adj.entry(f).or_default().push(t);
            nodes.insert(f);
            nodes.insert(t);
        }

        if !nodes.contains(from) {
            return None;
        }
        if from == to {
            return Some(vec![from.to_string()]);
        }

        // Breadth-first search over borrowed names; strings are allocated only
        // for the path that is finally returned.
        let mut parent: BTreeMap<&str, &str> = BTreeMap::new();
        let mut seen: BTreeSet<&str> = BTreeSet::new();
        let mut queue: VecDeque<&str> = VecDeque::new();
        seen.insert(from);
        queue.push_back(from);
        while let Some(cur) = queue.pop_front() {
            let Some(callees) = adj.get(cur) else { continue };
            for &c in callees {
                if !seen.insert(c) {
                    continue;
                }
                parent.insert(c, cur);
                if c == to {
                    // Walk the parent links back to `from`, which has none.
                    let mut path = vec![to.to_string()];
                    let mut node = to;
                    while let Some(&p) = parent.get(node) {
                        path.push(p.to_string());
                        node = p;
                    }
                    path.reverse();
                    return Some(path);
                }
                queue.push_back(c);
            }
        }
        None
    }
}
// Unit tests for the in-memory `KnowledgeView` queries and, behind the `scip`
// feature, for the SCIP-backed loaders against a temporary warehouse.
#[cfg(test)]
mod tests {
use super::*;
use crate::knowledge::symbols::CallEdgeRow;
// Builds a call edge from the fixed caller `demo::f` to `callee`.
fn edge(callee: &str) -> CallEdgeRow {
CallEdgeRow {
crate_name: "demo".into(),
caller_path: "demo::f".into(),
callee_ident: callee.into(),
call_kind: "call".into(),
file: "src/lib.rs".into(),
line: 1,
}
}
// Builds a call edge with an explicit caller and callee.
fn edge_from(caller: &str, callee: &str) -> CallEdgeRow {
CallEdgeRow {
crate_name: "demo".into(),
caller_path: caller.into(),
callee_ident: callee.into(),
call_kind: "call".into(),
file: "src/lib.rs".into(),
line: 1,
}
}
// `callers_of` must match the bare name and any `::name` suffix, but not a
// plain substring such as `renew`.
#[test]
fn callers_of_matches_last_segment_and_bare() {
let view = KnowledgeView {
symbols: vec![],
calls: vec![
edge("new"), edge("Arc::new"), edge("Foo::new"), edge("renew"), edge("Foo::make"), ],
};
let hits: Vec<&str> = view.callers_of("new").iter().map(|c| c.callee_ident.as_str()).collect();
assert!(hits.contains(&"new"));
assert!(hits.contains(&"Arc::new"));
assert!(hits.contains(&"Foo::new"));
assert!(!hits.contains(&"renew"), "{hits:?}");
assert!(!hits.contains(&"Foo::make"));
assert_eq!(hits.len(), 3);
// A qualified query only matches the identically qualified callee.
let exact: Vec<&str> = view.callers_of("Arc::new").iter().map(|c| c.callee_ident.as_str()).collect();
assert_eq!(exact, vec!["Arc::new"]);
}
// `call_path` joins edges on the last `::` segment, so `a::run -> step` and
// `b::step -> Repo::commit` chain into run -> step -> commit.
#[test]
fn call_path_bfs_over_call_edges() {
let view = KnowledgeView {
symbols: vec![],
calls: vec![
edge_from("a::run", "step"),
edge_from("b::step", "Repo::commit"),
edge_from("a::run", "noop"),
],
};
let p = view.call_path("run", "commit").expect("path exists");
assert_eq!(p, vec!["run", "step", "commit"]);
// Qualified endpoints are reduced to their last segment as well.
let p2 = view.call_path("a::run", "Repo::commit").expect("path exists");
assert_eq!(p2, vec!["run", "step", "commit"]);
// Same start and end: a single-node path, as long as the node is known.
assert_eq!(view.call_path("step", "step"), Some(vec!["step".to_string()]));
// Edges are directed, and unknown start nodes yield no path.
assert_eq!(view.call_path("commit", "run"), None);
assert_eq!(view.call_path("ghost", "run"), None);
}
// Round-trips a hand-built SCIP index through a temporary warehouse and checks
// that the reference to `inner` inside `outer`'s enclosing range becomes an
// `outer -> inner` call edge.
#[cfg(feature = "scip")]
#[test]
fn load_latest_scip_builds_resolved_view() {
use crate::knowledge::scip::{ingest_index, ScipScan};
use crate::warehouse::iceberg::IcebergWarehouse;
use scip::types::{symbol_information, Document, Index, Occurrence, SymbolInformation, SymbolRole};
let mut idx = Index::new();
let mut doc = Document::new();
doc.relative_path = "src/lib.rs".into();
// Definition of `outer`, with an enclosing range covering lines 10..=20.
let mut outer_si = SymbolInformation::new();
outer_si.symbol = "rust-analyzer cargo demo 0.1.0 outer().".into();
outer_si.display_name = "outer".into();
outer_si.kind = symbol_information::Kind::Function.into();
doc.symbols.push(outer_si.clone());
let mut outer_def = Occurrence::new();
outer_def.range = vec![10, 3, 10, 8];
outer_def.enclosing_range = vec![10, 0, 20, 1];
outer_def.symbol = outer_si.symbol.clone();
outer_def.symbol_roles = SymbolRole::Definition as i32;
doc.occurrences.push(outer_def);
// Definition of `inner`, with an enclosing range covering lines 30..=34.
let mut inner_si = SymbolInformation::new();
inner_si.symbol = "rust-analyzer cargo demo 0.1.0 inner().".into();
inner_si.display_name = "inner".into();
inner_si.kind = symbol_information::Kind::Function.into();
doc.symbols.push(inner_si.clone());
let mut inner_def = Occurrence::new();
inner_def.range = vec![30, 3, 30, 8];
inner_def.enclosing_range = vec![30, 0, 34, 1];
inner_def.symbol = inner_si.symbol.clone();
inner_def.symbol_roles = SymbolRole::Definition as i32;
doc.occurrences.push(inner_def);
// Non-definition occurrence of `inner` on line 13, i.e. inside `outer`.
let mut ref_inner = Occurrence::new();
ref_inner.range = vec![13, 8, 13, 13];
ref_inner.symbol = inner_si.symbol.clone();
doc.occurrences.push(ref_inner);
idx.documents.push(doc);
let scan: ScipScan = ingest_index(idx, "demo", "deadbeefsha", uuid::Uuid::new_v4(), chrono::Utc::now());
let dir = tempfile::tempdir().unwrap();
let wh = IcebergWarehouse::open(dir.path()).unwrap();
wh.append_scip_scan(&scan).unwrap();
let view = load_latest_scip(&wh, "demo").unwrap().expect("scip rows present");
let callers: Vec<&str> = view.callers_of("inner").iter().map(|c| c.caller_path.as_str()).collect();
assert_eq!(callers, vec!["outer"], "resolved caller via containment");
assert_eq!(view.call_path("outer", "inner"), Some(vec!["outer".to_string(), "inner".to_string()]));
// A repo with no SCIP rows yields `None` rather than an empty view.
assert!(load_latest_scip(&wh, "other").unwrap().is_none());
}
// Two repos: `demo_bin` references `helper`, which is defined only in
// `demo_lib`. The single-repo view must not produce that edge; the merged
// view must.
#[cfg(feature = "scip")]
#[test]
fn load_preferred_merged_resolves_cross_binary() {
use crate::knowledge::scip::{ingest_index, ScipScan};
use crate::warehouse::iceberg::IcebergWarehouse;
use scip::types::{symbol_information, Document, Index, Occurrence, SymbolInformation, SymbolRole};
// Binary index: defines `main` and references `helper` from within it.
let mut bidx = Index::new();
let mut bdoc = Document::new();
bdoc.relative_path = "src/main.rs".into();
let mut main_si = SymbolInformation::new();
main_si.symbol = "rust-analyzer cargo demo_bin 0.1.0 main().".into();
main_si.display_name = "main".into();
main_si.kind = symbol_information::Kind::Function.into();
bdoc.symbols.push(main_si.clone());
let mut main_def = Occurrence::new();
main_def.range = vec![10, 3, 10, 7];
main_def.enclosing_range = vec![10, 0, 20, 1];
main_def.symbol = main_si.symbol.clone();
main_def.symbol_roles = SymbolRole::Definition as i32;
bdoc.occurrences.push(main_def);
let mut ref_helper = Occurrence::new();
ref_helper.range = vec![13, 8, 13, 14];
ref_helper.symbol = "rust-analyzer cargo demo_lib 0.1.0 helper().".into();
bdoc.occurrences.push(ref_helper);
bidx.documents.push(bdoc);
let bin: ScipScan = ingest_index(bidx, "demo_bin", "binsha", uuid::Uuid::new_v4(), chrono::Utc::now());
// Library index: the only place where `helper` is defined.
let mut lidx = Index::new();
let mut ldoc = Document::new();
ldoc.relative_path = "src/lib.rs".into();
let mut helper_si = SymbolInformation::new();
helper_si.symbol = "rust-analyzer cargo demo_lib 0.1.0 helper().".into();
helper_si.display_name = "helper".into();
helper_si.kind = symbol_information::Kind::Function.into();
ldoc.symbols.push(helper_si.clone());
let mut helper_def = Occurrence::new();
helper_def.range = vec![5, 7, 5, 13];
helper_def.enclosing_range = vec![5, 0, 9, 1];
helper_def.symbol = helper_si.symbol.clone();
helper_def.symbol_roles = SymbolRole::Definition as i32;
ldoc.occurrences.push(helper_def);
lidx.documents.push(ldoc);
let lib: ScipScan = ingest_index(lidx, "demo_lib", "libsha", uuid::Uuid::new_v4(), chrono::Utc::now());
let dir = tempfile::tempdir().unwrap();
let wh = IcebergWarehouse::open(dir.path()).unwrap();
wh.append_scip_scan(&bin).unwrap();
wh.append_scip_scan(&lib).unwrap();
// Loading the binary alone: no edge to `helper` is expected.
let (solo, _src) = load_preferred(&wh, "demo_bin").unwrap();
assert!(
solo.callers_of("helper").is_empty(),
"single-member view must not resolve the cross-binary call: {:?}",
solo.calls
);
// Loading both members together: the `main -> helper` edge appears.
let members = vec!["demo_bin".to_string(), "demo_lib".to_string()];
let (merged, source) = load_preferred_merged(&wh, &members).unwrap();
assert_eq!(source, "resolved/scip");
let callers: Vec<&str> =
merged.callers_of("helper").iter().map(|c| c.caller_path.as_str()).collect();
assert_eq!(callers, vec!["main"], "cross-binary edge resolved via moniker join");
assert_eq!(
merged.call_path("main", "helper"),
Some(vec!["main".to_string(), "helper".to_string()]),
);
}
}