1use std::{
12 collections::{HashMap, VecDeque},
13 future::Future,
14 pin::Pin,
15 sync::{Arc, Mutex},
16};
17
18use crate::{Cell, ColumnDef, RowSource, TableError, DEFAULT_ROW_HEIGHT};
19
20pub type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;
22
23pub trait AsyncRowSource: Send {
42 fn row_count(&self) -> usize;
46
47 fn column_defs(&self) -> &[ColumnDef];
49
50 fn row_async(&self, index: usize) -> BoxFuture<'_, Result<Vec<Cell>, TableError>>;
55
56 fn row_height(&self, _index: usize) -> f32 {
60 DEFAULT_ROW_HEIGHT
61 }
62
63 fn footer_async(&self) -> BoxFuture<'_, Option<Vec<Cell>>> {
67 Box::pin(async { None })
68 }
69}
70
71#[derive(Default)]
75struct PrefetchBufferInner {
76 cache: HashMap<usize, Vec<Cell>>,
78 max_rows: usize,
80 lru: VecDeque<usize>,
82 pending: Vec<usize>,
84 footer: Option<Vec<Cell>>,
86}
87
88impl PrefetchBufferInner {
89 fn new(max_rows: usize) -> Self {
90 PrefetchBufferInner {
91 max_rows,
92 ..Default::default()
93 }
94 }
95
96 fn insert(&mut self, index: usize, cells: Vec<Cell>) {
98 if self.cache.contains_key(&index) {
99 self.lru.retain(|&i| i != index);
101 } else if self.cache.len() >= self.max_rows {
102 if let Some(evict) = self.lru.pop_front() {
104 self.cache.remove(&evict);
105 }
106 }
107 self.cache.insert(index, cells);
108 self.lru.push_back(index);
109 }
110
111 fn get(&mut self, index: usize) -> Option<&Vec<Cell>> {
113 if self.cache.contains_key(&index) {
114 self.lru.retain(|&i| i != index);
116 self.lru.push_back(index);
117 self.cache.get(&index)
118 } else {
119 None
120 }
121 }
122
123 fn enqueue_prefetch(&mut self, indices: impl IntoIterator<Item = usize>) {
125 for i in indices {
126 if !self.cache.contains_key(&i) && !self.pending.contains(&i) {
127 self.pending.push(i);
128 }
129 }
130 }
131
132 fn drain_pending(&mut self) -> Vec<usize> {
134 std::mem::take(&mut self.pending)
135 }
136
137 fn len(&self) -> usize {
139 self.cache.len()
140 }
141
142 fn is_cached(&self, index: usize) -> bool {
144 self.cache.contains_key(&index)
145 }
146
147 fn invalidate(&mut self) {
149 self.cache.clear();
150 self.lru.clear();
151 self.pending.clear();
152 self.footer = None;
153 }
154}
155
156pub struct PrefetchBuffer<S: AsyncRowSource> {
178 source: Arc<S>,
179 inner: Arc<Mutex<PrefetchBufferInner>>,
180 prefetch_ahead: usize,
181}
182
183impl<S: AsyncRowSource> Clone for PrefetchBuffer<S> {
185 fn clone(&self) -> Self {
186 PrefetchBuffer {
187 source: self.source.clone(),
188 inner: self.inner.clone(),
189 prefetch_ahead: self.prefetch_ahead,
190 }
191 }
192}
193
194impl<S: AsyncRowSource> PrefetchBuffer<S> {
195 pub fn new(source: S, max_rows: usize, prefetch_ahead: usize) -> Self {
202 PrefetchBuffer {
203 source: Arc::new(source),
204 inner: Arc::new(Mutex::new(PrefetchBufferInner::new(max_rows))),
205 prefetch_ahead,
206 }
207 }
208
209 pub fn request_prefetch(&self, start: usize, viewport_rows: usize) {
216 let end = (start + viewport_rows + self.prefetch_ahead).min(self.source.row_count());
217 if let Ok(mut inner) = self.inner.lock() {
218 inner.enqueue_prefetch(start..end);
219 }
220 }
221
222 pub async fn flush_pending(&self) -> usize {
228 let pending = self
229 .inner
230 .lock()
231 .map(|mut g| g.drain_pending())
232 .unwrap_or_default();
233
234 let mut fetched = 0usize;
235 for idx in pending {
236 match self.source.row_async(idx).await {
237 Ok(cells) => {
238 if let Ok(mut inner) = self.inner.lock() {
239 inner.insert(idx, cells);
240 fetched += 1;
241 }
242 }
243 Err(_) => {
244 }
247 }
248 }
249 fetched
250 }
251
252 pub fn store_row(&self, index: usize, cells: Vec<Cell>) {
256 if let Ok(mut inner) = self.inner.lock() {
257 inner.insert(index, cells);
258 }
259 }
260
261 pub fn invalidate(&self) {
266 if let Ok(mut inner) = self.inner.lock() {
267 inner.invalidate();
268 }
269 }
270
271 pub fn cached_count(&self) -> usize {
273 self.inner.lock().map(|g| g.len()).unwrap_or(0)
274 }
275
276 pub fn is_cached(&self, index: usize) -> bool {
278 self.inner
279 .lock()
280 .map(|g| g.is_cached(index))
281 .unwrap_or(false)
282 }
283
284 pub fn source(&self) -> &S {
286 &self.source
287 }
288}
289
290impl<S: AsyncRowSource> RowSource for PrefetchBuffer<S> {
291 fn row_count(&self) -> usize {
292 self.source.row_count()
293 }
294
295 fn column_defs(&self) -> &[ColumnDef] {
296 self.source.column_defs()
297 }
298
299 fn row(&self, index: usize) -> Vec<Cell> {
300 if let Ok(mut inner) = self.inner.lock() {
301 if let Some(row) = inner.get(index) {
302 return row.clone();
303 }
304 inner.enqueue_prefetch(std::iter::once(index));
306 }
307 let ncols = self.source.column_defs().len();
309 vec![Cell::Empty; ncols.max(1)]
310 }
311
312 fn row_height(&self, index: usize) -> f32 {
313 self.source.row_height(index)
314 }
315
316 fn footer(&self) -> Option<Vec<Cell>> {
317 self.inner.lock().ok()?.footer.clone()
318 }
319}
320
321#[cfg(test)]
324mod tests {
325 use super::*;
326 use crate::ColumnDef;
327
328 struct InMemoryAsync {
330 rows: Vec<Vec<Cell>>,
331 cols: Vec<ColumnDef>,
332 }
333
334 impl InMemoryAsync {
335 fn new(n: usize) -> Self {
336 use crate::ColumnDefBuilder;
337 let cols = vec![
338 ColumnDefBuilder::new("id").width(60.0).build(),
339 ColumnDefBuilder::new("value").width(120.0).build(),
340 ];
341 let rows = (0..n)
342 .map(|i| vec![Cell::Int(i as i64), Cell::Text(format!("row-{i}"))])
343 .collect();
344 InMemoryAsync { rows, cols }
345 }
346 }
347
348 impl AsyncRowSource for InMemoryAsync {
349 fn row_count(&self) -> usize {
350 self.rows.len()
351 }
352
353 fn column_defs(&self) -> &[ColumnDef] {
354 &self.cols
355 }
356
357 fn row_async(&self, index: usize) -> BoxFuture<'_, Result<Vec<Cell>, TableError>> {
358 let result = if index < self.rows.len() {
359 Ok(self.rows[index].clone())
360 } else {
361 Err(TableError::OutOfBounds { row: index, col: 0 })
362 };
363 Box::pin(async move { result })
364 }
365 }
366
367 use pollster::block_on;
370
371 #[test]
372 fn async_source_row_count() {
373 let src = InMemoryAsync::new(100);
374 assert_eq!(src.row_count(), 100);
375 }
376
377 #[test]
378 fn async_source_row_async_returns_correct_cells() {
379 let src = InMemoryAsync::new(5);
380 let row = block_on(src.row_async(2)).expect("row ok");
381 assert!(matches!(row[0], Cell::Int(2)));
382 assert!(matches!(&row[1], Cell::Text(s) if s == "row-2"));
383 }
384
385 #[test]
386 fn async_source_out_of_bounds() {
387 let src = InMemoryAsync::new(3);
388 let err = block_on(src.row_async(10)).expect_err("should be err");
389 assert!(matches!(err, TableError::OutOfBounds { row: 10, .. }));
390 }
391
392 #[test]
393 fn prefetch_buffer_cache_miss_returns_placeholder() {
394 let buf = PrefetchBuffer::new(InMemoryAsync::new(50), 32, 4);
395 let row = buf.row(0);
397 assert_eq!(row.len(), 2);
399 for cell in &row {
400 assert!(matches!(cell, Cell::Empty));
401 }
402 assert!(!buf.is_cached(0));
403 }
404
405 #[test]
406 fn prefetch_buffer_store_and_retrieve_row() {
407 let buf = PrefetchBuffer::new(InMemoryAsync::new(50), 32, 4);
408 buf.store_row(5, vec![Cell::Int(5), Cell::Text("row-5".to_string())]);
409 assert!(buf.is_cached(5));
410 let row = buf.row(5);
411 assert!(matches!(row[0], Cell::Int(5)));
412 }
413
414 #[test]
415 fn prefetch_buffer_flush_pending_fetches_rows() {
416 let buf = PrefetchBuffer::new(InMemoryAsync::new(20), 32, 0);
417 let _ = buf.row(3);
419 let fetched = block_on(buf.flush_pending());
421 assert_eq!(fetched, 1);
422 assert!(buf.is_cached(3));
423 let row = buf.row(3);
425 assert!(matches!(row[0], Cell::Int(3)));
426 }
427
428 #[test]
429 fn prefetch_buffer_request_prefetch_enqueues_range() {
430 let buf = PrefetchBuffer::new(InMemoryAsync::new(100), 64, 5);
431 buf.request_prefetch(0, 10);
433 let fetched = block_on(buf.flush_pending());
434 assert_eq!(fetched, 15);
436 for i in 0..15 {
437 assert!(buf.is_cached(i), "row {i} should be cached");
438 }
439 }
440
441 #[test]
442 fn prefetch_buffer_lru_eviction() {
443 let buf = PrefetchBuffer::new(InMemoryAsync::new(10), 3, 0);
445 for i in 0..3_usize {
447 buf.store_row(i, vec![Cell::Int(i as i64), Cell::Bool(false)]);
448 }
449 assert_eq!(buf.cached_count(), 3);
450 buf.store_row(3, vec![Cell::Int(3), Cell::Bool(false)]);
452 assert_eq!(buf.cached_count(), 3);
453 assert!(!buf.is_cached(0), "row 0 should be evicted");
454 assert!(buf.is_cached(3), "row 3 should be cached");
455 }
456
457 #[test]
458 fn prefetch_buffer_invalidate_clears_cache() {
459 let buf = PrefetchBuffer::new(InMemoryAsync::new(10), 32, 0);
460 buf.store_row(0, vec![Cell::Int(0), Cell::Bool(false)]);
461 assert!(buf.is_cached(0));
462 buf.invalidate();
463 assert!(!buf.is_cached(0));
464 assert_eq!(buf.cached_count(), 0);
465 }
466
467 #[test]
468 fn prefetch_buffer_implements_row_source() {
469 fn assert_row_source<T: RowSource>(_: &T) {}
471 let buf = PrefetchBuffer::new(InMemoryAsync::new(5), 32, 0);
472 assert_row_source(&buf);
473 }
474
475 #[test]
476 fn prefetch_buffer_row_count_and_column_defs() {
477 let buf = PrefetchBuffer::new(InMemoryAsync::new(42), 32, 0);
478 assert_eq!(buf.row_count(), 42);
479 assert_eq!(buf.column_defs().len(), 2);
480 }
481
482 #[test]
483 fn prefetch_buffer_is_clone() {
484 let buf = PrefetchBuffer::new(InMemoryAsync::new(5), 32, 0);
486 buf.store_row(1, vec![Cell::Int(1), Cell::Bool(false)]);
487 let buf2 = buf.clone();
488 assert!(buf2.is_cached(1));
489 }
490
491 #[test]
492 fn async_source_default_row_height() {
493 let src = InMemoryAsync::new(3);
494 assert_eq!(src.row_height(0), DEFAULT_ROW_HEIGHT);
495 }
496}