nornir 0.4.32

Companion to cargo: dependency tracking, release gating, deploy, benchmarks, and documentation assembly. Project-agnostic.
//! Iceberg writer + reader for the **fn → warehouse-table access fact**
//! (`warehouse_access_edges`, AUT7 / EPIC ARCH n-002).
//!
//! The PURE extraction model ([`super::access_scan::AccessEdge`], the
//! `query_*`/`append_*` accessor → table map, the `scan_repo`/`scan_source`
//! passes) lives in [`super::access_scan`]. This module is the Iceberg-coupled
//! write/read seam, mirroring the other warehouse fact modules (cf.
//! [`super::test_results`]).
//!
//! Write path: [`append_warehouse_access_edges`] appends one Iceberg snapshot
//! per scan, all rows sharing one `snapshot_id` + `ts_micros`.
//!
//! Read path: [`query_warehouse_access_edges`] scans the table, scopes by
//! [`AccessSelector`], and returns rows sorted by `AccessEdge::key()`.
//!
//! Schema: [`super::iceberg_schema::warehouse_access_edges`] — additive (ships
//! via the standard schema-evolution path; old binaries keep reading).

use std::sync::Arc;

use anyhow::{anyhow, Result};
use arrow::array::{
    Array, Int32Array, RecordBatch, StringArray, TimestampMicrosecondArray,
};
use chrono::Utc;
use futures::TryStreamExt;
use iceberg::Catalog;
use iceberg::arrow::schema_to_arrow_schema;
use uuid::Uuid;

use super::access_scan::{Access, AccessEdge};
use super::iceberg::{
    append_batch, ensure_table_schema, IcebergWarehouse, TABLE_WAREHOUSE_ACCESS_EDGES,
};

// Positional indices of the `warehouse_access_edges` columns.
//
// Column order matches `iceberg_schema::warehouse_access_edges`. The writer in
// this file builds its column vector in this same order, and the reader fetches
// columns by these indices, so the three must be kept in step.
const COL_SNAPSHOT_ID: usize = 0;
// Written on append; the reader only names this constant and never loads the
// column, since the timestamp is not part of the pure edge model.
const COL_TS_MICROS: usize = 1;
const COL_CALLER_FN: usize = 2;
const COL_CRATE: usize = 3;
const COL_TABLE: usize = 4;
// Holds the string form of the access kind (`Access::as_str()` on write).
const COL_ACCESS: usize = 5;
const COL_FILE: usize = 6;
// Stored as an Int32 column.
const COL_LINE: usize = 7;

/// Scope a read of the access-edge fact table.
///
/// Every variant other than `All` carries the exact string a row's
/// corresponding column must equal for the row to be returned; matching is
/// plain string equality (no prefix or pattern matching).
///
/// Selectors are comparable and hashable so callers can deduplicate them or
/// assert on them directly.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum AccessSelector {
    /// Every edge ever recorded.
    All,
    /// Only the edges from one scan snapshot (compared against the stored
    /// `snapshot_id` string).
    Snapshot(String),
    /// Only edges touching a given warehouse table.
    Table(String),
    /// Only edges from one crate.
    Crate(String),
}

/// Append a batch of access edges as ONE Iceberg snapshot. All rows share the
/// returned `snapshot_id` and a single `ts_micros`. A no-op on an empty slice.
pub async fn append_warehouse_access_edges(
    wh: &IcebergWarehouse,
    edges: &[AccessEdge],
) -> Result<Uuid> {
    let snapshot_id = Uuid::new_v4();
    if edges.is_empty() {
        return Ok(snapshot_id);
    }
    let ts = Utc::now().timestamp_micros();

    let ident = wh.table_ident(TABLE_WAREHOUSE_ACCESS_EDGES);
    let table = wh.catalog().load_table(&ident).await?;
    // Forward-evolve a table created by an older binary (no-op fast path once
    // current) — same safety contract as the other writers.
    let table = ensure_table_schema(
        wh.catalog(),
        &ident,
        table,
        &super::iceberg_schema::warehouse_access_edges()?,
    )
    .await?;
    let arrow_schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);

    let sid = snapshot_id.to_string();
    let cols: Vec<Arc<dyn Array>> = vec![
        Arc::new(StringArray::from(vec![sid.as_str(); edges.len()])),
        Arc::new(
            TimestampMicrosecondArray::from(vec![ts; edges.len()]).with_timezone("+00:00"),
        ),
        Arc::new(StringArray::from(
            edges.iter().map(|e| e.caller_fn.clone()).collect::<Vec<_>>(),
        )),
        Arc::new(StringArray::from(
            edges.iter().map(|e| e.crate_name.clone()).collect::<Vec<_>>(),
        )),
        Arc::new(StringArray::from(
            edges.iter().map(|e| e.table.clone()).collect::<Vec<_>>(),
        )),
        Arc::new(StringArray::from(
            edges.iter().map(|e| e.access.as_str().to_string()).collect::<Vec<_>>(),
        )),
        Arc::new(StringArray::from(
            edges.iter().map(|e| e.file.clone()).collect::<Vec<_>>(),
        )),
        Arc::new(Int32Array::from(
            edges.iter().map(|e| e.line as i32).collect::<Vec<_>>(),
        )),
    ];
    let batch = RecordBatch::try_new(arrow_schema, cols)?;
    append_batch(wh.catalog(), table, batch).await?;
    Ok(snapshot_id)
}

