nornir 0.5.0

Companion to cargo: dependency tracking, release gating, deploy, benchmarks, and documentation assembly. Project-agnostic.
//! Iceberg writer + reader for the **UI snapshot reference store**
//! (`ui_snapshots`) — the render-oracle's reference images, moved OUT of git
//! into the warehouse as lossless WebP (`.nornir/snapshot-to-warehouse.md`).
//!
//! Write path: [`append_ui_snapshot`] appends ONE reference frame as one Iceberg
//! snapshot. It is deduplicated against the newest stored frame for
//! `(repo, crate, test_name, theme)`: if that frame has the same `content_sha`
//! (the bytes did not change), nothing is re-stored.
//!
//! Read path: [`latest_ui_snapshot`] returns the newest reference for a
//! `(repo, crate, test_name, theme)` key (max `created_at`), or `None`.
//!
//! The bridge to the shared robot kit is [`WarehouseSnapshotStore`], which
//! implements `nornir_robotui::snapshot::SnapshotStore` over this table — but
//! that adapter lives in the test module (robotui is a dev-dependency, so the
//! published `nornir` library never links it). The library half here knows only
//! plain structs + the Iceberg table.
//!
//! Schema: [`super::iceberg_schema::ui_snapshots`] — partitioned by `repo`.

use std::sync::Arc;

use anyhow::{anyhow, Result};
use arrow::array::{
    Array, Int32Array, LargeBinaryArray, RecordBatch, StringArray, TimestampMicrosecondArray,
};
use iceberg::arrow::schema_to_arrow_schema;
use iceberg::expr::Reference;
use iceberg::spec::Datum;
use iceberg::Catalog;
use futures::TryStreamExt;

use super::iceberg::{append_batch, IcebergWarehouse, TABLE_UI_SNAPSHOTS};

/// One reference frame in the `ui_snapshots` table.
///
/// `(repo, krate, test_name, theme)` is the lookup key used by
/// [`latest_ui_snapshot`]; the remaining fields are payload and provenance.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SnapshotRow {
    /// Repository name (`repo` column); the read path filters its scan on it.
    pub repo: String,
    /// Crate name, stored in the `crate` column (`crate` is a Rust keyword,
    /// hence the field spelling).
    pub krate: String,
    /// Name of the test the frame belongs to (`test_name` column).
    pub test_name: String,
    /// Theme the frame was rendered with (`theme` column).
    pub theme: String,
    /// Lossless WebP bytes (the reference).
    pub webp: Vec<u8>,
    /// Frame width (`width` column) — presumably pixels; confirm with the encoder.
    pub width: i32,
    /// Frame height (`height` column) — presumably pixels; confirm with the encoder.
    pub height: i32,
    /// sha256 of `webp` (dedup key).
    pub content_sha: String,
    /// Provenance — the git sha the reference was baked at.
    pub git_sha: String,
    /// Wall-clock micros when stored.
    pub created_at_micros: i64,
}

/// Append a reference frame as ONE Iceberg snapshot.
///
/// Deduplicated against the NEWEST stored frame for
/// `(repo, crate, test_name, theme)`: when that frame already carries the same
/// `content_sha`, this returns `Ok` without appending (so re-runs don't pile
/// blobs). Only the newest frame is compared — re-baking back to an older image
/// appends a new row, which then becomes the latest reference.
///
/// # Errors
/// Propagates failures from the dedup lookup, the table load, the
/// Iceberg→Arrow schema conversion, `RecordBatch` construction and the append
/// itself.
pub async fn append_ui_snapshot(wh: &IcebergWarehouse, row: &SnapshotRow) -> Result<()> {
    // Dedup: is an identical frame already the latest for this key? Skip.
    let unchanged = latest_ui_snapshot(wh, &row.repo, &row.krate, &row.test_name, &row.theme)
        .await?
        .is_some_and(|prev| prev.content_sha == row.content_sha);
    if unchanged {
        return Ok(());
    }

    let table = wh.catalog().load_table(&wh.table_ident(TABLE_UI_SNAPSHOTS)).await?;
    let schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);
    // One-row columns, listed in the order the batch is matched against the
    // table schema. The string columns borrow from `row` — the array builder
    // copies the bytes itself, so cloning each `String` first is unnecessary.
    let cols: Vec<Arc<dyn Array>> = vec![
        Arc::new(StringArray::from(vec![row.repo.as_str()])),
        Arc::new(StringArray::from(vec![row.krate.as_str()])),
        Arc::new(StringArray::from(vec![row.test_name.as_str()])),
        Arc::new(StringArray::from(vec![row.theme.as_str()])),
        Arc::new(LargeBinaryArray::from(vec![row.webp.as_slice()])),
        Arc::new(Int32Array::from(vec![row.width])),
        Arc::new(Int32Array::from(vec![row.height])),
        Arc::new(StringArray::from(vec![row.content_sha.as_str()])),
        Arc::new(StringArray::from(vec![row.git_sha.as_str()])),
        Arc::new(
            TimestampMicrosecondArray::from(vec![row.created_at_micros]).with_timezone("+00:00"),
        ),
    ];
    let batch = RecordBatch::try_new(schema, cols)?;
    append_batch(wh.catalog(), table, batch).await?;
    Ok(())
}

