Skip to main content

facett_icebergview/
lib.rs

//! **facett-icebergview** — browse **and time-travel** one Iceberg table, drawn
//! from **Arrow `RecordBatch`es**. The component is *engine-agnostic*: you hand
//! it, per Iceberg snapshot, the snapshot's metadata + its materialized Arrow
//! frame; it renders a **snapshot picker** (newest-first, showing snapshot id /
//! timestamp / row-count) over a virtualized, **paged** row grid (the rows come
//! from [`facett_table::Table`], built from the batch via [`facett_arrow`]).
//! Selecting a snapshot loads *that* snapshot's frame — time travel.
//!
//! Like every facett component it implements [`Facet`] (`title` / `ui` /
//! `state_json`), so it drops into a `FacetDeck` and is robot-testable through
//! `facett_core::harness`: drive [`IcebergView::select_snapshot`] / [`IcebergView::set_page`]
//! headlessly and assert `state_json` (the current snapshot, the snapshot list,
//! and the visible rows all change). `state_json` exposes:
//!
//! - `table` — the table name,
//! - `snapshots` — every snapshot newest-first (`id`, `timestamp_ms`, `rows`),
//! - `current` — the selected snapshot id (`null` when the table has no snapshots),
//! - `columns` — the column names of the rendered grid,
//! - `total_rows` / `page` / `page_size` / `page_count` / `visible_rows` — the paged view.
//!
//! **Why not just `facett-table`?** A table viewer shows *one* frame. This adds
//! the Iceberg dimension — the history of snapshots and the control to scrub
//! between them — which is the whole point of a lakehouse table browser. The
//! adapter that *fills* it from a real catalog (skade) lives host-side (korp /
//! nornir) so this crate never takes a hard catalog/engine dependency.
25
26use arrow_array::RecordBatch;
27use egui::Ui;
28use facett_core::{Facet, FacetCaps, theme};
29use facett_table::Table;
30
/// Identity of a single Iceberg snapshot, as shown in the picker and emitted in
/// `state_json`. The `id` / `timestamp_ms` pair mirrors what
/// `iceberg::spec::Snapshot` exposes (`snapshot_id()` / `timestamp_ms()`);
/// `rows` is computed by the host (a scan count, or the snapshot summary's
/// `total-records`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SnapshotMeta {
    /// Iceberg snapshot id.
    pub id: i64,
    /// Commit time of the snapshot in epoch milliseconds (Iceberg `timestamp_ms`).
    pub timestamp_ms: i64,
    /// Number of rows in this snapshot; filled in by the host, 0 when unknown.
    pub rows: u64,
}