/// Read access edges, scoped by `sel`, sorted by `AccessEdge::key()`.
///
/// Performs a full scan of the table and filters rows in memory: `sel` is
/// compared against the relevant column of each row by exact string equality.
/// The snapshot id and timestamp columns are not part of the returned
/// [`AccessEdge`] values.
///
/// Any stored access string other than `"write"` is read back as
/// `Access::Read`.
///
/// # Errors
///
/// Returns an error if the table cannot be loaded or scanned, or if a column
/// does not have the expected Arrow array type.
pub async fn query_warehouse_access_edges(
    wh: &IcebergWarehouse,
    sel: &AccessSelector,
) -> Result<Vec<AccessEdge>> {
    let table = wh
        .catalog()
        .load_table(&wh.table_ident(TABLE_WAREHOUSE_ACCESS_EDGES))
        .await?;
    let scan = table.scan().build()?;
    let stream = scan.to_arrow().await?;
    let batches: Vec<RecordBatch> = stream.try_collect().await?;

    let mut out: Vec<AccessEdge> = Vec::new();
    for b in &batches {
        let snap = col_str(b, COL_SNAPSHOT_ID)?;
        let caller = col_str(b, COL_CALLER_FN)?;
        let crate_name = col_str(b, COL_CRATE)?;
        let table_c = col_str(b, COL_TABLE)?;
        let access = col_str(b, COL_ACCESS)?;
        let file = col_str(b, COL_FILE)?;
        let line = col_i32(b, COL_LINE)?;
        // ts is present in the schema but not part of the pure edge model.
        let _ = COL_TS_MICROS;

        for i in 0..b.num_rows() {
            // Apply the selector to the borrowed column values first, so rows
            // that are filtered out never pay for the owned `String` copies
            // an `AccessEdge` needs.
            let keep = match sel {
                AccessSelector::All => true,
                AccessSelector::Snapshot(s) => snap.value(i) == s.as_str(),
                AccessSelector::Table(t) => table_c.value(i) == t.as_str(),
                AccessSelector::Crate(c) => crate_name.value(i) == c.as_str(),
            };
            if !keep {
                continue;
            }

            out.push(AccessEdge {
                caller_fn: caller.value(i).to_string(),
                crate_name: crate_name.value(i).to_string(),
                table: table_c.value(i).to_string(),
                access: match access.value(i) {
                    "write" => Access::Write,
                    _ => Access::Read,
                },
                file: file.value(i).to_string(),
                // Stored as Int32; converted back to the model's unsigned line.
                line: line.value(i) as u32,
            });
        }
    }
    out.sort_by(|a, b| a.key().cmp(&b.key()));
    Ok(out)
}

/// Borrow column `idx` of `b` as a [`StringArray`].
///
/// # Errors
///
/// Returns an error when the batch has no column at `idx` (instead of
/// panicking on the out-of-range index), or when the column at `idx` is not a
/// `StringArray`.
fn col_str<'a>(b: &'a RecordBatch, idx: usize) -> Result<&'a StringArray> {
    b.columns()
        .get(idx)
        .ok_or_else(|| anyhow!("warehouse_access_edges col {idx} is missing from the batch"))?
        .as_any()
        .downcast_ref::<StringArray>()
        .ok_or_else(|| anyhow!("warehouse_access_edges col {idx} is not StringArray"))
}

/// Borrow column `idx` of `b` as an [`Int32Array`].
///
/// # Errors
///
/// Returns an error when the batch has no column at `idx` (instead of
/// panicking on the out-of-range index), or when the column at `idx` is not an
/// `Int32Array`.
fn col_i32<'a>(b: &'a RecordBatch, idx: usize) -> Result<&'a Int32Array> {
    b.columns()
        .get(idx)
        .ok_or_else(|| anyhow!("warehouse_access_edges col {idx} is missing from the batch"))?
        .as_any()
        .downcast_ref::<Int32Array>()
        .ok_or_else(|| anyhow!("warehouse_access_edges col {idx} is not Int32Array"))
}

