use std::sync::Arc;
use anyhow::{anyhow, Result};
use arrow::array::{
Array, Int32Array, RecordBatch, StringArray, TimestampMicrosecondArray,
};
use chrono::Utc;
use futures::TryStreamExt;
use iceberg::Catalog;
use iceberg::arrow::schema_to_arrow_schema;
use uuid::Uuid;
use super::access_scan::{Access, AccessEdge};
use super::iceberg::{
append_batch, ensure_table_schema, IcebergWarehouse, TABLE_WAREHOUSE_ACCESS_EDGES,
};
// Positional column indices into `warehouse_access_edges` record batches.
// `append_warehouse_access_edges` builds its Arrow columns in exactly this
// order, and `query_warehouse_access_edges` reads them back by these
// positions, so the list and both functions must change together.
// NOTE(review): assumes the table's Arrow projection keeps this field order —
// confirm against `iceberg_schema::warehouse_access_edges()`.
const COL_SNAPSHOT_ID: usize = 0; // snapshot UUID, stored as a string
const COL_TS_MICROS: usize = 1; // append time, microseconds, "+00:00" timezone
const COL_CALLER_FN: usize = 2; // `AccessEdge::caller_fn`
const COL_CRATE: usize = 3; // `AccessEdge::crate_name`
const COL_TABLE: usize = 4; // `AccessEdge::table`
const COL_ACCESS: usize = 5; // `Access::as_str()` of the edge
const COL_FILE: usize = 6; // `AccessEdge::file`
const COL_LINE: usize = 7; // `AccessEdge::line`, stored as Int32
/// Row filter applied by [`query_warehouse_access_edges`].
///
/// Every comparison is an exact string equality against the stored column.
#[derive(Debug, Clone)]
pub enum AccessSelector {
/// Keep every stored edge, across all snapshots.
All,
/// Keep only edges whose snapshot id column equals this string (the
/// stringified `Uuid` returned by [`append_warehouse_access_edges`]).
Snapshot(String),
/// Keep only edges whose `table` equals this name.
Table(String),
/// Keep only edges whose `crate_name` equals this name.
Crate(String),
}
pub async fn append_warehouse_access_edges(
wh: &IcebergWarehouse,
edges: &[AccessEdge],
) -> Result<Uuid> {
let snapshot_id = Uuid::new_v4();
if edges.is_empty() {
return Ok(snapshot_id);
}
let ts = Utc::now().timestamp_micros();
let ident = wh.table_ident(TABLE_WAREHOUSE_ACCESS_EDGES);
let table = wh.catalog().load_table(&ident).await?;
let table = ensure_table_schema(
wh.catalog(),
&ident,
table,
&super::iceberg_schema::warehouse_access_edges()?,
)
.await?;
let arrow_schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);
let sid = snapshot_id.to_string();
let cols: Vec<Arc<dyn Array>> = vec![
Arc::new(StringArray::from(vec![sid.as_str(); edges.len()])),
Arc::new(
TimestampMicrosecondArray::from(vec![ts; edges.len()]).with_timezone("+00:00"),
),
Arc::new(StringArray::from(
edges.iter().map(|e| e.caller_fn.clone()).collect::<Vec<_>>(),
)),
Arc::new(StringArray::from(
edges.iter().map(|e| e.crate_name.clone()).collect::<Vec<_>>(),
)),
Arc::new(StringArray::from(
edges.iter().map(|e| e.table.clone()).collect::<Vec<_>>(),
)),
Arc::new(StringArray::from(
edges.iter().map(|e| e.access.as_str().to_string()).collect::<Vec<_>>(),
)),
Arc::new(StringArray::from(
edges.iter().map(|e| e.file.clone()).collect::<Vec<_>>(),
)),
Arc::new(Int32Array::from(
edges.iter().map(|e| e.line as i32).collect::<Vec<_>>(),
)),
];
let batch = RecordBatch::try_new(arrow_schema, cols)?;
append_batch(wh.catalog(), table, batch).await?;
Ok(snapshot_id)
}
pub async fn query_warehouse_access_edges(
wh: &IcebergWarehouse,
sel: &AccessSelector,
) -> Result<Vec<AccessEdge>> {
let table = wh
.catalog()
.load_table(&wh.table_ident(TABLE_WAREHOUSE_ACCESS_EDGES))
.await?;
let scan = table.scan().build()?;
let stream = scan.to_arrow().await?;
let batches: Vec<RecordBatch> = stream.try_collect().await?;
let mut out: Vec<AccessEdge> = Vec::new();
for b in &batches {
let snap = col_str(b, COL_SNAPSHOT_ID)?;
let caller = col_str(b, COL_CALLER_FN)?;
let crate_name = col_str(b, COL_CRATE)?;
let table_c = col_str(b, COL_TABLE)?;
let access = col_str(b, COL_ACCESS)?;
let file = col_str(b, COL_FILE)?;
let line = col_i32(b, COL_LINE)?;
let _ = COL_TS_MICROS;
for i in 0..b.num_rows() {
let access = match access.value(i) {
"write" => Access::Write,
_ => Access::Read,
};
let edge = AccessEdge {
caller_fn: caller.value(i).to_string(),
crate_name: crate_name.value(i).to_string(),
table: table_c.value(i).to_string(),
access,
file: file.value(i).to_string(),
line: line.value(i) as u32,
};
let keep = match sel {
AccessSelector::All => true,
AccessSelector::Snapshot(s) => snap.value(i) == s,
AccessSelector::Table(t) => &edge.table == t,
AccessSelector::Crate(c) => &edge.crate_name == c,
};
if keep {
out.push(edge);
}
}
}
out.sort_by(|a, b| a.key().cmp(&b.key()));
Ok(out)
}
/// Borrows column `idx` of `b` as a `StringArray`.
///
/// # Errors
///
/// Fails if the batch has no column at `idx` (instead of panicking, as
/// `RecordBatch::column` would) or if that column is not a `StringArray`.
fn col_str<'a>(b: &'a RecordBatch, idx: usize) -> Result<&'a StringArray> {
    b.columns()
        .get(idx)
        .ok_or_else(|| {
            anyhow!(
                "warehouse_access_edges col {idx} missing: batch has {} columns",
                b.num_columns()
            )
        })?
        .as_any()
        .downcast_ref::<StringArray>()
        .ok_or_else(|| anyhow!("warehouse_access_edges col {idx} is not StringArray"))
}
/// Borrows column `idx` of `b` as an `Int32Array`.
///
/// # Errors
///
/// Fails if the batch has no column at `idx` (instead of panicking, as
/// `RecordBatch::column` would) or if that column is not an `Int32Array`.
fn col_i32<'a>(b: &'a RecordBatch, idx: usize) -> Result<&'a Int32Array> {
    b.columns()
        .get(idx)
        .ok_or_else(|| {
            anyhow!(
                "warehouse_access_edges col {idx} missing: batch has {} columns",
                b.num_columns()
            )
        })?
        .as_any()
        .downcast_ref::<Int32Array>()
        .ok_or_else(|| anyhow!("warehouse_access_edges col {idx} is not Int32Array"))
}
// End-to-end tests: source text -> `scan_source` edges -> Iceberg append ->
// query, each run against a throwaway warehouse in a temp directory.
#[cfg(test)]
mod tests {
use super::*;
use crate::warehouse::access_scan::scan_source;
/// Scans two functions (one read, one write), appends the edges as one
/// snapshot, and checks that each selector returns exactly the expected rows.
#[test]
fn extract_then_warehouse_round_trip_exact_rows() {
let src = r#"
async fn foo(wh: &Wh) {
let _ = query_test_results(wh, &sel).await;
}
async fn bar(wh: &Wh) {
append_release_events(wh, &events).await.unwrap();
}
"#;
// Check the scanner output before involving the warehouse: the
// `query_*` call is expected as a Read and the `append_*` call as a Write.
let edges = scan_source("nornir", "src/lib.rs", src);
assert_eq!(edges.len(), 2);
assert!(edges
.iter()
.any(|e| e.caller_fn == "nornir::foo"
&& e.table == "test_results"
&& e.access == Access::Read));
assert!(edges
.iter()
.any(|e| e.caller_fn == "nornir::bar"
&& e.table == "release_events"
&& e.access == Access::Write));
let dir = tempfile::tempdir().unwrap();
let wh = IcebergWarehouse::open(dir.path()).unwrap();
let snap = wh
.block_on(append_warehouse_access_edges(&wh, &edges))
.unwrap();
// The query sorts by `AccessEdge::key`, so this exact comparison assumes
// `scan_source` also yields edges in key order. NOTE(review): confirm.
let back = wh
.block_on(query_warehouse_access_edges(&wh, &AccessSelector::All))
.unwrap();
assert_eq!(back, edges, "round-trip is exact");
// Both edges were written in one append, so they share the returned id.
let by_snap = wh
.block_on(query_warehouse_access_edges(
&wh,
&AccessSelector::Snapshot(snap.to_string()),
))
.unwrap();
assert_eq!(by_snap.len(), 2);
let tr = wh
.block_on(query_warehouse_access_edges(
&wh,
&AccessSelector::Table("test_results".into()),
))
.unwrap();
assert_eq!(tr.len(), 1);
assert_eq!(tr[0].caller_fn, "nornir::foo");
assert_eq!(tr[0].access, Access::Read);
// An id that was never written matches no rows; it is not an error.
let none = wh
.block_on(query_warehouse_access_edges(
&wh,
&AccessSelector::Snapshot("does-not-exist".into()),
))
.unwrap();
assert!(none.is_empty());
}
/// Checks that an empty append stores nothing, then that two single-edge
/// appends from different crates are separated by `AccessSelector::Crate`.
#[test]
fn empty_append_is_noop_and_crate_scope_filters() {
let dir = tempfile::tempdir().unwrap();
let wh = IcebergWarehouse::open(dir.path()).unwrap();
wh.block_on(append_warehouse_access_edges(&wh, &[])).unwrap();
let empty = wh
.block_on(query_warehouse_access_edges(&wh, &AccessSelector::All))
.unwrap();
assert!(empty.is_empty());
// The assertions below rely on the scanner mapping `query_mcp_stats` to the
// `mcp_requests` table (Read) and `append_vuln_findings` to `vuln_findings`
// (Write).
let a = scan_source(
"alpha",
"src/a.rs",
"fn r(wh: &Wh) { let _ = query_mcp_stats(wh); }",
);
let b = scan_source(
"beta",
"src/b.rs",
"fn w(wh: &Wh) { append_vuln_findings(wh, &f).unwrap(); }",
);
// Two separate appends, so the edges land in two different snapshots.
wh.block_on(append_warehouse_access_edges(&wh, &a)).unwrap();
wh.block_on(append_warehouse_access_edges(&wh, &b)).unwrap();
let alpha = wh
.block_on(query_warehouse_access_edges(
&wh,
&AccessSelector::Crate("alpha".into()),
))
.unwrap();
assert_eq!(alpha.len(), 1);
assert_eq!(alpha[0].table, "mcp_requests");
assert_eq!(alpha[0].access, Access::Read);
let beta = wh
.block_on(query_warehouse_access_edges(
&wh,
&AccessSelector::Crate("beta".into()),
))
.unwrap();
assert_eq!(beta.len(), 1);
assert_eq!(beta[0].table, "vuln_findings");
assert_eq!(beta[0].access, Access::Write);
// `All` spans every snapshot written so far.
let all = wh
.block_on(query_warehouse_access_edges(&wh, &AccessSelector::All))
.unwrap();
assert_eq!(all.len(), 2);
}
}