facett-icebergview 0.1.6

facett — browse + time-travel an Iceberg table from Arrow frames: a snapshot picker (newest-first, id/timestamp/rows) over a virtualized row grid; engine-agnostic (RecordBatch in)
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
//! **facett-icebergview** — browse **and time-travel** one Iceberg table, drawn
//! from **Arrow `RecordBatch`es**. The component is *engine-agnostic*: you hand
//! it, per Iceberg snapshot, the snapshot's metadata + its materialized Arrow
//! frame; it renders a **snapshot picker** (newest-first, showing snapshot id /
//! timestamp / row-count) over a virtualized, **paged** row grid (the rows come
//! from [`facett_table::Table`], built from the batch via [`facett_arrow`]).
//! Selecting a snapshot loads *that* snapshot's frame — time travel.
//!
//! Like every facett component it implements [`Facet`] (`title` / `ui` /
//! `state_json`), so it drops into a `FacetDeck` and is robot-testable through
//! `facett_core::harness`: drive [`IcebergView::select_snapshot`] / [`IcebergView::set_page`]
//! headlessly and assert `state_json` (the current snapshot, the snapshot list,
//! and the visible rows all change). `state_json` exposes:
//!
//! - `table` — the table name,
//! - `snapshots` — every snapshot newest-first (`id`, `timestamp_ms`, `rows`),
//! - `current` — the selected snapshot id,
//! - `total_rows` / `page` / `page_size` / `page_count` / `visible_rows` — the paged view,
//! - `columns` — the column names of the rendered grid.
//!
//! **Why not just `facett-table`?** A table viewer shows *one* frame. This adds
//! the Iceberg dimension — the history of snapshots and the control to scrub
//! between them — which is the whole point of a lakehouse table browser. The
//! adapter that *fills* it from a real catalog (skade) lives host-side (korp /
//! nornir) so this crate never takes a hard catalog/engine dependency.

use arrow_array::RecordBatch;
use egui::Ui;
use facett_core::{Facet, FacetCaps, theme};
use facett_table::Table;

/// Identity of a single Iceberg snapshot, as shown in the picker and reported
/// through `state_json`. The `id` / `timestamp_ms` pair mirrors what
/// `iceberg::spec::Snapshot` exposes (`snapshot_id()` / `timestamp_ms()`); the
/// row count is supplied by the host (a scan count, or the snapshot summary's
/// `total-records`).
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct SnapshotMeta {
    /// Iceberg snapshot id.
    pub id: i64,
    /// Commit time of the snapshot in epoch milliseconds (Iceberg `timestamp_ms`).
    pub timestamp_ms: i64,
    /// Number of rows in this snapshot; host-provided, 0 when unknown.
    pub rows: u64,
}

impl SnapshotMeta {
    /// Bundle a snapshot id, its commit timestamp (epoch ms) and its row count.
    pub fn new(id: i64, timestamp_ms: i64, rows: u64) -> Self {
        SnapshotMeta { id, timestamp_ms, rows }
    }
}

/// One snapshot paired with the Arrow frame that materializes it. The frame is a
/// `Vec<RecordBatch>` exactly as a scan returns it; `IcebergView` concatenates
/// the rows into a paged [`Table`] for rendering.
pub struct Snapshot {
    pub meta: SnapshotMeta,
    pub batches: Vec<RecordBatch>,
}

impl Snapshot {
    pub fn new(meta: SnapshotMeta, batches: Vec<RecordBatch>) -> Self {
        Self { meta, batches }
    }
    /// Total rows across the snapshot's batches.
    pub fn row_count(&self) -> usize {
        self.batches.iter().map(|b| b.num_rows()).sum()
    }
    /// Column names (from the first batch's schema; empty if no batches).
    fn columns(&self) -> Vec<String> {
        self.batches
            .first()
            .map(|b| b.schema().fields().iter().map(|f| f.name().clone()).collect())
            .unwrap_or_default()
    }
}