// Integration-style tests: each one opens an `IcebergWarehouse` rooted in a
// fresh temporary directory, appends edges produced by `scan_source`, and reads
// them back through every `AccessSelector` variant.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::warehouse::access_scan::scan_source;

    /// LAW-1 round-trip: extract a known source (a `query_test_results` call in
    /// `foo` + an `append_release_events` in `bar`), assert the extracted edge
    /// set, then append → query back and assert the exact rows.
    #[test]
    fn extract_then_warehouse_round_trip_exact_rows() {
        // Source text handed to the scanner; it is never compiled.
        let src = r#"
            async fn foo(wh: &Wh) {
                let _ = query_test_results(wh, &sel).await;
            }
            async fn bar(wh: &Wh) {
                append_release_events(wh, &events).await.unwrap();
            }
        "#;
        let edges = scan_source("nornir", "src/lib.rs", src);
        // The pure extraction is exactly {foo→test_results:read, bar→release_events:write}.
        assert_eq!(edges.len(), 2);
        assert!(edges
            .iter()
            .any(|e| e.caller_fn == "nornir::foo"
                && e.table == "test_results"
                && e.access == Access::Read));
        assert!(edges
            .iter()
            .any(|e| e.caller_fn == "nornir::bar"
                && e.table == "release_events"
                && e.access == Access::Write));

        // Fresh warehouse in a temp dir; `snap` is the id shared by both rows.
        let dir = tempfile::tempdir().unwrap();
        let wh = IcebergWarehouse::open(dir.path()).unwrap();
        let snap = wh
            .block_on(append_warehouse_access_edges(&wh, &edges))
            .unwrap();

        // All scope: both rows round-trip byte-for-byte (modulo the snapshot/ts
        // envelope, which the pure model omits).
        // NOTE(review): the query returns rows sorted by `AccessEdge::key()`, so
        // this equality also requires `scan_source` to yield edges in that same
        // order — confirm against `access_scan`.
        let back = wh
            .block_on(query_warehouse_access_edges(&wh, &AccessSelector::All))
            .unwrap();
        assert_eq!(back, edges, "round-trip is exact");

        // Snapshot scope returns this scan's rows.
        let by_snap = wh
            .block_on(query_warehouse_access_edges(
                &wh,
                &AccessSelector::Snapshot(snap.to_string()),
            ))
            .unwrap();
        assert_eq!(by_snap.len(), 2);

        // Table scope prunes to one table.
        let tr = wh
            .block_on(query_warehouse_access_edges(
                &wh,
                &AccessSelector::Table("test_results".into()),
            ))
            .unwrap();
        assert_eq!(tr.len(), 1);
        assert_eq!(tr[0].caller_fn, "nornir::foo");
        assert_eq!(tr[0].access, Access::Read);

        // A bogus snapshot id yields nothing.
        let none = wh
            .block_on(query_warehouse_access_edges(
                &wh,
                &AccessSelector::Snapshot("does-not-exist".into()),
            ))
            .unwrap();
        assert!(none.is_empty());
    }

    /// An empty append leaves the table empty, and `AccessSelector::Crate`
    /// separates rows written by two independent appends (one per crate).
    #[test]
    fn empty_append_is_noop_and_crate_scope_filters() {
        let dir = tempfile::tempdir().unwrap();
        let wh = IcebergWarehouse::open(dir.path()).unwrap();

        // Appending an empty slice writes no snapshot.
        wh.block_on(append_warehouse_access_edges(&wh, &[])).unwrap();
        let empty = wh
            .block_on(query_warehouse_access_edges(&wh, &AccessSelector::All))
            .unwrap();
        assert!(empty.is_empty());

        // One read edge attributed to crate `alpha`, one write edge to `beta`,
        // appended as two separate snapshots.
        let a = scan_source(
            "alpha",
            "src/a.rs",
            "fn r(wh: &Wh) { let _ = query_mcp_stats(wh); }",
        );
        let b = scan_source(
            "beta",
            "src/b.rs",
            "fn w(wh: &Wh) { append_vuln_findings(wh, &f).unwrap(); }",
        );
        wh.block_on(append_warehouse_access_edges(&wh, &a)).unwrap();
        wh.block_on(append_warehouse_access_edges(&wh, &b)).unwrap();

        // Crate scope: only alpha's row. The expected table name differs from
        // the accessor's suffix (`query_mcp_stats` → `mcp_requests`).
        let alpha = wh
            .block_on(query_warehouse_access_edges(
                &wh,
                &AccessSelector::Crate("alpha".into()),
            ))
            .unwrap();
        assert_eq!(alpha.len(), 1);
        assert_eq!(alpha[0].table, "mcp_requests");
        assert_eq!(alpha[0].access, Access::Read);

        // Crate scope: only beta's row.
        let beta = wh
            .block_on(query_warehouse_access_edges(
                &wh,
                &AccessSelector::Crate("beta".into()),
            ))
            .unwrap();
        assert_eq!(beta.len(), 1);
        assert_eq!(beta[0].table, "vuln_findings");
        assert_eq!(beta[0].access, Access::Write);

        // Cross-crate "all" sees both.
        let all = wh
            .block_on(query_warehouse_access_edges(&wh, &AccessSelector::All))
            .unwrap();
        assert_eq!(all.len(), 2);
    }
}