/// Read the NEWEST reference frame for `(repo, crate, test_name, theme)` (max
/// `created_at`), or `None` when no reference has been stored for that key.
pub async fn latest_ui_snapshot(
    wh: &IcebergWarehouse,
    repo: &str,
    krate: &str,
    test_name: &str,
    theme: &str,
) -> Result<Option<SnapshotRow>> {
    let table = wh.catalog().load_table(&wh.table_ident(TABLE_UI_SNAPSHOTS)).await?;
    // Partition prune to the repo; the rest is a residual in-memory filter
    // (file-level pushdown can over-return, so we re-check every field).
    let scan = table
        .scan()
        .with_filter(Reference::new("repo").equal_to(Datum::string(repo)))
        .select([
            "repo", "crate", "test_name", "theme", "webp", "width", "height", "content_sha",
            "git_sha", "created_at",
        ])
        .build()?;
    let batches: Vec<RecordBatch> = scan.to_arrow().await?.try_collect().await?;

    let mut best: Option<SnapshotRow> = None;
    for b in &batches {
        for r in rows_from_batch(b)? {
            let matches = r.repo == repo
                && r.krate == krate
                && r.test_name == test_name
                && r.theme == theme;
            let newer = best
                .as_ref()
                .map(|x| r.created_at_micros > x.created_at_micros)
                .unwrap_or(true);
            if matches && newer {
                best = Some(r);
            }
        }
    }
    Ok(best)
}