/// Default rows shown per page (the Table is virtualized, but paging keeps a huge
/// snapshot from being cell-formatted all at once and makes `visible_rows`
/// meaningful for headless assertions). [`IcebergView::new`] starts with this
/// value; [`IcebergView::with_page_size`] overrides it.
pub const DEFAULT_PAGE_SIZE: usize = 500;

/// A browse + time-travel viewer for one Iceberg table.
///
/// Every snapshot's frame is held in memory. `grid` is kept in sync with the
/// `(current, page, page_size)` triple: each method that changes one of them
/// calls `rebuild_grid` afterwards.
pub struct IcebergView {
    /// Title reported through [`Facet::title`]; also salts the snapshot
    /// ComboBox id in `ui`.
    title: String,
    /// The table being browsed.
    table_name: String,
    /// Snapshots, **stored newest-first** (the picker order). The host can pass
    /// them in any order; the constructor sorts by `timestamp_ms` descending.
    snapshots: Vec<Snapshot>,
    /// Index into `snapshots` of the selected (currently rendered) snapshot.
    current: usize,
    /// Current page (0-based) within the selected snapshot.
    page: usize,
    /// Rows per page. Always ≥ 1: initialised from `DEFAULT_PAGE_SIZE` and
    /// clamped by `with_page_size`.
    page_size: usize,
    /// The rendered grid for the current snapshot+page. Rebuilt on any switch.
    grid: Table,
}

impl IcebergView {
    /// Build a viewer over one table's snapshots. `snapshots` may be in any
    /// order — they're sorted newest-first; the newest is selected initially.
    /// An empty `snapshots` yields an empty viewer (renders an empty-state hint).
    pub fn new(title: impl Into<String>, table_name: impl Into<String>, mut snapshots: Vec<Snapshot>) -> Self {
        // Newest-first: latest timestamp at the top of the picker. Ties broken by
        // the larger snapshot id (later commit).
        snapshots.sort_by(|a, b| {
            b.meta.timestamp_ms.cmp(&a.meta.timestamp_ms).then(b.meta.id.cmp(&a.meta.id))
        });
        let table_name = table_name.into();
        let mut me = Self {
            title: title.into(),
            table_name,
            snapshots,
            current: 0,
            page: 0,
            page_size: DEFAULT_PAGE_SIZE,
            grid: Table::new("snapshot", Vec::new()),
        };
        me.rebuild_grid();
        me
    }

    /// Builder: set the number of rows per page (a value of 0 is raised to 1),
    /// jump back to the first page and rebuild the grid.
    pub fn with_page_size(mut self, n: usize) -> Self {
        self.page = 0;
        self.page_size = if n == 0 { 1 } else { n };
        self.rebuild_grid();
        self
    }

    /// Name of the table this view browses.
    pub fn table_name(&self) -> &str {
        self.table_name.as_str()
    }

    /// Owned copies of every snapshot's metadata, in picker order (newest-first).
    pub fn snapshot_metas(&self) -> Vec<SnapshotMeta> {
        self.snapshots.iter().map(|snap| &snap.meta).cloned().collect()
    }

    /// Metadata of the selected snapshot, or `None` when the table has no snapshots.
    pub fn current_snapshot(&self) -> Option<&SnapshotMeta> {
        let selected = self.snapshots.get(self.current)?;
        Some(&selected.meta)
    }

    /// Id of the selected snapshot, or `None` when the table has no snapshots.
    pub fn current_id(&self) -> Option<i64> {
        let meta = self.current_snapshot()?;
        Some(meta.id)
    }

    /// Row count of the selected snapshot, summed over its batches; 0 when no
    /// snapshot is selected.
    pub fn total_rows(&self) -> usize {
        match self.snapshots.get(self.current) {
            Some(selected) => selected.row_count(),
            None => 0,
        }
    }

