1use arrow_array::RecordBatch;
27use egui::Ui;
28use facett_core::{Facet, FacetCaps, theme};
29use facett_table::Table;
30
31#[derive(Clone, Debug, PartialEq, Eq)]
36pub struct SnapshotMeta {
37 pub id: i64,
39 pub timestamp_ms: i64,
41 pub rows: u64,
43}
44
45impl SnapshotMeta {
46 pub fn new(id: i64, timestamp_ms: i64, rows: u64) -> Self {
47 Self { id, timestamp_ms, rows }
48 }
49}
50
51pub struct Snapshot {
55 pub meta: SnapshotMeta,
56 pub batches: Vec<RecordBatch>,
57}
58
59impl Snapshot {
60 pub fn new(meta: SnapshotMeta, batches: Vec<RecordBatch>) -> Self {
61 Self { meta, batches }
62 }
63 pub fn row_count(&self) -> usize {
65 self.batches.iter().map(|b| b.num_rows()).sum()
66 }
67 fn columns(&self) -> Vec<String> {
69 self.batches
70 .first()
71 .map(|b| b.schema().fields().iter().map(|f| f.name().clone()).collect())
72 .unwrap_or_default()
73 }
74}
75
76pub const DEFAULT_PAGE_SIZE: usize = 500;
80
81pub struct IcebergView {
83 title: String,
84 table_name: String,
86 snapshots: Vec<Snapshot>,
89 current: usize,
91 page: usize,
93 page_size: usize,
95 grid: Table,
97}
98
99impl IcebergView {
100 pub fn new(title: impl Into<String>, table_name: impl Into<String>, mut snapshots: Vec<Snapshot>) -> Self {
104 snapshots.sort_by(|a, b| {
107 b.meta.timestamp_ms.cmp(&a.meta.timestamp_ms).then(b.meta.id.cmp(&a.meta.id))
108 });
109 let table_name = table_name.into();
110 let mut me = Self {
111 title: title.into(),
112 table_name,
113 snapshots,
114 current: 0,
115 page: 0,
116 page_size: DEFAULT_PAGE_SIZE,
117 grid: Table::new("snapshot", Vec::new()),
118 };
119 me.rebuild_grid();
120 me
121 }
122
123 pub fn with_page_size(mut self, n: usize) -> Self {
125 self.page_size = n.max(1);
126 self.page = 0;
127 self.rebuild_grid();
128 self
129 }
130
131 pub fn table_name(&self) -> &str {
133 &self.table_name
134 }
135
136 pub fn snapshot_metas(&self) -> Vec<SnapshotMeta> {
138 self.snapshots.iter().map(|s| s.meta.clone()).collect()
139 }
140
141 pub fn current_snapshot(&self) -> Option<&SnapshotMeta> {
143 self.snapshots.get(self.current).map(|s| &s.meta)
144 }
145
146 pub fn current_id(&self) -> Option<i64> {
148 self.current_snapshot().map(|m| m.id)
149 }
150
151 pub fn total_rows(&self) -> usize {
153 self.snapshots.get(self.current).map(|s| s.row_count()).unwrap_or(0)
154 }
155
156 pub fn page_count(&self) -> usize {
158 if self.snapshots.is_empty() {
159 0
160 } else {
161 (self.total_rows().max(1) + self.page_size - 1) / self.page_size
162 }
163 }
164
165 pub fn select_snapshot(&mut self, id: i64) -> bool {
169 match self.snapshots.iter().position(|s| s.meta.id == id) {
170 Some(i) => {
171 self.current = i;
172 self.page = 0;
173 self.rebuild_grid();
174 true
175 }
176 None => false,
177 }
178 }
179
180 pub fn select_index(&mut self, i: usize) {
182 if i < self.snapshots.len() {
183 self.current = i;
184 self.page = 0;
185 self.rebuild_grid();
186 }
187 }
188
189 pub fn set_page(&mut self, p: usize) {
191 let last = self.page_count().saturating_sub(1);
192 self.page = p.min(last);
193 self.rebuild_grid();
194 }
195
196 pub fn grid(&self) -> &Table {
198 &self.grid
199 }
200
201 fn rebuild_grid(&mut self) {
205 let Some(snap) = self.snapshots.get(self.current) else {
206 self.grid = Table::new("snapshot", Vec::new());
207 return;
208 };
209 let columns = snap.columns();
210 let mut grid = Table::new(format!("{} @ {}", self.table_name, snap.meta.id), columns);
211
212 let start = self.page * self.page_size;
213 let end = (start + self.page_size).min(snap.row_count());
214
215 let mut row0 = 0usize; for batch in &snap.batches {
218 let n = batch.num_rows();
219 let b_lo = start.saturating_sub(row0).min(n);
220 let b_hi = end.saturating_sub(row0).min(n);
221 if b_lo < b_hi {
222 let window = batch.slice(b_lo, b_hi - b_lo);
225 let t = facett_arrow::table_from_batch(&window, "");
226 for r in t.rows {
227 grid.push_row(r);
228 }
229 }
230 row0 += n;
231 if row0 >= end {
232 break;
233 }
234 }
235 self.grid = grid;
236 }
237}
238
239impl Facet for IcebergView {
240 fn title(&self) -> &str {
241 &self.title
242 }
243
244 fn ui(&mut self, ui: &mut Ui) {
245 let th = theme(ui);
246 if self.snapshots.is_empty() {
247 ui.weak(format!("{} — no snapshots", self.table_name));
248 return;
249 }
250
251 let mut pick: Option<usize> = None;
253 ui.horizontal(|ui| {
254 ui.strong("⏱ snapshot:");
255 egui::ComboBox::from_id_salt(("icebergview_snap", &self.title))
256 .selected_text(
257 self.current_snapshot()
258 .map(|m| format!("#{} · {} rows", m.id, m.rows))
259 .unwrap_or_else(|| "—".into()),
260 )
261 .show_ui(ui, |ui| {
262 for (i, s) in self.snapshots.iter().enumerate() {
263 let m = &s.meta;
264 let label = format!(
265 "{}#{} · t={}ms · {} rows",
266 if i == 0 { "▲ " } else { "" },
267 m.id,
268 m.timestamp_ms,
269 m.rows,
270 );
271 if ui.selectable_label(i == self.current, label).clicked() {
272 pick = Some(i);
273 }
274 }
275 });
276 ui.weak(format!("({} snapshots)", self.snapshots.len()));
277 });
278 if let Some(i) = pick {
279 self.select_index(i);
280 }
281
282 let pages = self.page_count();
284 ui.horizontal(|ui| {
285 ui.label(format!("table {}", self.table_name));
286 ui.separator();
287 if ui.add_enabled(self.page > 0, egui::Button::new("◀ prev")).clicked() {
288 self.set_page(self.page.saturating_sub(1));
289 }
290 ui.colored_label(th.text, format!("page {}/{}", self.page + 1, pages.max(1)));
291 if ui.add_enabled(self.page + 1 < pages, egui::Button::new("next ▶")).clicked() {
292 self.set_page(self.page + 1);
293 }
294 ui.separator();
295 ui.weak(format!("{} rows total", self.total_rows()));
296 });
297
298 ui.separator();
299 self.grid.ui(ui);
301 }
302
303 fn state_json(&self) -> serde_json::Value {
304 serde_json::json!({
305 "table": self.table_name,
306 "snapshots": self.snapshots.iter().map(|s| serde_json::json!({
307 "id": s.meta.id,
308 "timestamp_ms": s.meta.timestamp_ms,
309 "rows": s.meta.rows,
310 })).collect::<Vec<_>>(),
311 "current": self.current_id(),
312 "total_rows": self.total_rows(),
313 "page": self.page,
314 "page_size": self.page_size,
315 "page_count": self.page_count(),
316 "columns": self.grid.columns,
317 "visible_rows": self.grid.rows.len(),
318 })
319 }
320
321 fn selection_json(&self) -> serde_json::Value {
322 match self.current_id() {
323 Some(id) => serde_json::json!(id),
324 None => serde_json::Value::Null,
325 }
326 }
327
328 fn caps(&self) -> FacetCaps {
331 FacetCaps::NONE.themeable().selectable().resizable()
332 }
333
334 fn as_any_mut(&mut self) -> Option<&mut dyn std::any::Any> {
338 Some(self)
339 }
340}
341
342#[cfg(test)]
343mod tests {
344 use super::*;
345 use std::sync::Arc;
346
347 use arrow_array::{Int64Array, StringArray};
348 use arrow_schema::{DataType, Field, Schema};
349 use facett_core::harness;
350
351 fn batch(ids: &[i64], names: &[&str]) -> RecordBatch {
353 RecordBatch::try_new(
354 Arc::new(Schema::new(vec![
355 Field::new("id", DataType::Int64, false),
356 Field::new("name", DataType::Utf8, false),
357 ])),
358 vec![
359 Arc::new(Int64Array::from(ids.to_vec())),
360 Arc::new(StringArray::from(names.to_vec())),
361 ],
362 )
363 .unwrap()
364 }
365
366 fn two_snapshot_view() -> IcebergView {
368 let v1 = Snapshot::new(SnapshotMeta::new(100, 1_000, 2), vec![batch(&[1, 2], &["knut", "korp"])]);
369 let v2 = Snapshot::new(
370 SnapshotMeta::new(200, 2_000, 3),
371 vec![batch(&[1, 2, 3], &["knut", "korp", "skade"])],
372 );
373 IcebergView::new("🧊 Tables", "Person", vec![v1, v2])
375 }
376
377 #[test]
378 fn newest_snapshot_selected_first_and_listed_newest_first() {
379 let v = two_snapshot_view();
380 let metas = v.snapshot_metas();
382 assert_eq!(metas[0].id, 200);
383 assert_eq!(metas[1].id, 100);
384 assert_eq!(v.current_id(), Some(200));
386 assert_eq!(v.total_rows(), 3);
387 }
388
389 #[test]
390 fn switching_snapshots_changes_rendered_rows_and_state() {
391 let mut v = two_snapshot_view();
392
393 assert_eq!(v.grid().rows.len(), 3);
395 assert_eq!(v.grid().rows[2], vec!["3".to_string(), "skade".to_string()]);
396 let s2 = v.state_json();
397 assert_eq!(s2["current"], 200);
398 assert_eq!(s2["total_rows"], 3);
399 assert_eq!(s2["visible_rows"], 3);
400
401 assert!(v.select_snapshot(100), "100 is a known snapshot");
403 assert_eq!(v.current_id(), Some(100));
404 assert_eq!(v.grid().rows.len(), 2);
405 assert!(v.grid().rows.iter().all(|r| r[1] != "skade"), "v1 predates skade");
406 let s1 = v.state_json();
407 assert_eq!(s1["current"], 100);
408 assert_eq!(s1["total_rows"], 2);
409 assert_eq!(s1["visible_rows"], 2);
410
411 assert_ne!(s1["current"], s2["current"]);
413 assert_ne!(s1["visible_rows"], s2["visible_rows"]);
414
415 assert!(!v.select_snapshot(999));
417 assert_eq!(v.current_id(), Some(100));
418 }
419
420 #[test]
421 fn state_json_carries_full_snapshot_list() {
422 let v = two_snapshot_view();
423 let j = v.state_json();
424 assert_eq!(j["table"], "Person");
425 let snaps = j["snapshots"].as_array().unwrap();
426 assert_eq!(snaps.len(), 2);
427 assert_eq!(snaps[0]["id"], 200); assert_eq!(snaps[0]["rows"], 3);
429 assert_eq!(snaps[1]["id"], 100);
430 assert_eq!(j["columns"].as_array().unwrap().len(), 2);
431 }
432
433 #[test]
434 fn paging_windows_the_rows() {
435 let snap = Snapshot::new(
437 SnapshotMeta::new(1, 10, 5),
438 vec![batch(&[1, 2, 3, 4, 5], &["a", "b", "c", "d", "e"])],
439 );
440 let mut v = IcebergView::new("t", "T", vec![snap]).with_page_size(2);
441 assert_eq!(v.page_count(), 3);
442 assert_eq!(v.grid().rows.len(), 2);
443 assert_eq!(v.grid().rows[0][1], "a");
444 v.set_page(1);
445 assert_eq!(v.grid().rows.len(), 2);
446 assert_eq!(v.grid().rows[0][1], "c");
447 v.set_page(2);
448 assert_eq!(v.grid().rows.len(), 1); assert_eq!(v.grid().rows[0][1], "e");
450 v.set_page(99);
452 assert_eq!(v.state_json()["page"], 2);
453 }
454
455 #[test]
456 fn paging_windows_across_multiple_batches() {
457 let snap = Snapshot::new(
459 SnapshotMeta::new(7, 70, 6),
460 vec![batch(&[1, 2, 3], &["a", "b", "c"]), batch(&[4, 5, 6], &["d", "e", "f"])],
461 );
462 let mut v = IcebergView::new("t", "T", vec![snap]).with_page_size(4);
463 assert_eq!(v.grid().rows.len(), 4);
464 assert_eq!(v.grid().rows[3][1], "d", "page 0 crosses into the second batch");
465 v.set_page(1);
466 assert_eq!(v.grid().rows.len(), 2);
467 assert_eq!(v.grid().rows[0][1], "e");
468 }
469
470 #[test]
471 fn headless_render_draws_and_reports_state() {
472 let mut v = two_snapshot_view();
475 let r2 = harness::headless_render(&mut v);
476 assert_eq!(r2.title, "🧊 Tables");
477 assert!(r2.drew(), "a snapshot of rows should tessellate to vertices");
478 assert_eq!(r2.state["current"], 200);
479 assert_eq!(r2.state["visible_rows"], 3);
480
481 v.select_snapshot(100);
482 let r1 = harness::headless_render(&mut v);
483 assert_eq!(r1.state["current"], 100);
484 assert_eq!(r1.state["visible_rows"], 2);
485 assert_ne!(r1.state["visible_rows"], r2.state["visible_rows"]);
486 }
487
488 #[test]
489 fn empty_view_renders_hint_without_panic() {
490 let mut v = IcebergView::new("t", "Empty", Vec::new());
491 assert_eq!(v.current_id(), None);
492 assert_eq!(v.total_rows(), 0);
493 assert_eq!(v.page_count(), 0);
494 let r = harness::headless_render(&mut v);
495 assert_eq!(r.state["snapshots"].as_array().unwrap().len(), 0);
496 assert_eq!(r.state["current"], serde_json::Value::Null);
497 }
498
499 #[test]
500 fn caps_advertise_themeable_selectable_resizable() {
501 let c = two_snapshot_view().caps();
502 assert!(c.themeable && c.selectable && c.resizable);
503 assert!(!c.scalable && !c.copyable);
504 }
505}