use std::sync::Arc;
use anyhow::{anyhow, Result};
use arrow::array::{
Array, Int32Array, LargeBinaryArray, RecordBatch, StringArray, TimestampMicrosecondArray,
};
use iceberg::arrow::schema_to_arrow_schema;
use iceberg::expr::Reference;
use iceberg::spec::Datum;
use iceberg::Catalog;
use futures::TryStreamExt;
use super::iceberg::{append_batch, IcebergWarehouse, TABLE_UI_SNAPSHOTS};
/// One row of the UI-snapshot table: a single rendered frame plus the key
/// that identifies which test/theme it belongs to.
///
/// The logical key used for lookups is `(repo, krate, test_name, theme)`;
/// `created_at_micros` orders the frames stored under one key.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SnapshotRow {
    /// Repository name; stored in the `repo` column.
    pub repo: String,
    /// Crate name; stored in the `crate` column (`crate` is a Rust keyword,
    /// hence the field spelling).
    pub krate: String,
    /// Name of the snapshot test; stored in the `test_name` column.
    pub test_name: String,
    /// Theme the frame was rendered with; stored in the `theme` column.
    pub theme: String,
    /// Encoded image bytes, written to a large-binary column.
    /// NOTE(review): expected to be WebP-encoded, going by the field name —
    /// nothing in this type enforces it.
    pub webp: Vec<u8>,
    /// Frame width (presumably in pixels).
    pub width: i32,
    /// Frame height (presumably in pixels).
    pub height: i32,
    /// Digest of the frame content. `append_ui_snapshot` compares it against
    /// the latest stored row to skip re-storing identical content.
    pub content_sha: String,
    /// Git revision recorded alongside the frame.
    pub git_sha: String,
    /// Creation time in microseconds, written as a `+00:00` timestamp.
    /// `latest_ui_snapshot` returns the row with the largest value.
    pub created_at_micros: i64,
}
/// Appends `row` to the UI-snapshot table, unless the newest row already
/// stored under the same `(repo, krate, test_name, theme)` key carries the
/// same `content_sha` — in that case nothing is written and `Ok(())` is
/// returned.
///
/// # Errors
///
/// Propagates any failure from the latest-row lookup, from loading the table,
/// from converting its schema to Arrow, from assembling the record batch, or
/// from the append itself.
pub async fn append_ui_snapshot(wh: &IcebergWarehouse, row: &SnapshotRow) -> Result<()> {
    // Dedup on content: only the most recent frame for this key is consulted.
    let previous =
        latest_ui_snapshot(wh, &row.repo, &row.krate, &row.test_name, &row.theme).await?;
    if matches!(&previous, Some(prev) if prev.content_sha == row.content_sha) {
        return Ok(());
    }

    let table = wh.catalog().load_table(&wh.table_ident(TABLE_UI_SNAPSHOTS)).await?;
    let arrow_schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);

    // Single-value column builders; every array below holds exactly one row.
    let utf8 = |text: &str| -> Arc<dyn Array> { Arc::new(StringArray::from(vec![text])) };
    let int32 = |value: i32| -> Arc<dyn Array> { Arc::new(Int32Array::from(vec![value])) };
    let created_at =
        TimestampMicrosecondArray::from(vec![row.created_at_micros]).with_timezone("+00:00");

    // Positional: the order here has to line up with the table schema's
    // field order, because `RecordBatch::try_new` pairs columns by index.
    let columns: Vec<Arc<dyn Array>> = vec![
        utf8(&row.repo),
        utf8(&row.krate),
        utf8(&row.test_name),
        utf8(&row.theme),
        Arc::new(LargeBinaryArray::from(vec![row.webp.as_slice()])),
        int32(row.width),
        int32(row.height),
        utf8(&row.content_sha),
        utf8(&row.git_sha),
        Arc::new(created_at),
    ];

    let single_row = RecordBatch::try_new(arrow_schema, columns)?;
    append_batch(wh.catalog(), table, single_row).await?;
    Ok(())
}
/// Returns the most recent snapshot stored under the key
/// `(repo, krate, test_name, theme)`, or `None` when no row matches.
///
/// "Most recent" means the largest `created_at_micros`; when several matching
/// rows share the largest value, the first one encountered in scan order is
/// returned.
///
/// The whole key is handed to the scan as a filter so that data belonging to
/// other crates/tests/themes of the same repo can be skipped by the reader
/// instead of being decoded (image blobs included) and thrown away here.
///
/// # Errors
///
/// Propagates failures from loading the table, building or running the scan,
/// and from decoding a batch (missing column or unexpected Arrow type).
pub async fn latest_ui_snapshot(
    wh: &IcebergWarehouse,
    repo: &str,
    krate: &str,
    test_name: &str,
    theme: &str,
) -> Result<Option<SnapshotRow>> {
    let table = wh.catalog().load_table(&wh.table_ident(TABLE_UI_SNAPSHOTS)).await?;

    // Push the complete key down, not just `repo`.
    let key_filter = Reference::new("repo")
        .equal_to(Datum::string(repo))
        .and(Reference::new("crate").equal_to(Datum::string(krate)))
        .and(Reference::new("test_name").equal_to(Datum::string(test_name)))
        .and(Reference::new("theme").equal_to(Datum::string(theme)));

    let scan = table
        .scan()
        .with_filter(key_filter)
        .select([
            "repo", "crate", "test_name", "theme", "webp", "width", "height", "content_sha",
            "git_sha", "created_at",
        ])
        .build()?;

    // Consume batches as they arrive rather than buffering the whole result.
    let mut batches = scan.to_arrow().await?;
    let mut best: Option<SnapshotRow> = None;
    while let Some(batch) = batches.try_next().await? {
        for candidate in rows_from_batch(&batch)? {
            // Re-check the key in memory: the result must stay correct even
            // if the reader applies the pushed-down filter only coarsely.
            let is_target = candidate.repo == repo
                && candidate.krate == krate
                && candidate.test_name == test_name
                && candidate.theme == theme;
            if !is_target {
                continue;
            }
            // Strict `>` keeps the earliest-seen row on timestamp ties.
            let supersedes = match &best {
                Some(current) => candidate.created_at_micros > current.created_at_micros,
                None => true,
            };
            if supersedes {
                best = Some(candidate);
            }
        }
    }
    Ok(best)
}
/// Looks up the column called `name` in `batch` and downcasts it to the
/// concrete Arrow array type `T`.
///
/// # Errors
///
/// Fails when the batch has no column with that name, or when the column is
/// not a `T`.
fn col<'a, T: 'static>(batch: &'a RecordBatch, name: &str) -> Result<&'a T> {
    let array = match batch.column_by_name(name) {
        Some(array) => array,
        None => return Err(anyhow!("ui_snapshots batch missing column `{name}`")),
    };
    match array.as_any().downcast_ref::<T>() {
        Some(typed) => Ok(typed),
        None => Err(anyhow!("ui_snapshots column `{name}` has unexpected arrow type")),
    }
}
/// Decodes every row of `batch` into an owned [`SnapshotRow`].
///
/// All ten snapshot columns must be present with the expected Arrow types;
/// string and binary values are copied out of the batch.
///
/// NOTE(review): values are read with `value(i)` without consulting the
/// validity bitmap, so null slots are not detected here — confirm the table
/// columns are all required.
///
/// # Errors
///
/// Fails on the first column that is missing or has an unexpected type.
fn rows_from_batch(batch: &RecordBatch) -> Result<Vec<SnapshotRow>> {
    // Resolve every column up front so a schema problem surfaces before any
    // row is built.
    let repo_col = col::<StringArray>(batch, "repo")?;
    let crate_col = col::<StringArray>(batch, "crate")?;
    let test_name_col = col::<StringArray>(batch, "test_name")?;
    let theme_col = col::<StringArray>(batch, "theme")?;
    let webp_col = col::<LargeBinaryArray>(batch, "webp")?;
    let width_col = col::<Int32Array>(batch, "width")?;
    let height_col = col::<Int32Array>(batch, "height")?;
    let content_sha_col = col::<StringArray>(batch, "content_sha")?;
    let git_sha_col = col::<StringArray>(batch, "git_sha")?;
    let created_at_col = col::<TimestampMicrosecondArray>(batch, "created_at")?;

    let rows = (0..batch.num_rows())
        .map(|idx| SnapshotRow {
            repo: repo_col.value(idx).to_owned(),
            krate: crate_col.value(idx).to_owned(),
            test_name: test_name_col.value(idx).to_owned(),
            theme: theme_col.value(idx).to_owned(),
            webp: webp_col.value(idx).to_vec(),
            width: width_col.value(idx),
            height: height_col.value(idx),
            content_sha: content_sha_col.value(idx).to_owned(),
            git_sha: git_sha_col.value(idx).to_owned(),
            created_at_micros: created_at_col.value(idx),
        })
        .collect::<Vec<_>>();
    Ok(rows)
}
/// Blocking counterpart of [`append_ui_snapshot`]: drives the async append to
/// completion through `wh.block_on` and returns its result.
pub fn append_ui_snapshot_sync(wh: &IcebergWarehouse, row: &SnapshotRow) -> Result<()> {
    let append = append_ui_snapshot(wh, row);
    wh.block_on(append)
}
/// Blocking counterpart of [`latest_ui_snapshot`]: drives the async lookup to
/// completion through `wh.block_on` and returns its result.
pub fn latest_ui_snapshot_sync(
    wh: &IcebergWarehouse,
    repo: &str,
    krate: &str,
    test_name: &str,
    theme: &str,
) -> Result<Option<SnapshotRow>> {
    let lookup = latest_ui_snapshot(wh, repo, krate, test_name, theme);
    wh.block_on(lookup)
}
// Tests for the UI-snapshot store: raw row round-trips through the warehouse,
// newest-wins / content-dedup behaviour, and an end-to-end run of the
// `nornir_robotui` snapshot assertion against a warehouse-backed store.
#[cfg(test)]
mod tests {
use super::*;
use chrono::Utc;
use image::{Rgba, RgbaImage};
use nornir_robotui::snapshot::{
self, assert_snapshot_warehouse, SnapMode, SnapResult, SnapThreshold, SnapshotKey,
SnapshotRef, SnapshotStore,
};
// Opens a warehouse rooted in a fresh temporary directory. The `TempDir` is
// returned alongside it so the caller keeps the directory alive for the
// duration of the test.
fn wh() -> (tempfile::TempDir, IcebergWarehouse) {
let dir = tempfile::tempdir().unwrap();
let wh = IcebergWarehouse::open(dir.path()).unwrap();
(dir, wh)
}
// Deterministic opaque test image: R = x mod 256, G = y mod 256,
// B = (x + y) mod 256, A = 255.
fn gradient(w: u32, h: u32) -> RgbaImage {
RgbaImage::from_fn(w, h, |x, y| {
Rgba([(x % 256) as u8, (y % 256) as u8, ((x + y) % 256) as u8, 255])
})
}
// Builds a row for test `test` with fixed repo/crate/theme/dimensions/git sha.
// `content_sha` is the lowercase hex SHA-256 of `webp`, so rows with equal
// bytes get equal digests; `ts` becomes `created_at_micros`.
fn row(test: &str, webp: Vec<u8>, ts: i64) -> SnapshotRow {
let sha = {
use sha2::{Digest, Sha256};
let d = Sha256::new().chain_update(&webp).finalize();
d.iter().map(|b| format!("{b:02x}")).collect::<String>()
};
SnapshotRow {
repo: "nornir".into(),
krate: "nornir-robotui".into(),
test_name: test.into(),
theme: "light".into(),
webp,
width: 12,
height: 8,
content_sha: sha,
git_sha: "deadbeef".into(),
created_at_micros: ts,
}
}
// A stored row reads back equal to what was written, and a key that was never
// written yields `None`.
#[test]
fn append_then_latest_round_trips() {
let (_d, wh) = wh();
let r = row("a", vec![1, 2, 3, 4], 1000);
wh.block_on(append_ui_snapshot(&wh, &r)).unwrap();
let back = wh
.block_on(latest_ui_snapshot(&wh, "nornir", "nornir-robotui", "a", "light"))
.unwrap()
.expect("stored frame reads back");
assert_eq!(back, r, "row round-trips byte-exact");
// Same repo/crate/theme but an unknown test name must not match.
let none = wh
.block_on(latest_ui_snapshot(&wh, "nornir", "nornir-robotui", "ghost", "light"))
.unwrap();
assert!(none.is_none(), "missing key ⇒ None");
}
// Two different frames under one key: the later timestamp wins. Appending the
// same bytes again (with a newer timestamp) is skipped by the content dedup,
// so the latest row keeps its original timestamp.
#[test]
fn latest_picks_newest_and_dedups_identical() {
let (_d, wh) = wh();
wh.block_on(append_ui_snapshot(&wh, &row("k", vec![0u8; 4], 1000))).unwrap();
wh.block_on(append_ui_snapshot(&wh, &row("k", vec![9u8; 4], 2000))).unwrap();
let back = wh
.block_on(latest_ui_snapshot(&wh, "nornir", "nornir-robotui", "k", "light"))
.unwrap()
.unwrap();
assert_eq!(back.webp, vec![9u8; 4], "newest frame wins");
assert_eq!(back.created_at_micros, 2000);
// Identical content to the current latest row: expected to be a no-op.
wh.block_on(append_ui_snapshot(&wh, &row("k", vec![9u8; 4], 3000))).unwrap();
let back = wh
.block_on(latest_ui_snapshot(&wh, "nornir", "nornir-robotui", "k", "light"))
.unwrap()
.unwrap();
assert_eq!(back.created_at_micros, 2000, "identical content was not re-stored");
}
// Test-only adapter that implements the `SnapshotStore` trait on top of the
// blocking warehouse functions in this module. `git_sha` is stamped onto every
// row it stores.
struct WarehouseSnapshotStore<'a> {
wh: &'a IcebergWarehouse,
git_sha: String,
}
impl SnapshotStore for WarehouseSnapshotStore<'_> {
// Fetches the latest row for `key` and converts it to a `SnapshotRef`;
// errors are flattened to strings using anyhow's alternate (`{:#}`) format.
fn fetch_reference(&self, key: &SnapshotKey) -> Result<Option<SnapshotRef>, String> {
let got = latest_ui_snapshot_sync(self.wh, &key.repo, &key.krate, &key.test_name, &key.theme)
.map_err(|e| format!("{e:#}"))?;
Ok(got.map(|r| SnapshotRef {
webp: r.webp,
// NOTE(review): `as` casts do not range-check; fine for the small
// dimensions used in these tests.
width: r.width as u32,
height: r.height as u32,
content_sha: r.content_sha,
}))
}
// Stores `r` under `key`, timestamped with the current wall-clock time.
fn store_reference(&self, key: &SnapshotKey, r: &SnapshotRef) -> Result<(), String> {
let row = SnapshotRow {
repo: key.repo.clone(),
krate: key.krate.clone(),
test_name: key.test_name.clone(),
theme: key.theme.clone(),
webp: r.webp.clone(),
width: r.width as i32,
height: r.height as i32,
content_sha: r.content_sha.clone(),
git_sha: self.git_sha.clone(),
created_at_micros: Utc::now().timestamp_micros(),
};
append_ui_snapshot_sync(self.wh, &row).map_err(|e| format!("{e:#}"))
}
}
// End-to-end scenario through `assert_snapshot_warehouse`:
// create -> fetch/decode -> match -> mismatch -> update -> match.
#[test]
fn known_image_round_trips_encode_store_fetch_diff() {
let (_d, wh) = wh();
let store = WarehouseSnapshotStore { wh: &wh, git_sha: "abc123".into() };
let key = SnapshotKey::new("nornir", "nornir-robotui", "demo_panel", "dark");
let img = gradient(48, 30);
let thr = SnapThreshold::default();
// 1. No reference exists yet, so the first check is expected to record one.
let r = assert_snapshot_warehouse(&key, &img, SnapMode::Check, thr, &store).unwrap();
assert!(matches!(r, SnapResult::Created { .. }), "first run records the warehouse reference");
// 2. The reference is readable straight from the warehouse and decodes back
// to the exact source pixels.
let stored = latest_ui_snapshot_sync(&wh, "nornir", "nornir-robotui", "demo_panel", "dark")
.unwrap()
.expect("warehouse holds the reference");
assert_eq!((stored.width, stored.height), (48, 30));
let decoded = snapshot::decode(&stored.webp).unwrap();
assert_eq!(snapshot::diff_pixels(&img, &decoded, 0), 0, "stored WebP is pixel-exact");
// 3. Checking the same image again matches with zero differing pixels.
let r = assert_snapshot_warehouse(&key, &img, SnapMode::Check, thr, &store).unwrap();
assert!(matches!(r, SnapResult::Match { diff_px: 0 }), "identical frame matches the stored ref");
// 4. Paint row y = 15 solid red: all 48 pixels of that row differ from the
// gradient (its green channel there is 15, not 0).
let mut changed = img.clone();
for x in 0..48 {
changed.put_pixel(x, 15, Rgba([255, 0, 0, 255]));
}
let r = assert_snapshot_warehouse(&key, &changed, SnapMode::Check, thr, &store).unwrap();
match r {
SnapResult::Mismatch { diff_px, .. } => assert_eq!(diff_px, 48, "the changed row differs"),
other => panic!("expected Mismatch, got {other:?}"),
}
// 5. Update mode stores the changed frame as the new reference, after which
// a plain check against it matches.
let r = assert_snapshot_warehouse(&key, &changed, SnapMode::Update, thr, &store).unwrap();
assert!(matches!(r, SnapResult::Updated { .. }), "update writes a new warehouse reference");
let r = assert_snapshot_warehouse(&key, &changed, SnapMode::Check, thr, &store).unwrap();
assert!(matches!(r, SnapResult::Match { .. }), "updated reference now matches");
}
}