    /// Number of pages in the selected snapshot: 0 when there are no snapshots,
    /// otherwise ≥1 (an empty snapshot still has one, empty, page).
    ///
    /// Uses `div_ceil` rather than the `(rows + size - 1) / size` idiom: with a
    /// very large `page_size` (e.g. `usize::MAX` to mean "everything on one
    /// page") the manual form overflows — a panic in debug builds and a wrong
    /// page count in release builds.
    pub fn page_count(&self) -> usize {
        if self.snapshots.is_empty() {
            return 0;
        }
        // `page_size` is kept ≥ 1 by `new` / `with_page_size`, so the divisor is never zero.
        self.total_rows().max(1).div_ceil(self.page_size)
    }

    /// **Time-travel**: make the snapshot whose id is `id` the current one and
    /// load its frame, starting at page 0. Returns `true` when the id exists
    /// (even if it was already selected — the grid is rebuilt regardless) and
    /// `false`, leaving the view untouched, when it does not.
    pub fn select_snapshot(&mut self, id: i64) -> bool {
        let Some(found) = self.snapshots.iter().position(|s| s.meta.id == id) else {
            return false;
        };
        self.current = found;
        self.page = 0;
        self.rebuild_grid();
        true
    }

    /// Select the snapshot at picker position `i` (0 = newest) and show its
    /// first page. An out-of-range index leaves the view unchanged.
    pub fn select_index(&mut self, i: usize) {
        if i >= self.snapshots.len() {
            return;
        }
        self.current = i;
        self.page = 0;
        self.rebuild_grid();
    }

    /// Jump to page `p` (0-based) of the current snapshot. Values past the
    /// last page are clamped to the last page; the grid is rebuilt either way.
    pub fn set_page(&mut self, p: usize) {
        let last_page = self.page_count().saturating_sub(1);
        self.page = if p > last_page { last_page } else { p };
        self.rebuild_grid();
    }

    /// The currently-rendered grid (its rows are the visible page). It is
    /// replaced wholesale by `rebuild_grid` whenever the snapshot, page or page
    /// size changes.
    pub fn grid(&self) -> &Table {
        &self.grid
    }

    /// Recompute `self.grid` from the current snapshot and page. Batches are
    /// walked in order; only the part of each batch that overlaps the page
    /// window `[first, last_excl)` is sliced out and converted to string rows
    /// via `facett_arrow`. With no snapshot selected the grid is reset to an
    /// empty table.
    fn rebuild_grid(&mut self) {
        let Some(snap) = self.snapshots.get(self.current) else {
            self.grid = Table::new("snapshot", Vec::new());
            return;
        };
        let heading = format!("{} @ {}", self.table_name, snap.meta.id);
        let mut page_grid = Table::new(heading, snap.columns());

        // Absolute row range covered by the current page.
        let first = self.page * self.page_size;
        let last_excl = (first + self.page_size).min(snap.row_count());

        // `batch_start` is the absolute index of the current batch's first row.
        let mut batch_start = 0usize;
        for batch in snap.batches.iter() {
            let len = batch.num_rows();
            // Page window translated into this batch's local coordinates.
            let lo = first.saturating_sub(batch_start).min(len);
            let hi = last_excl.saturating_sub(batch_start).min(len);
            if hi > lo {
                // Slice before formatting so rows outside the page are never
                // turned into strings.
                let window = batch.slice(lo, hi - lo);
                let converted = facett_arrow::table_from_batch(&window, "");
                for row in converted.rows {
                    page_grid.push_row(row);
                }
            }
            batch_start += len;
            if batch_start >= last_excl {
                // Every later batch lies entirely after the page window.
                break;
            }
        }
        self.grid = page_grid;
    }
}

impl Facet for IcebergView {
    /// The title given to [`IcebergView::new`].
    fn title(&self) -> &str {
        self.title.as_str()
    }

