Skip to main content

oxiui_table/
async_source.rs

1//! Async data source support for OxiUI table.
2//!
3//! [`AsyncRowSource`] is a `Send + Sync` trait analogous to [`RowSource`] but
4//! with an `async fn row()` signature.  This enables remote / IO-bound data
5//! sources (databases, REST APIs, file parsing) without blocking the UI thread.
6//!
7//! A built-in [`PrefetchBuffer`] wraps any `AsyncRowSource` and prefetches
8//! rows near the current viewport, serving cached rows synchronously once
9//! fetched.
10
11use std::{
12    collections::{HashMap, VecDeque},
13    future::Future,
14    pin::Pin,
15    sync::{Arc, Mutex},
16};
17
18use crate::{Cell, ColumnDef, RowSource, TableError, DEFAULT_ROW_HEIGHT};
19
20/// A boxed, send-able future.
21pub type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;
22
23// ── AsyncRowSource ────────────────────────────────────────────────────────────
24
25/// Asynchronous variant of [`RowSource`] for IO-bound data backends.
26///
27/// Implement this trait for data sources that cannot materialise rows
28/// synchronously (e.g. remote databases, networked APIs, lazy disk reads).
29///
30/// # Usage
31///
32/// Wrap an `AsyncRowSource` in a [`PrefetchBuffer`] to obtain a synchronous
33/// [`RowSource`] that serves rows from cache while background prefetch
34/// keeps the buffer warm.
35///
36/// # Thread safety
37///
38/// Implementors are only required to be `Send`.  The [`PrefetchBuffer`]
39/// wraps the source in an `Arc` and accesses it exclusively from the async
40/// prefetch path, which runs on a single task at a time.
41pub trait AsyncRowSource: Send {
42    /// Total number of rows available in the source.
43    ///
44    /// This count is used to size scroll bars and paginate navigation.
45    fn row_count(&self) -> usize;
46
47    /// Return the column definitions for this source.
48    fn column_defs(&self) -> &[ColumnDef];
49
50    /// Asynchronously fetch the cells for the row at `index`.
51    ///
52    /// Implementations must be cancel-safe.  Large in-flight requests may be
53    /// cancelled when the viewport moves to a different range.
54    fn row_async(&self, index: usize) -> BoxFuture<'_, Result<Vec<Cell>, TableError>>;
55
56    /// Per-row height in logical pixels.
57    ///
58    /// Defaults to [`DEFAULT_ROW_HEIGHT`] (24 px) for every row.
59    fn row_height(&self, _index: usize) -> f32 {
60        DEFAULT_ROW_HEIGHT
61    }
62
63    /// Optional footer row (aggregate / summary).
64    ///
65    /// Fetched once and cached; invalidate by rebuilding the `PrefetchBuffer`.
66    fn footer_async(&self) -> BoxFuture<'_, Option<Vec<Cell>>> {
67        Box::pin(async { None })
68    }
69}
70
71// ── PrefetchBufferInner ───────────────────────────────────────────────────────
72
73/// Internal mutable state of a [`PrefetchBuffer`].
74#[derive(Default)]
75struct PrefetchBufferInner {
76    /// Cached rows keyed by row index.
77    cache: HashMap<usize, Vec<Cell>>,
78    /// Maximum number of cached rows (LRU eviction).
79    max_rows: usize,
80    /// Eviction queue tracking LRU order (oldest at front).
81    lru: VecDeque<usize>,
82    /// Pending prefetch request indices.
83    pending: Vec<usize>,
84    /// Cached footer row, if any.
85    footer: Option<Vec<Cell>>,
86}
87
88impl PrefetchBufferInner {
89    fn new(max_rows: usize) -> Self {
90        PrefetchBufferInner {
91            max_rows,
92            ..Default::default()
93        }
94    }
95
96    /// Insert a row into the cache, evicting the LRU entry if at capacity.
97    fn insert(&mut self, index: usize, cells: Vec<Cell>) {
98        if self.cache.contains_key(&index) {
99            // Move to front of LRU by removing and re-inserting.
100            self.lru.retain(|&i| i != index);
101        } else if self.cache.len() >= self.max_rows {
102            // Evict the least-recently-used row.
103            if let Some(evict) = self.lru.pop_front() {
104                self.cache.remove(&evict);
105            }
106        }
107        self.cache.insert(index, cells);
108        self.lru.push_back(index);
109    }
110
111    /// Get a cached row, promoting it in LRU order.
112    fn get(&mut self, index: usize) -> Option<&Vec<Cell>> {
113        if self.cache.contains_key(&index) {
114            // Promote to MRU.
115            self.lru.retain(|&i| i != index);
116            self.lru.push_back(index);
117            self.cache.get(&index)
118        } else {
119            None
120        }
121    }
122
123    /// Mark `indices` as pending prefetch (deduplicates).
124    fn enqueue_prefetch(&mut self, indices: impl IntoIterator<Item = usize>) {
125        for i in indices {
126            if !self.cache.contains_key(&i) && !self.pending.contains(&i) {
127                self.pending.push(i);
128            }
129        }
130    }
131
132    /// Drain the list of pending prefetch indices.
133    fn drain_pending(&mut self) -> Vec<usize> {
134        std::mem::take(&mut self.pending)
135    }
136
137    /// Number of cached rows.
138    fn len(&self) -> usize {
139        self.cache.len()
140    }
141
142    /// True if the given index is cached.
143    fn is_cached(&self, index: usize) -> bool {
144        self.cache.contains_key(&index)
145    }
146
147    /// Invalidate the entire cache (e.g. on source mutation).
148    fn invalidate(&mut self) {
149        self.cache.clear();
150        self.lru.clear();
151        self.pending.clear();
152        self.footer = None;
153    }
154}
155
156// ── PrefetchBuffer ────────────────────────────────────────────────────────────
157
158/// Wraps an [`AsyncRowSource`] and provides a synchronous [`RowSource`] view.
159///
160/// The buffer caches fetched rows in an LRU cache (capacity `max_rows`).
161/// On a cache miss it returns a placeholder row of [`Cell::Empty`] values and
162/// enqueues the row index for background prefetch.  Callers are expected to
163/// drive the prefetch loop by calling [`flush_pending`](PrefetchBuffer::flush_pending)
164/// in an async context (e.g. a Tokio task or a `wasm_bindgen_futures::spawn_local`
165/// closure).
166///
167/// # Placeholder row
168///
169/// A cache miss returns `N` [`Cell::Empty`] cells where `N` is
170/// `column_defs().len()`.  Renderers may style such cells as loading
171/// placeholders (spinner, shimmer, etc.).
172///
173/// # Thread safety
174///
175/// The inner state is wrapped in `Arc<Mutex<_>>` so `PrefetchBuffer` is
176/// `Send + Sync` and can be shared across threads or with async runtimes.
177pub struct PrefetchBuffer<S: AsyncRowSource> {
178    source: Arc<S>,
179    inner: Arc<Mutex<PrefetchBufferInner>>,
180    prefetch_ahead: usize,
181}
182
183// Manual Clone impl: Arc handles the S without requiring S: Clone.
184impl<S: AsyncRowSource> Clone for PrefetchBuffer<S> {
185    fn clone(&self) -> Self {
186        PrefetchBuffer {
187            source: self.source.clone(),
188            inner: self.inner.clone(),
189            prefetch_ahead: self.prefetch_ahead,
190        }
191    }
192}
193
194impl<S: AsyncRowSource> PrefetchBuffer<S> {
195    /// Create a new buffer wrapping `source`.
196    ///
197    /// - `max_rows`: maximum number of rows to hold in the LRU cache.
198    /// - `prefetch_ahead`: number of rows ahead of the viewport to enqueue for
199    ///   prefetch when [`request_prefetch`](PrefetchBuffer::request_prefetch) is
200    ///   called.
201    pub fn new(source: S, max_rows: usize, prefetch_ahead: usize) -> Self {
202        PrefetchBuffer {
203            source: Arc::new(source),
204            inner: Arc::new(Mutex::new(PrefetchBufferInner::new(max_rows))),
205            prefetch_ahead,
206        }
207    }
208
209    /// Request that rows `[start, start + viewport_rows + prefetch_ahead)` be
210    /// prefetched.
211    ///
212    /// Non-cached rows are enqueued internally; call
213    /// [`flush_pending`](PrefetchBuffer::flush_pending) in an async context to
214    /// actually perform the fetches.
215    pub fn request_prefetch(&self, start: usize, viewport_rows: usize) {
216        let end = (start + viewport_rows + self.prefetch_ahead).min(self.source.row_count());
217        if let Ok(mut inner) = self.inner.lock() {
218            inner.enqueue_prefetch(start..end);
219        }
220    }
221
222    /// Drive the prefetch loop, fetching all pending rows and storing them in
223    /// the cache.
224    ///
225    /// Should be called from an async context (e.g. a spawned task).  Returns
226    /// the number of rows successfully fetched in this call.
227    pub async fn flush_pending(&self) -> usize {
228        let pending = self
229            .inner
230            .lock()
231            .map(|mut g| g.drain_pending())
232            .unwrap_or_default();
233
234        let mut fetched = 0usize;
235        for idx in pending {
236            match self.source.row_async(idx).await {
237                Ok(cells) => {
238                    if let Ok(mut inner) = self.inner.lock() {
239                        inner.insert(idx, cells);
240                        fetched += 1;
241                    }
242                }
243                Err(_) => {
244                    // Silently skip failed fetches; they will be retried on
245                    // the next `request_prefetch` + `flush_pending` cycle.
246                }
247            }
248        }
249        fetched
250    }
251
252    /// Store a single already-fetched row directly into the cache.
253    ///
254    /// Useful for callers that manage their own prefetch executor.
255    pub fn store_row(&self, index: usize, cells: Vec<Cell>) {
256        if let Ok(mut inner) = self.inner.lock() {
257            inner.insert(index, cells);
258        }
259    }
260
261    /// Invalidate the entire cache (e.g. after a source mutation).
262    ///
263    /// The next call to [`RowSource::row`] will trigger fresh prefetch requests
264    /// for every accessed row.
265    pub fn invalidate(&self) {
266        if let Ok(mut inner) = self.inner.lock() {
267            inner.invalidate();
268        }
269    }
270
271    /// Number of rows currently held in the cache.
272    pub fn cached_count(&self) -> usize {
273        self.inner.lock().map(|g| g.len()).unwrap_or(0)
274    }
275
276    /// True if the given row index is in the cache.
277    pub fn is_cached(&self, index: usize) -> bool {
278        self.inner
279            .lock()
280            .map(|g| g.is_cached(index))
281            .unwrap_or(false)
282    }
283
284    /// Access the underlying async source.
285    pub fn source(&self) -> &S {
286        &self.source
287    }
288}
289
290impl<S: AsyncRowSource> RowSource for PrefetchBuffer<S> {
291    fn row_count(&self) -> usize {
292        self.source.row_count()
293    }
294
295    fn column_defs(&self) -> &[ColumnDef] {
296        self.source.column_defs()
297    }
298
299    fn row(&self, index: usize) -> Vec<Cell> {
300        if let Ok(mut inner) = self.inner.lock() {
301            if let Some(row) = inner.get(index) {
302                return row.clone();
303            }
304            // Cache miss — enqueue for prefetch and return placeholders.
305            inner.enqueue_prefetch(std::iter::once(index));
306        }
307        // Return an empty-cell placeholder row.
308        let ncols = self.source.column_defs().len();
309        vec![Cell::Empty; ncols.max(1)]
310    }
311
312    fn row_height(&self, index: usize) -> f32 {
313        self.source.row_height(index)
314    }
315
316    fn footer(&self) -> Option<Vec<Cell>> {
317        self.inner.lock().ok()?.footer.clone()
318    }
319}
320
321// ── Tests ─────────────────────────────────────────────────────────────────────
322
323#[cfg(test)]
324mod tests {
325    use super::*;
326    use crate::ColumnDef;
327
328    // A simple synchronous source for testing (simulates an instant async source).
329    struct InMemoryAsync {
330        rows: Vec<Vec<Cell>>,
331        cols: Vec<ColumnDef>,
332    }
333
334    impl InMemoryAsync {
335        fn new(n: usize) -> Self {
336            use crate::ColumnDefBuilder;
337            let cols = vec![
338                ColumnDefBuilder::new("id").width(60.0).build(),
339                ColumnDefBuilder::new("value").width(120.0).build(),
340            ];
341            let rows = (0..n)
342                .map(|i| vec![Cell::Int(i as i64), Cell::Text(format!("row-{i}"))])
343                .collect();
344            InMemoryAsync { rows, cols }
345        }
346    }
347
348    impl AsyncRowSource for InMemoryAsync {
349        fn row_count(&self) -> usize {
350            self.rows.len()
351        }
352
353        fn column_defs(&self) -> &[ColumnDef] {
354            &self.cols
355        }
356
357        fn row_async(&self, index: usize) -> BoxFuture<'_, Result<Vec<Cell>, TableError>> {
358            let result = if index < self.rows.len() {
359                Ok(self.rows[index].clone())
360            } else {
361                Err(TableError::OutOfBounds { row: index, col: 0 })
362            };
363            Box::pin(async move { result })
364        }
365    }
366
367    // Use pollster (Pure Rust) as the test executor — no unsafe, no extra dep
368    // beyond what the workspace already carries.
369    use pollster::block_on;
370
371    #[test]
372    fn async_source_row_count() {
373        let src = InMemoryAsync::new(100);
374        assert_eq!(src.row_count(), 100);
375    }
376
377    #[test]
378    fn async_source_row_async_returns_correct_cells() {
379        let src = InMemoryAsync::new(5);
380        let row = block_on(src.row_async(2)).expect("row ok");
381        assert!(matches!(row[0], Cell::Int(2)));
382        assert!(matches!(&row[1], Cell::Text(s) if s == "row-2"));
383    }
384
385    #[test]
386    fn async_source_out_of_bounds() {
387        let src = InMemoryAsync::new(3);
388        let err = block_on(src.row_async(10)).expect_err("should be err");
389        assert!(matches!(err, TableError::OutOfBounds { row: 10, .. }));
390    }
391
392    #[test]
393    fn prefetch_buffer_cache_miss_returns_placeholder() {
394        let buf = PrefetchBuffer::new(InMemoryAsync::new(50), 32, 4);
395        // Row is not in cache yet; should return placeholders.
396        let row = buf.row(0);
397        // 2 columns → 2 empty cells.
398        assert_eq!(row.len(), 2);
399        for cell in &row {
400            assert!(matches!(cell, Cell::Empty));
401        }
402        assert!(!buf.is_cached(0));
403    }
404
405    #[test]
406    fn prefetch_buffer_store_and_retrieve_row() {
407        let buf = PrefetchBuffer::new(InMemoryAsync::new(50), 32, 4);
408        buf.store_row(5, vec![Cell::Int(5), Cell::Text("row-5".to_string())]);
409        assert!(buf.is_cached(5));
410        let row = buf.row(5);
411        assert!(matches!(row[0], Cell::Int(5)));
412    }
413
414    #[test]
415    fn prefetch_buffer_flush_pending_fetches_rows() {
416        let buf = PrefetchBuffer::new(InMemoryAsync::new(20), 32, 0);
417        // Access row 3 — triggers a prefetch request.
418        let _ = buf.row(3);
419        // Now flush the pending requests.
420        let fetched = block_on(buf.flush_pending());
421        assert_eq!(fetched, 1);
422        assert!(buf.is_cached(3));
423        // Next access is a cache hit.
424        let row = buf.row(3);
425        assert!(matches!(row[0], Cell::Int(3)));
426    }
427
428    #[test]
429    fn prefetch_buffer_request_prefetch_enqueues_range() {
430        let buf = PrefetchBuffer::new(InMemoryAsync::new(100), 64, 5);
431        // Request viewport rows 0..10, plus 5 ahead = rows 0..15.
432        buf.request_prefetch(0, 10);
433        let fetched = block_on(buf.flush_pending());
434        // Should have fetched at most 15 rows (capped by row_count).
435        assert_eq!(fetched, 15);
436        for i in 0..15 {
437            assert!(buf.is_cached(i), "row {i} should be cached");
438        }
439    }
440
441    #[test]
442    fn prefetch_buffer_lru_eviction() {
443        // Cache holds at most 3 rows.
444        let buf = PrefetchBuffer::new(InMemoryAsync::new(10), 3, 0);
445        // Load rows 0, 1, 2 — fills cache.
446        for i in 0..3_usize {
447            buf.store_row(i, vec![Cell::Int(i as i64), Cell::Bool(false)]);
448        }
449        assert_eq!(buf.cached_count(), 3);
450        // Inserting row 3 evicts row 0 (LRU).
451        buf.store_row(3, vec![Cell::Int(3), Cell::Bool(false)]);
452        assert_eq!(buf.cached_count(), 3);
453        assert!(!buf.is_cached(0), "row 0 should be evicted");
454        assert!(buf.is_cached(3), "row 3 should be cached");
455    }
456
457    #[test]
458    fn prefetch_buffer_invalidate_clears_cache() {
459        let buf = PrefetchBuffer::new(InMemoryAsync::new(10), 32, 0);
460        buf.store_row(0, vec![Cell::Int(0), Cell::Bool(false)]);
461        assert!(buf.is_cached(0));
462        buf.invalidate();
463        assert!(!buf.is_cached(0));
464        assert_eq!(buf.cached_count(), 0);
465    }
466
467    #[test]
468    fn prefetch_buffer_implements_row_source() {
469        // Compile-time check: PrefetchBuffer<InMemoryAsync> : RowSource.
470        fn assert_row_source<T: RowSource>(_: &T) {}
471        let buf = PrefetchBuffer::new(InMemoryAsync::new(5), 32, 0);
472        assert_row_source(&buf);
473    }
474
475    #[test]
476    fn prefetch_buffer_row_count_and_column_defs() {
477        let buf = PrefetchBuffer::new(InMemoryAsync::new(42), 32, 0);
478        assert_eq!(buf.row_count(), 42);
479        assert_eq!(buf.column_defs().len(), 2);
480    }
481
482    #[test]
483    fn prefetch_buffer_is_clone() {
484        // Clone shares the same inner state.
485        let buf = PrefetchBuffer::new(InMemoryAsync::new(5), 32, 0);
486        buf.store_row(1, vec![Cell::Int(1), Cell::Bool(false)]);
487        let buf2 = buf.clone();
488        assert!(buf2.is_cached(1));
489    }
490
491    #[test]
492    fn async_source_default_row_height() {
493        let src = InMemoryAsync::new(3);
494        assert_eq!(src.row_height(0), DEFAULT_ROW_HEIGHT);
495    }
496}