/// Fetch the column called `name` from `batch`, downcast to the concrete Arrow
/// array type `T`.
///
/// Fails when the batch has no such column, or when the column is not a `T`.
fn col<'a, T: 'static>(batch: &'a RecordBatch, name: &str) -> Result<&'a T> {
    let Some(column) = batch.column_by_name(name) else {
        return Err(anyhow!("ui_snapshots batch missing column `{name}`"));
    };
    match column.as_any().downcast_ref::<T>() {
        Some(typed) => Ok(typed),
        None => Err(anyhow!("ui_snapshots column `{name}` has unexpected arrow type")),
    }
}

/// Decode every row of an Arrow batch into owned [`SnapshotRow`]s.
///
/// Columns are looked up by name, so their position in the batch does not
/// matter; a missing or mistyped column is an error. Values are read with
/// `value(i)` and copied into owned `String`/`Vec<u8>` fields.
fn rows_from_batch(batch: &RecordBatch) -> Result<Vec<SnapshotRow>> {
    let repos = col::<StringArray>(batch, "repo")?;
    let krates = col::<StringArray>(batch, "crate")?;
    let test_names = col::<StringArray>(batch, "test_name")?;
    let themes = col::<StringArray>(batch, "theme")?;
    let blobs = col::<LargeBinaryArray>(batch, "webp")?;
    let widths = col::<Int32Array>(batch, "width")?;
    let heights = col::<Int32Array>(batch, "height")?;
    let content_shas = col::<StringArray>(batch, "content_sha")?;
    let git_shas = col::<StringArray>(batch, "git_sha")?;
    let created_ats = col::<TimestampMicrosecondArray>(batch, "created_at")?;

    let rows: Vec<SnapshotRow> = (0..batch.num_rows())
        .map(|idx| SnapshotRow {
            repo: repos.value(idx).to_string(),
            krate: krates.value(idx).to_string(),
            test_name: test_names.value(idx).to_string(),
            theme: themes.value(idx).to_string(),
            webp: blobs.value(idx).to_vec(),
            width: widths.value(idx),
            height: heights.value(idx),
            content_sha: content_shas.value(idx).to_string(),
            git_sha: git_shas.value(idx).to_string(),
            created_at_micros: created_ats.value(idx),
        })
        .collect();
    Ok(rows)
}

/// Blocking counterpart of [`append_ui_snapshot`] for non-async callers (the
/// harness store adapter): drives the future to completion via
/// `IcebergWarehouse::block_on` and returns its result unchanged.
pub fn append_ui_snapshot_sync(wh: &IcebergWarehouse, row: &SnapshotRow) -> Result<()> {
    let pending = append_ui_snapshot(wh, row);
    wh.block_on(pending)
}

/// Blocking counterpart of [`latest_ui_snapshot`] for non-async callers:
/// drives the lookup to completion via `IcebergWarehouse::block_on` and
/// returns its result unchanged.
pub fn latest_ui_snapshot_sync(
    wh: &IcebergWarehouse,
    repo: &str,
    krate: &str,
    test_name: &str,
    theme: &str,
) -> Result<Option<SnapshotRow>> {
    let lookup = latest_ui_snapshot(wh, repo, krate, test_name, theme);
    wh.block_on(lookup)
}

#[cfg(test)]
mod tests {
    use super::*;
    use chrono::Utc;
    use image::{Rgba, RgbaImage};
    use nornir_robotui::snapshot::{
        self, assert_snapshot_warehouse, SnapMode, SnapResult, SnapThreshold, SnapshotKey,
        SnapshotRef, SnapshotStore,
    };

    /// Open a warehouse rooted in a fresh temp dir. The `TempDir` guard is
    /// returned alongside so the directory lives as long as the test holds it.
    fn wh() -> (tempfile::TempDir, IcebergWarehouse) {
        let dir = tempfile::tempdir().unwrap();
        let wh = IcebergWarehouse::open(dir.path()).unwrap();
        (dir, wh)
    }

    /// Deterministic opaque `w`×`h` test image whose channels vary with x/y.
    fn gradient(w: u32, h: u32) -> RgbaImage {
        RgbaImage::from_fn(w, h, |x, y| {
            Rgba([(x % 256) as u8, (y % 256) as u8, ((x + y) % 256) as u8, 255])
        })
    }

    /// Build a row for test `test` under a fixed repo/crate/theme, with
    /// `content_sha` computed as the lowercase-hex sha256 of `webp` and
    /// `created_at_micros` set to `ts`.
    fn row(test: &str, webp: Vec<u8>, ts: i64) -> SnapshotRow {
        let sha = {
            use sha2::{Digest, Sha256};
            let d = Sha256::new().chain_update(&webp).finalize();
            d.iter().map(|b| format!("{b:02x}")).collect::<String>()
        };
        SnapshotRow {
            repo: "nornir".into(),
            krate: "nornir-robotui".into(),
            test_name: test.into(),
            theme: "light".into(),
            webp,
            width: 12,
            height: 8,
            content_sha: sha,
            git_sha: "deadbeef".into(),
            created_at_micros: ts,
        }
    }

    /// Plain warehouse round-trip: append a frame, read the newest back EXACT.
    #[test]
    fn append_then_latest_round_trips() {
        let (_d, wh) = wh();
        let r = row("a", vec![1, 2, 3, 4], 1000);
        wh.block_on(append_ui_snapshot(&wh, &r)).unwrap();
        let back = wh
            .block_on(latest_ui_snapshot(&wh, "nornir", "nornir-robotui", "a", "light"))
            .unwrap()
            .expect("stored frame reads back");
        assert_eq!(back, r, "row round-trips byte-exact");

        // A different key has no reference.
        let none = wh
            .block_on(latest_ui_snapshot(&wh, "nornir", "nornir-robotui", "ghost", "light"))
            .unwrap();
        assert!(none.is_none(), "missing key ⇒ None");
    }

    /// `latest` picks the newest `created_at`; identical content dedups.
    #[test]
    fn latest_picks_newest_and_dedups_identical() {
        let (_d, wh) = wh();
        // Two distinct frames for the same key (different bytes → different sha).
        wh.block_on(append_ui_snapshot(&wh, &row("k", vec![0u8; 4], 1000))).unwrap();
        wh.block_on(append_ui_snapshot(&wh, &row("k", vec![9u8; 4], 2000))).unwrap();
        let back = wh
            .block_on(latest_ui_snapshot(&wh, "nornir", "nornir-robotui", "k", "light"))
            .unwrap()
            .unwrap();
        assert_eq!(back.webp, vec![9u8; 4], "newest frame wins");
        assert_eq!(back.created_at_micros, 2000);

        // Re-appending the identical newest frame is a no-op (dedup).
        wh.block_on(append_ui_snapshot(&wh, &row("k", vec![9u8; 4], 3000))).unwrap();
        // Still the ts=2000 row (the ts=3000 dup was skipped).
        let back = wh
            .block_on(latest_ui_snapshot(&wh, "nornir", "nornir-robotui", "k", "light"))
            .unwrap()
            .unwrap();
        assert_eq!(back.created_at_micros, 2000, "identical content was not re-stored");
    }

    /// The warehouse-backed [`SnapshotStore`] adapter the harness drives.
    ///
    /// `git_sha` is stamped onto every row this adapter stores.
    struct WarehouseSnapshotStore<'a> {
        wh: &'a IcebergWarehouse,
        git_sha: String,
    }
    impl SnapshotStore for WarehouseSnapshotStore<'_> {
        /// Fetch the newest stored reference for `key`, or `Ok(None)` when the
        /// table holds none. Stored dimensions that do not fit a `u32` (i.e.
        /// negative values) are reported as an error rather than wrapped.
        fn fetch_reference(&self, key: &SnapshotKey) -> Result<Option<SnapshotRef>, String> {
            let got = latest_ui_snapshot_sync(self.wh, &key.repo, &key.krate, &key.test_name, &key.theme)
                .map_err(|e| format!("{e:#}"))?;
            let Some(r) = got else {
                return Ok(None);
            };
            let width = u32::try_from(r.width)
                .map_err(|e| format!("stored width {} is not a valid u32: {e}", r.width))?;
            let height = u32::try_from(r.height)
                .map_err(|e| format!("stored height {} is not a valid u32: {e}", r.height))?;
            Ok(Some(SnapshotRef {
                webp: r.webp,
                width,
                height,
                content_sha: r.content_sha,
            }))
        }
        /// Store `r` as the reference for `key`, timestamped with the current
        /// UTC time. Dimensions too large for the `i32` columns are reported as
        /// an error rather than wrapped to a negative value.
        fn store_reference(&self, key: &SnapshotKey, r: &SnapshotRef) -> Result<(), String> {
            let width = i32::try_from(r.width)
                .map_err(|e| format!("reference width {} does not fit an i32 column: {e}", r.width))?;
            let height = i32::try_from(r.height)
                .map_err(|e| format!("reference height {} does not fit an i32 column: {e}", r.height))?;
            let row = SnapshotRow {
                repo: key.repo.clone(),
                krate: key.krate.clone(),
                test_name: key.test_name.clone(),
                theme: key.theme.clone(),
                webp: r.webp.clone(),
                width,
                height,
                content_sha: r.content_sha.clone(),
                git_sha: self.git_sha.clone(),
                created_at_micros: Utc::now().timestamp_micros(),
            };
            append_ui_snapshot_sync(self.wh, &row).map_err(|e| format!("{e:#}"))
        }
    }

    /// THE LAW round-trip (build-plan #5): a KNOWN image goes through
    /// encode → warehouse store → fetch → diff via the real harness entry point
    /// `assert_snapshot_warehouse`, backed by the real `ui_snapshots` table.
    /// Empty/missing reference handling is exercised first (Created), then a
    /// matching frame (Match) and a changed frame (Mismatch).
    #[test]
    fn known_image_round_trips_encode_store_fetch_diff() {
        let (_d, wh) = wh();
        let store = WarehouseSnapshotStore { wh: &wh, git_sha: "abc123".into() };
        let key = SnapshotKey::new("nornir", "nornir-robotui", "demo_panel", "dark");
        let img = gradient(48, 30);
        let thr = SnapThreshold::default();

        // 1) Missing reference → Created (the empty/missing path) + persisted.
        let r = assert_snapshot_warehouse(&key, &img, SnapMode::Check, thr, &store).unwrap();
        assert!(matches!(r, SnapResult::Created { .. }), "first run records the warehouse reference");

        // The reference now lives in the table as a real WebP that decodes back
        // to the SAME pixels (the encode→store→fetch→decode is lossless).
        let stored = latest_ui_snapshot_sync(&wh, "nornir", "nornir-robotui", "demo_panel", "dark")
            .unwrap()
            .expect("warehouse holds the reference");
        assert_eq!((stored.width, stored.height), (48, 30));
        let decoded = snapshot::decode(&stored.webp).unwrap();
        assert_eq!(snapshot::diff_pixels(&img, &decoded, 0), 0, "stored WebP is pixel-exact");

        // 2) Same image → Match (fetched from the warehouse + diffed = 0).
        let r = assert_snapshot_warehouse(&key, &img, SnapMode::Check, thr, &store).unwrap();
        assert!(matches!(r, SnapResult::Match { diff_px: 0 }), "identical frame matches the stored ref");

        // 3) Changed image → Mismatch (the diff trips the threshold).
        let mut changed = img.clone();
        for x in 0..48 {
            changed.put_pixel(x, 15, Rgba([255, 0, 0, 255]));
        }
        let r = assert_snapshot_warehouse(&key, &changed, SnapMode::Check, thr, &store).unwrap();
        match r {
            SnapResult::Mismatch { diff_px, .. } => assert_eq!(diff_px, 48, "the changed row differs"),
            other => panic!("expected Mismatch, got {other:?}"),
        }

        // 4) Update mode re-bakes the reference to the changed frame.
        let r = assert_snapshot_warehouse(&key, &changed, SnapMode::Update, thr, &store).unwrap();
        assert!(matches!(r, SnapResult::Updated { .. }), "update writes a new warehouse reference");
        // The changed frame now matches the (updated) reference.
        let r = assert_snapshot_warehouse(&key, &changed, SnapMode::Check, thr, &store).unwrap();
        assert!(matches!(r, SnapResult::Match { .. }), "updated reference now matches");
    }
}