    /// Draw the viewer: an empty-state hint when there are no snapshots,
    /// otherwise the snapshot picker, the pager row and the current page's grid.
    fn ui(&mut self, ui: &mut Ui) {
        let th = theme(ui);
        if self.snapshots.is_empty() {
            ui.weak(format!("{} — no snapshots", self.table_name));
            return;
        }

        // ── snapshot picker (newest-first) + time-travel ──────────────────────
        // A click is only recorded inside the closures; the selection is applied
        // after they return, once `self` can be mutated again.
        let mut pick: Option<usize> = None;
        ui.horizontal(|ui| {
            ui.strong("⏱ snapshot:");
            let selected = self
                .current_snapshot()
                .map(|m| format!("#{} · {} rows", m.id, m.rows))
                .unwrap_or_default();
            egui::ComboBox::from_id_salt(("icebergview_snap", &self.title))
                .selected_text(selected)
                .show_ui(ui, |ui| {
                    for (i, s) in self.snapshots.iter().enumerate() {
                        let m = &s.meta;
                        let label = format!("#{}  ·  t={}ms  ·  {} rows", m.id, m.timestamp_ms, m.rows);
                        if ui.selectable_label(i == self.current, label).clicked() {
                            pick = Some(i);
                        }
                    }
                });
            ui.weak(format!("({} snapshots)", self.snapshots.len()));
        });
        if let Some(i) = pick {
            self.select_index(i);
        }

        // ── pager ─────────────────────────────────────────────────────────────
        let pages = self.page_count();
        ui.horizontal(|ui| {
            ui.label(format!("table {}", self.table_name));
            ui.separator();
            if ui.add_enabled(self.page > 0, egui::Button::new("◀ prev")).clicked() {
                self.set_page(self.page.saturating_sub(1));
            }
            ui.colored_label(th.text, format!("page {}/{}", self.page + 1, pages.max(1)));
            if ui.add_enabled(self.page + 1 < pages, egui::Button::new("next ▶")).clicked() {
                self.set_page(self.page + 1);
            }
            ui.separator();
            ui.weak(format!("{} rows total", self.total_rows()));
        });

        ui.separator();
        // The page's rows render through the virtualized facett-table grid.
        self.grid.ui(ui);
    }

    /// Machine-readable state for headless assertions: the table name, every
    /// snapshot newest-first (`id` / `timestamp_ms` / `rows`), the selected
    /// snapshot id (`null` when there is none), the paging fields, the grid's
    /// column names and the number of rows on the visible page.
    fn state_json(&self) -> serde_json::Value {
        let snapshot_list: Vec<serde_json::Value> = self
            .snapshots
            .iter()
            .map(|snap| {
                let meta = &snap.meta;
                serde_json::json!({
                    "id": meta.id,
                    "timestamp_ms": meta.timestamp_ms,
                    "rows": meta.rows,
                })
            })
            .collect();
        serde_json::json!({
            "table": self.table_name,
            "snapshots": snapshot_list,
            "current": self.current_id(),
            "total_rows": self.total_rows(),
            "page": self.page,
            "page_size": self.page_size,
            "page_count": self.page_count(),
            "columns": self.grid.columns,
            "visible_rows": self.grid.rows.len(),
        })
    }

    /// The current selection as JSON: the selected snapshot id, or `null` when
    /// the table has no snapshots.
    fn selection_json(&self) -> serde_json::Value {
        self.current_id()
            .map_or(serde_json::Value::Null, |id| serde_json::json!(id))
    }

    /// Capabilities advertised to the host: themeable, selectable, resizable.
    ///
    /// Standard egui widgets (ComboBox, buttons, the inner Table) follow the
    /// active `Theme`'s `Visuals`. Selectable (the snapshot) + resizable.
    fn caps(&self) -> FacetCaps {
        FacetCaps::NONE.themeable().selectable().resizable()
    }