impl SnapshotMeta {
    /// Bundle a snapshot id, its commit time (epoch milliseconds) and its row
    /// count into a [`SnapshotMeta`]. No validation is performed.
    pub fn new(id: i64, timestamp_ms: i64, rows: u64) -> Self {
        SnapshotMeta { id, timestamp_ms, rows }
    }
}
50
51/// One snapshot paired with the Arrow frame that materializes it. The frame is a
52/// `Vec<RecordBatch>` exactly as a scan returns it; `IcebergView` concatenates
53/// the rows into a paged [`Table`] for rendering.
54pub struct Snapshot {
55    pub meta: SnapshotMeta,
56    pub batches: Vec<RecordBatch>,
57}
58
59impl Snapshot {
60    pub fn new(meta: SnapshotMeta, batches: Vec<RecordBatch>) -> Self {
61        Self { meta, batches }
62    }
63    /// Total rows across the snapshot's batches.
64    pub fn row_count(&self) -> usize {
65        self.batches.iter().map(|b| b.num_rows()).sum()
66    }
67    /// Column names (from the first batch's schema; empty if no batches).
68    fn columns(&self) -> Vec<String> {
69        self.batches
70            .first()
71            .map(|b| b.schema().fields().iter().map(|f| f.name().clone()).collect())
72            .unwrap_or_default()
73    }
74}
75
/// Default number of rows shown per page. `IcebergView::new` starts with this
/// value; `IcebergView::with_page_size` overrides it. Paging keeps a huge
/// snapshot from being cell-formatted all at once and makes `visible_rows`
/// meaningful for headless assertions.
pub const DEFAULT_PAGE_SIZE: usize = 500;
80
81/// A browse + time-travel viewer for one Iceberg table.
82pub struct IcebergView {
83    title: String,
84    /// The table being browsed.
85    table_name: String,
86    /// Snapshots, **stored newest-first** (the picker order). The host can pass
87    /// them in any order; the constructor sorts by `timestamp_ms` descending.
88    snapshots: Vec<Snapshot>,
89    /// Index into `snapshots` of the selected (currently rendered) snapshot.
90    current: usize,
91    /// Current page (0-based) within the selected snapshot.
92    page: usize,
93    /// Rows per page.
94    page_size: usize,
95    /// The rendered grid for the current snapshot+page. Rebuilt on any switch.
96    grid: Table,
97}
98
99impl IcebergView {
100    /// Build a viewer over one table's snapshots. `snapshots` may be in any
101    /// order — they're sorted newest-first; the newest is selected initially.
102    /// An empty `snapshots` yields an empty viewer (renders an empty-state hint).
103    pub fn new(title: impl Into<String>, table_name: impl Into<String>, mut snapshots: Vec<Snapshot>) -> Self {
104        // Newest-first: latest timestamp at the top of the picker. Ties broken by
105        // the larger snapshot id (later commit).
106        snapshots.sort_by(|a, b| {
107            b.meta.timestamp_ms.cmp(&a.meta.timestamp_ms).then(b.meta.id.cmp(&a.meta.id))
108        });
109        let table_name = table_name.into();
110        let mut me = Self {
111            title: title.into(),
112            table_name,
113            snapshots,
114            current: 0,
115            page: 0,
116            page_size: DEFAULT_PAGE_SIZE,
117            grid: Table::new("snapshot", Vec::new()),
118        };
119        me.rebuild_grid();
120        me
121    }
122
123    /// Page size builder (rows per page). Clamped to ≥1.
124    pub fn with_page_size(mut self, n: usize) -> Self {
125        self.page_size = n.max(1);
126        self.page = 0;
127        self.rebuild_grid();
128        self
129    }
130
131    /// The table name being browsed.
132    pub fn table_name(&self) -> &str {
133        &self.table_name
134    }
135
136    /// Snapshot metadata, newest-first (the picker order).
137    pub fn snapshot_metas(&self) -> Vec<SnapshotMeta> {
138        self.snapshots.iter().map(|s| s.meta.clone()).collect()
139    }
140
141    /// The selected snapshot's metadata (None if the table has no snapshots).
142    pub fn current_snapshot(&self) -> Option<&SnapshotMeta> {
143        self.snapshots.get(self.current).map(|s| &s.meta)
144    }
145
146    /// The selected snapshot's id (None if the table has no snapshots).
147    pub fn current_id(&self) -> Option<i64> {
148        self.current_snapshot().map(|m| m.id)
149    }
150
151    /// Total rows in the selected snapshot.
152    pub fn total_rows(&self) -> usize {
153        self.snapshots.get(self.current).map(|s| s.row_count()).unwrap_or(0)
154    }
155
156    /// Number of pages in the selected snapshot (≥1 while a snapshot exists).
157    pub fn page_count(&self) -> usize {
158        if self.snapshots.is_empty() {
159            0
160        } else {
161            (self.total_rows().max(1) + self.page_size - 1) / self.page_size
162        }
163    }
164
165    /// **Time-travel**: select the snapshot with id `id` and load its frame.
166    /// Returns true if the id was found (and selection changed or reloaded).
167    /// Resets to page 0. Unknown id is a no-op returning false.
168    pub fn select_snapshot(&mut self, id: i64) -> bool {
169        match self.snapshots.iter().position(|s| s.meta.id == id) {
170            Some(i) => {
171                self.current = i;
172                self.page = 0;
173                self.rebuild_grid();
174                true
175            }
176            None => false,
177        }
178    }
179
180    /// Select a snapshot by its picker index (0 = newest). No-op if out of range.
181    pub fn select_index(&mut self, i: usize) {
182        if i < self.snapshots.len() {
183            self.current = i;
184            self.page = 0;
185            self.rebuild_grid();
186        }
187    }
188
189    /// Go to page `p` (0-based) within the current snapshot, clamped to range.
190    pub fn set_page(&mut self, p: usize) {
191        let last = self.page_count().saturating_sub(1);
192        self.page = p.min(last);
193        self.rebuild_grid();
194    }
195
196    /// The currently-rendered grid (its rows are the visible page).
197    pub fn grid(&self) -> &Table {
198        &self.grid
199    }
200
201    /// Rebuild the [`Table`] for the current snapshot + page. The whole snapshot
202    /// frame is concatenated per-batch and the page window sliced out, then each
203    /// batch in the window is mapped to string rows via `facett_arrow`.
204    fn rebuild_grid(&mut self) {
205        let Some(snap) = self.snapshots.get(self.current) else {
206            self.grid = Table::new("snapshot", Vec::new());
207            return;
208        };
209        let columns = snap.columns();
210        let mut grid = Table::new(format!("{} @ {}", self.table_name, snap.meta.id), columns);
211
212        let start = self.page * self.page_size;
213        let end = (start + self.page_size).min(snap.row_count());
214
215        // Walk the batches, emitting only rows in [start, end).
216        let mut row0 = 0usize; // absolute index of this batch's first row
217        for batch in &snap.batches {
218            let n = batch.num_rows();
219            let b_lo = start.saturating_sub(row0).min(n);
220            let b_hi = end.saturating_sub(row0).min(n);
221            if b_lo < b_hi {
222                // facett-arrow formats a whole batch; slice to the wanted window
223                // first so we never format rows the page doesn't show.
224                let window = batch.slice(b_lo, b_hi - b_lo);
225                let t = facett_arrow::table_from_batch(&window, "");
226                for r in t.rows {
227                    grid.push_row(r);
228                }
229            }
230            row0 += n;
231            if row0 >= end {
232                break;
233            }
234        }
235        self.grid = grid;
236    }
237}
238
239impl Facet for IcebergView {
240    fn title(&self) -> &str {
241        &self.title
242    }
243
244    fn ui(&mut self, ui: &mut Ui) {
245        let th = theme(ui);
246        if self.snapshots.is_empty() {
247            ui.weak(format!("{} — no snapshots", self.table_name));
248            return;
249        }
250
251        // ── snapshot picker (newest-first) + time-travel ──────────────────────
252        let mut pick: Option<usize> = None;
253        ui.horizontal(|ui| {
254            ui.strong("⏱ snapshot:");
255            egui::ComboBox::from_id_salt(("icebergview_snap", &self.title))
256                .selected_text(
257                    self.current_snapshot()
258                        .map(|m| format!("#{} · {} rows", m.id, m.rows))
259                        .unwrap_or_else(|| "—".into()),
260                )
261                .show_ui(ui, |ui| {
262                    for (i, s) in self.snapshots.iter().enumerate() {
263                        let m = &s.meta;
264                        let label = format!(
265                            "{}#{}  ·  t={}ms  ·  {} rows",
266                            if i == 0 { "▲ " } else { "" },
267                            m.id,
268                            m.timestamp_ms,
269                            m.rows,
270                        );
271                        if ui.selectable_label(i == self.current, label).clicked() {
272                            pick = Some(i);
273                        }
274                    }
275                });
276            ui.weak(format!("({} snapshots)", self.snapshots.len()));
277        });
278        if let Some(i) = pick {
279            self.select_index(i);
280        }
281
282        // ── pager ─────────────────────────────────────────────────────────────
283        let pages = self.page_count();
284        ui.horizontal(|ui| {
285            ui.label(format!("table {}", self.table_name));
286            ui.separator();
287            if ui.add_enabled(self.page > 0, egui::Button::new("◀ prev")).clicked() {
288                self.set_page(self.page.saturating_sub(1));
289            }
290            ui.colored_label(th.text, format!("page {}/{}", self.page + 1, pages.max(1)));
291            if ui.add_enabled(self.page + 1 < pages, egui::Button::new("next ▶")).clicked() {
292                self.set_page(self.page + 1);
293            }
294            ui.separator();
295            ui.weak(format!("{} rows total", self.total_rows()));
296        });
297
298        ui.separator();
299        // The page's rows render through the virtualized facett-table grid.
300        self.grid.ui(ui);
301    }
302
303    fn state_json(&self) -> serde_json::Value {
304        serde_json::json!({
305            "table": self.table_name,
306            "snapshots": self.snapshots.iter().map(|s| serde_json::json!({
307                "id": s.meta.id,
308                "timestamp_ms": s.meta.timestamp_ms,
309                "rows": s.meta.rows,
310            })).collect::<Vec<_>>(),
311            "current": self.current_id(),
312            "total_rows": self.total_rows(),
313            "page": self.page,
314            "page_size": self.page_size,
315            "page_count": self.page_count(),
316            "columns": self.grid.columns,
317            "visible_rows": self.grid.rows.len(),
318        })
319    }
320
321    fn selection_json(&self) -> serde_json::Value {
322        match self.current_id() {
323            Some(id) => serde_json::json!(id),
324            None => serde_json::Value::Null,
325        }
326    }
327
328    /// Standard egui widgets (ComboBox, buttons, the inner Table) follow the
329    /// active `Theme`'s `Visuals`. Selectable (the snapshot) + resizable.
330    fn caps(&self) -> FacetCaps {
331        FacetCaps::NONE.themeable().selectable().resizable()
332    }
333
334    /// Opt into typed downcast so a host can drive time-travel
335    /// ([`IcebergView::select_snapshot`] / [`IcebergView::set_page`]) when this view
336    /// lives boxed inside a `FacetDeck` (e.g. the demo's robot-UI driver).
337    fn as_any_mut(&mut self) -> Option<&mut dyn std::any::Any> {
338        Some(self)
339    }
340}
341
// Unit tests: snapshot ordering, time-travel between snapshots, paging (within
// one batch and across batch boundaries), headless rendering, and caps.
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;

    use arrow_array::{Int64Array, StringArray};
    use arrow_schema::{DataType, Field, Schema};
    use facett_core::harness;

    /// A two-column batch (`id: Int64`, `name: Utf8`, both non-nullable) built
    /// from the parallel `ids` / `names` slices. Panics (via `unwrap`) if
    /// `RecordBatch::try_new` rejects the input.
    fn batch(ids: &[i64], names: &[&str]) -> RecordBatch {
        RecordBatch::try_new(
            Arc::new(Schema::new(vec![
                Field::new("id", DataType::Int64, false),
                Field::new("name", DataType::Utf8, false),
            ])),
            vec![
                Arc::new(Int64Array::from(ids.to_vec())),
                Arc::new(StringArray::from(names.to_vec())),
            ],
        )
        .unwrap()
    }

    /// Two snapshots of the same table: v1 (older, 2 rows) and v2 (newer, 3 rows).
    fn two_snapshot_view() -> IcebergView {
        let v1 = Snapshot::new(SnapshotMeta::new(100, 1_000, 2), vec![batch(&[1, 2], &["knut", "korp"])]);
        let v2 = Snapshot::new(
            SnapshotMeta::new(200, 2_000, 3),
            vec![batch(&[1, 2, 3], &["knut", "korp", "skade"])],
        );
        // Pass oldest-first on purpose — the view must sort newest-first.
        IcebergView::new("🧊 Tables", "Person", vec![v1, v2])
    }

    #[test]
    fn newest_snapshot_selected_first_and_listed_newest_first() {
        let v = two_snapshot_view();
        // Picker order: newest (200) first.
        let metas = v.snapshot_metas();
        assert_eq!(metas[0].id, 200);
        assert_eq!(metas[1].id, 100);
        // Initial selection = newest.
        assert_eq!(v.current_id(), Some(200));
        assert_eq!(v.total_rows(), 3);
    }

    #[test]
    fn switching_snapshots_changes_rendered_rows_and_state() {
        let mut v = two_snapshot_view();

        // Newest (v2): 3 rows rendered, third row is skade.
        assert_eq!(v.grid().rows.len(), 3);
        assert_eq!(v.grid().rows[2], vec!["3".to_string(), "skade".to_string()]);
        let s2 = v.state_json();
        assert_eq!(s2["current"], 200);
        assert_eq!(s2["total_rows"], 3);
        assert_eq!(s2["visible_rows"], 3);

        // TIME TRAVEL → older snapshot v1: rendered rows shrink to 2, no skade.
        assert!(v.select_snapshot(100), "100 is a known snapshot");
        assert_eq!(v.current_id(), Some(100));
        assert_eq!(v.grid().rows.len(), 2);
        assert!(v.grid().rows.iter().all(|r| r[1] != "skade"), "v1 predates skade");
        let s1 = v.state_json();
        assert_eq!(s1["current"], 100);
        assert_eq!(s1["total_rows"], 2);
        assert_eq!(s1["visible_rows"], 2);

        // The two snapshots produced demonstrably different rendered state.
        assert_ne!(s1["current"], s2["current"]);
        assert_ne!(s1["visible_rows"], s2["visible_rows"]);

        // Unknown id is a no-op.
        assert!(!v.select_snapshot(999));
        assert_eq!(v.current_id(), Some(100));
    }

    #[test]
    fn state_json_carries_full_snapshot_list() {
        let v = two_snapshot_view();
        let j = v.state_json();
        assert_eq!(j["table"], "Person");
        let snaps = j["snapshots"].as_array().unwrap();
        assert_eq!(snaps.len(), 2);
        assert_eq!(snaps[0]["id"], 200); // newest-first
        assert_eq!(snaps[0]["rows"], 3);
        assert_eq!(snaps[1]["id"], 100);
        assert_eq!(j["columns"].as_array().unwrap().len(), 2);
    }

    #[test]
    fn paging_windows_the_rows() {
        // 5 rows, page_size 2 → 3 pages; each page shows its slice.
        let snap = Snapshot::new(
            SnapshotMeta::new(1, 10, 5),
            vec![batch(&[1, 2, 3, 4, 5], &["a", "b", "c", "d", "e"])],
        );
        let mut v = IcebergView::new("t", "T", vec![snap]).with_page_size(2);
        assert_eq!(v.page_count(), 3);
        assert_eq!(v.grid().rows.len(), 2);
        assert_eq!(v.grid().rows[0][1], "a");
        v.set_page(1);
        assert_eq!(v.grid().rows.len(), 2);
        assert_eq!(v.grid().rows[0][1], "c");
        v.set_page(2);
        assert_eq!(v.grid().rows.len(), 1); // last page has the remainder
        assert_eq!(v.grid().rows[0][1], "e");
        // Clamp beyond the last page.
        v.set_page(99);
        assert_eq!(v.state_json()["page"], 2);
    }

    #[test]
    fn paging_windows_across_multiple_batches() {
        // Two batches (3 + 3 rows), page_size 4 → page 0 spans the batch boundary.
        let snap = Snapshot::new(
            SnapshotMeta::new(7, 70, 6),
            vec![batch(&[1, 2, 3], &["a", "b", "c"]), batch(&[4, 5, 6], &["d", "e", "f"])],
        );
        let mut v = IcebergView::new("t", "T", vec![snap]).with_page_size(4);
        assert_eq!(v.grid().rows.len(), 4);
        assert_eq!(v.grid().rows[3][1], "d", "page 0 crosses into the second batch");
        v.set_page(1);
        assert_eq!(v.grid().rows.len(), 2);
        assert_eq!(v.grid().rows[0][1], "e");
    }

    #[test]
    fn headless_render_draws_and_reports_state() {
        // Inject 2 snapshots, render the newest offscreen, time-travel, render
        // again — assert it both DREW and that state reflects the switch.
        let mut v = two_snapshot_view();
        let r2 = harness::headless_render(&mut v);
        assert_eq!(r2.title, "🧊 Tables");
        assert!(r2.drew(), "a snapshot of rows should tessellate to vertices");
        assert_eq!(r2.state["current"], 200);
        assert_eq!(r2.state["visible_rows"], 3);

        v.select_snapshot(100);
        let r1 = harness::headless_render(&mut v);
        assert_eq!(r1.state["current"], 100);
        assert_eq!(r1.state["visible_rows"], 2);
        assert_ne!(r1.state["visible_rows"], r2.state["visible_rows"]);
    }

    #[test]
    fn empty_view_renders_hint_without_panic() {
        let mut v = IcebergView::new("t", "Empty", Vec::new());
        assert_eq!(v.current_id(), None);
        assert_eq!(v.total_rows(), 0);
        assert_eq!(v.page_count(), 0);
        let r = harness::headless_render(&mut v);
        assert_eq!(r.state["snapshots"].as_array().unwrap().len(), 0);
        assert_eq!(r.state["current"], serde_json::Value::Null);
    }

    #[test]
    fn caps_advertise_themeable_selectable_resizable() {
        let c = two_snapshot_view().caps();
        assert!(c.themeable && c.selectable && c.resizable);
        assert!(!c.scalable && !c.copyable);
    }
}
505}