    /// Opt into typed downcast so a host can drive time-travel
    /// ([`IcebergView::select_snapshot`] / [`IcebergView::set_page`]) when this view
    /// lives boxed inside a `FacetDeck` (e.g. the demo's robot-UI driver).
    /// Always returns `Some(self)`.
    fn as_any_mut(&mut self) -> Option<&mut dyn std::any::Any> {
        Some(self)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;

    use arrow_array::{Int64Array, StringArray};
    use arrow_schema::{DataType, Field, Schema};
    use facett_core::harness;

    /// Build a two-column batch (`id: Int64`, `name: Utf8`, both non-nullable)
    /// from the parallel slices `ids` and `names`. Panics via `unwrap` if
    /// `RecordBatch::try_new` rejects the columns (presumably when the two
    /// slices differ in length).
    fn batch(ids: &[i64], names: &[&str]) -> RecordBatch {
        RecordBatch::try_new(
            Arc::new(Schema::new(vec![
                Field::new("id", DataType::Int64, false),
                Field::new("name", DataType::Utf8, false),
            ])),
            vec![
                Arc::new(Int64Array::from(ids.to_vec())),
                Arc::new(StringArray::from(names.to_vec())),
            ],
        )
        .unwrap()
    }

    /// Two snapshots of the same table: v1 (older, 2 rows) and v2 (newer, 3 rows).
    /// v1 has id 100 at t=1_000 ms, v2 has id 200 at t=2_000 ms; v2 adds the
    /// `skade` row.
    fn two_snapshot_view() -> IcebergView {
        let v1 = Snapshot::new(SnapshotMeta::new(100, 1_000, 2), vec![batch(&[1, 2], &["knut", "korp"])]);
        let v2 = Snapshot::new(
            SnapshotMeta::new(200, 2_000, 3),
            vec![batch(&[1, 2, 3], &["knut", "korp", "skade"])],
        );
        // Pass oldest-first on purpose — the view must sort newest-first.
        IcebergView::new("🧊 Tables", "Person", vec![v1, v2])
    }

    /// The constructor sorts snapshots newest-first and selects the newest.
    #[test]
    fn newest_snapshot_selected_first_and_listed_newest_first() {
        let v = two_snapshot_view();
        // Picker order: newest (200) first.
        let metas = v.snapshot_metas();
        assert_eq!(metas[0].id, 200);
        assert_eq!(metas[1].id, 100);
        // Initial selection = newest.
        assert_eq!(v.current_id(), Some(200));
        assert_eq!(v.total_rows(), 3);
    }

    /// Selecting another snapshot changes both the rendered rows and
    /// `state_json`; selecting an unknown id changes nothing.
    #[test]
    fn switching_snapshots_changes_rendered_rows_and_state() {
        let mut v = two_snapshot_view();

        // Newest (v2): 3 rows rendered, third row is skade.
        assert_eq!(v.grid().rows.len(), 3);
        assert_eq!(v.grid().rows[2], vec!["3".to_string(), "skade".to_string()]);
        let s2 = v.state_json();
        assert_eq!(s2["current"], 200);
        assert_eq!(s2["total_rows"], 3);
        assert_eq!(s2["visible_rows"], 3);

        // TIME TRAVEL → older snapshot v1: rendered rows shrink to 2, no skade.
        assert!(v.select_snapshot(100), "100 is a known snapshot");
        assert_eq!(v.current_id(), Some(100));
        assert_eq!(v.grid().rows.len(), 2);
        assert!(v.grid().rows.iter().all(|r| r[1] != "skade"), "v1 predates skade");
        let s1 = v.state_json();
        assert_eq!(s1["current"], 100);
        assert_eq!(s1["total_rows"], 2);
        assert_eq!(s1["visible_rows"], 2);

        // The two snapshots produced demonstrably different rendered state.
        assert_ne!(s1["current"], s2["current"]);
        assert_ne!(s1["visible_rows"], s2["visible_rows"]);

        // Unknown id is a no-op.
        assert!(!v.select_snapshot(999));
        assert_eq!(v.current_id(), Some(100));
    }

    /// `state_json` lists every snapshot newest-first, plus the table name and
    /// the grid's columns.
    #[test]
    fn state_json_carries_full_snapshot_list() {
        let v = two_snapshot_view();
        let j = v.state_json();
        assert_eq!(j["table"], "Person");
        let snaps = j["snapshots"].as_array().unwrap();
        assert_eq!(snaps.len(), 2);
        assert_eq!(snaps[0]["id"], 200); // newest-first
        assert_eq!(snaps[0]["rows"], 3);
        assert_eq!(snaps[1]["id"], 100);
        assert_eq!(j["columns"].as_array().unwrap().len(), 2);
    }

    /// Paging within a single batch: each page shows its own slice, the last
    /// page holds the remainder, and `set_page` clamps past the end.
    #[test]
    fn paging_windows_the_rows() {
        // 5 rows, page_size 2 → 3 pages; each page shows its slice.
        let snap = Snapshot::new(
            SnapshotMeta::new(1, 10, 5),
            vec![batch(&[1, 2, 3, 4, 5], &["a", "b", "c", "d", "e"])],
        );
        let mut v = IcebergView::new("t", "T", vec![snap]).with_page_size(2);
        assert_eq!(v.page_count(), 3);
        assert_eq!(v.grid().rows.len(), 2);
        assert_eq!(v.grid().rows[0][1], "a");
        v.set_page(1);
        assert_eq!(v.grid().rows.len(), 2);
        assert_eq!(v.grid().rows[0][1], "c");
        v.set_page(2);
        assert_eq!(v.grid().rows.len(), 1); // last page has the remainder
        assert_eq!(v.grid().rows[0][1], "e");
        // Clamp beyond the last page.
        v.set_page(99);
        assert_eq!(v.state_json()["page"], 2);
    }

    /// A page window that straddles a batch boundary pulls rows from both batches.
    #[test]
    fn paging_windows_across_multiple_batches() {
        // Two batches (3 + 3 rows), page_size 4 → page 0 spans the batch boundary.
        let snap = Snapshot::new(
            SnapshotMeta::new(7, 70, 6),
            vec![batch(&[1, 2, 3], &["a", "b", "c"]), batch(&[4, 5, 6], &["d", "e", "f"])],
        );
        let mut v = IcebergView::new("t", "T", vec![snap]).with_page_size(4);
        assert_eq!(v.grid().rows.len(), 4);
        assert_eq!(v.grid().rows[3][1], "d", "page 0 crosses into the second batch");
        v.set_page(1);
        assert_eq!(v.grid().rows.len(), 2);
        assert_eq!(v.grid().rows[0][1], "e");
    }

    /// Rendering through `harness::headless_render` draws something and the
    /// reported state follows a snapshot switch.
    #[test]
    fn headless_render_draws_and_reports_state() {
        // Inject 2 snapshots, render the newest offscreen, time-travel, render
        // again — assert it both DREW and that state reflects the switch.
        let mut v = two_snapshot_view();
        let r2 = harness::headless_render(&mut v);
        assert_eq!(r2.title, "🧊 Tables");
        assert!(r2.drew(), "a snapshot of rows should tessellate to vertices");
        assert_eq!(r2.state["current"], 200);
        assert_eq!(r2.state["visible_rows"], 3);

        v.select_snapshot(100);
        let r1 = harness::headless_render(&mut v);
        assert_eq!(r1.state["current"], 100);
        assert_eq!(r1.state["visible_rows"], 2);
        assert_ne!(r1.state["visible_rows"], r2.state["visible_rows"]);
    }

    /// A view with no snapshots reports zero rows/pages, a `null` current id,
    /// and renders headlessly without panicking.
    #[test]
    fn empty_view_renders_hint_without_panic() {
        let mut v = IcebergView::new("t", "Empty", Vec::new());
        assert_eq!(v.current_id(), None);
        assert_eq!(v.total_rows(), 0);
        assert_eq!(v.page_count(), 0);
        let r = harness::headless_render(&mut v);
        assert_eq!(r.state["snapshots"].as_array().unwrap().len(), 0);
        assert_eq!(r.state["current"], serde_json::Value::Null);
    }

    /// `caps` sets exactly themeable + selectable + resizable, and leaves
    /// scalable / copyable off.
    #[test]
    fn caps_advertise_themeable_selectable_resizable() {
        let c = two_snapshot_view().caps();
        assert!(c.themeable && c.selectable && c.resizable);
        assert!(!c.scalable && !c.copyable);
    }
}