use std::{
collections::{HashMap, VecDeque},
future::Future,
pin::Pin,
sync::{Arc, Mutex},
};
use crate::{Cell, ColumnDef, RowSource, TableError, DEFAULT_ROW_HEIGHT};
/// A heap-allocated, pinned, `Send` future that yields `T` and may borrow
/// data for the lifetime `'a`.
pub type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;
/// A table data source whose row contents are produced asynchronously.
///
/// Row count, column definitions and row heights are available synchronously;
/// row cells and the optional footer are returned as boxed futures.
pub trait AsyncRowSource: Send {
/// Number of rows the source reports.
fn row_count(&self) -> usize;
/// Column definitions of the table.
fn column_defs(&self) -> &[ColumnDef];
/// Returns a future resolving to the cells of row `index`, or to a
/// [`TableError`] if the row cannot be produced.
fn row_async(&self, index: usize) -> BoxFuture<'_, Result<Vec<Cell>, TableError>>;
/// Height of row `index`. The default implementation ignores the index and
/// returns [`DEFAULT_ROW_HEIGHT`].
fn row_height(&self, _index: usize) -> f32 {
DEFAULT_ROW_HEIGHT
}
/// Returns a future resolving to the footer cells, if any. The default
/// implementation resolves to `None`.
fn footer_async(&self) -> BoxFuture<'_, Option<Vec<Cell>>> {
Box::pin(async { None })
}
}
/// Mutable state of the prefetch buffer: cached rows, their LRU order and the
/// queue of row indices waiting to be fetched.
#[derive(Default)]
struct PrefetchBufferInner {
/// Cached rows, keyed by row index.
cache: HashMap<usize, Vec<Cell>>,
/// Cache size at which inserting a new index evicts the least recently used row.
max_rows: usize,
/// Cached row indices ordered from least recently used (front) to most
/// recently used (back).
lru: VecDeque<usize>,
/// Row indices queued for fetching, in enqueue order and without duplicates.
pending: Vec<usize>,
/// Cached footer cells. The methods visible in this file only ever reset
/// this to `None`.
footer: Option<Vec<Cell>>,
}
impl PrefetchBufferInner {
    /// Creates an empty buffer whose eviction threshold is `max_rows`; every
    /// other field starts at its `Default` value.
    fn new(max_rows: usize) -> Self {
        PrefetchBufferInner {
            max_rows,
            ..Default::default()
        }
    }

    /// Moves `index` to the most-recently-used end of the LRU queue, adding it
    /// if it is not tracked yet.
    fn touch(&mut self, index: usize) {
        self.lru.retain(|&i| i != index);
        self.lru.push_back(index);
    }

    /// Caches `cells` for row `index` and marks it as most recently used.
    ///
    /// When `index` is new and the cache already holds `max_rows` rows, the
    /// least recently used row is evicted first. Because eviction happens
    /// before insertion, a `max_rows` of 0 still keeps the newest row.
    ///
    /// Any queued fetch for `index` is cancelled: the row is now available, so
    /// fetching it again would only repeat work and overwrite this value.
    fn insert(&mut self, index: usize, cells: Vec<Cell>) {
        let is_new = !self.cache.contains_key(&index);
        if is_new && self.cache.len() >= self.max_rows {
            if let Some(evict) = self.lru.pop_front() {
                self.cache.remove(&evict);
            }
        }
        self.cache.insert(index, cells);
        self.touch(index);
        self.pending.retain(|&i| i != index);
    }

    /// Returns the cached cells for `index`, marking the row as most recently
    /// used, or `None` when the row is not cached.
    fn get(&mut self, index: usize) -> Option<&Vec<Cell>> {
        if !self.cache.contains_key(&index) {
            return None;
        }
        self.touch(index);
        self.cache.get(&index)
    }

    /// Queues every index in `indices` that is neither cached nor already
    /// queued, preserving the order in which they are supplied.
    fn enqueue_prefetch(&mut self, indices: impl IntoIterator<Item = usize>) {
        for i in indices {
            if !self.cache.contains_key(&i) && !self.pending.contains(&i) {
                self.pending.push(i);
            }
        }
    }

    /// Takes the queued indices, leaving the queue empty.
    fn drain_pending(&mut self) -> Vec<usize> {
        std::mem::take(&mut self.pending)
    }

    /// Number of rows currently cached.
    fn len(&self) -> usize {
        self.cache.len()
    }

    /// Whether row `index` is currently cached.
    fn is_cached(&self, index: usize) -> bool {
        self.cache.contains_key(&index)
    }

    /// Drops all cached rows, the LRU order, the pending queue and the footer.
    fn invalidate(&mut self) {
        self.cache.clear();
        self.lru.clear();
        self.pending.clear();
        self.footer = None;
    }
}
/// Wraps an [`AsyncRowSource`] together with a mutex-guarded row cache.
///
/// The source and the cache are both held behind `Arc`, so clones of a buffer
/// share them.
pub struct PrefetchBuffer<S: AsyncRowSource> {
/// The wrapped asynchronous source.
source: Arc<S>,
/// Cached rows, LRU order and pending-fetch queue, guarded by a mutex.
inner: Arc<Mutex<PrefetchBufferInner>>,
/// Number of extra rows past the viewport that `request_prefetch` enqueues.
prefetch_ahead: usize,
}
impl<S: AsyncRowSource> Clone for PrefetchBuffer<S> {
    /// Produces a handle that shares the same source and cache as `self`.
    ///
    /// Only the `Arc` reference counts are bumped; `S` does not need to be
    /// `Clone`.
    fn clone(&self) -> Self {
        let source = Arc::clone(&self.source);
        let inner = Arc::clone(&self.inner);
        Self {
            source,
            inner,
            prefetch_ahead: self.prefetch_ahead,
        }
    }
}
impl<S: AsyncRowSource> PrefetchBuffer<S> {
    /// Wraps `source` in a buffer that caches up to `max_rows` rows and
    /// prefetches `prefetch_ahead` rows beyond each requested viewport.
    pub fn new(source: S, max_rows: usize, prefetch_ahead: usize) -> Self {
        PrefetchBuffer {
            source: Arc::new(source),
            inner: Arc::new(Mutex::new(PrefetchBufferInner::new(max_rows))),
            prefetch_ahead,
        }
    }

    /// Queues the rows `start .. start + viewport_rows + prefetch_ahead`,
    /// clamped to the source's row count, for fetching on the next
    /// [`flush_pending`](Self::flush_pending).
    ///
    /// The upper bound is computed with saturating arithmetic, so very large
    /// arguments clamp to the row count instead of overflowing `usize`.
    /// Does nothing if the internal mutex is poisoned.
    pub fn request_prefetch(&self, start: usize, viewport_rows: usize) {
        let end = start
            .saturating_add(viewport_rows)
            .saturating_add(self.prefetch_ahead)
            .min(self.source.row_count());
        if let Ok(mut inner) = self.inner.lock() {
            // An empty range (`start >= end`) enqueues nothing.
            inner.enqueue_prefetch(start..end);
        }
    }

    /// Fetches every queued row from the source, one after another, and
    /// caches the results.
    ///
    /// Returns the number of rows that were fetched and stored. Fetching is
    /// best effort: a row whose fetch fails is skipped and is only retried if
    /// it is queued again. The mutex is never held across an `.await`; it is
    /// taken once to drain the queue and once per row to store the result.
    pub async fn flush_pending(&self) -> usize {
        let pending = self
            .inner
            .lock()
            .map(|mut guard| guard.drain_pending())
            .unwrap_or_default();
        let mut fetched = 0usize;
        for idx in pending {
            if let Ok(cells) = self.source.row_async(idx).await {
                if let Ok(mut inner) = self.inner.lock() {
                    inner.insert(idx, cells);
                    fetched += 1;
                }
            }
        }
        fetched
    }

    /// Stores `cells` for row `index` directly in the cache. Does nothing if
    /// the internal mutex is poisoned.
    pub fn store_row(&self, index: usize, cells: Vec<Cell>) {
        if let Ok(mut inner) = self.inner.lock() {
            inner.insert(index, cells);
        }
    }

    /// Clears the cached rows, the pending queue and the cached footer. Does
    /// nothing if the internal mutex is poisoned.
    pub fn invalidate(&self) {
        if let Ok(mut inner) = self.inner.lock() {
            inner.invalidate();
        }
    }

    /// Number of rows currently cached, or 0 if the internal mutex is poisoned.
    pub fn cached_count(&self) -> usize {
        self.inner.lock().map(|guard| guard.len()).unwrap_or(0)
    }

    /// Whether row `index` is cached; `false` if the internal mutex is poisoned.
    pub fn is_cached(&self, index: usize) -> bool {
        self.inner
            .lock()
            .map(|guard| guard.is_cached(index))
            .unwrap_or(false)
    }

    /// Borrows the wrapped source.
    pub fn source(&self) -> &S {
        &self.source
    }
}
impl<S: AsyncRowSource> RowSource for PrefetchBuffer<S> {
    /// Row count reported by the wrapped source.
    fn row_count(&self) -> usize {
        self.source.row_count()
    }

    /// Column definitions of the wrapped source.
    fn column_defs(&self) -> &[ColumnDef] {
        self.source.column_defs()
    }

    /// Returns the cached cells for row `index`, or a placeholder row on a miss.
    ///
    /// A miss queues `index` for fetching, but only when it lies below the
    /// source's row count. This matches the clamping in `request_prefetch` and
    /// avoids queuing fetches for rows the source does not report. The
    /// placeholder holds one `Cell::Empty` per column and at least one cell.
    /// A poisoned mutex is treated as a miss without queuing.
    fn row(&self, index: usize) -> Vec<Cell> {
        // Read the count before locking so the guard is held as briefly as possible.
        let in_range = index < self.source.row_count();
        if let Ok(mut inner) = self.inner.lock() {
            if let Some(row) = inner.get(index) {
                return row.clone();
            }
            if in_range {
                inner.enqueue_prefetch(std::iter::once(index));
            }
        }
        let ncols = self.source.column_defs().len();
        vec![Cell::Empty; ncols.max(1)]
    }

    /// Row height reported by the wrapped source.
    fn row_height(&self, index: usize) -> f32 {
        self.source.row_height(index)
    }

    /// Returns a copy of the cached footer, or `None` if there is none or the
    /// mutex is poisoned.
    ///
    /// NOTE(review): in the code visible here nothing assigns the cached
    /// footer (it is only reset by `invalidate`) and `footer_async` is never
    /// awaited, so this presumably always yields `None`. Confirm whether a
    /// population path exists elsewhere.
    fn footer(&self) -> Option<Vec<Cell>> {
        self.inner.lock().ok()?.footer.clone()
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::ColumnDef;
    use pollster::block_on;

    /// Minimal in-memory [`AsyncRowSource`] whose futures resolve immediately.
    struct InMemoryAsync {
        rows: Vec<Vec<Cell>>,
        cols: Vec<ColumnDef>,
    }

    impl InMemoryAsync {
        /// Builds `n` rows of `[Int(i), Text("row-{i}")]` under the columns
        /// `id` and `value`.
        fn new(n: usize) -> Self {
            use crate::ColumnDefBuilder;
            let id = ColumnDefBuilder::new("id").width(60.0).build();
            let value = ColumnDefBuilder::new("value").width(120.0).build();
            let mut rows = Vec::with_capacity(n);
            for i in 0..n {
                rows.push(vec![Cell::Int(i as i64), Cell::Text(format!("row-{i}"))]);
            }
            InMemoryAsync {
                rows,
                cols: vec![id, value],
            }
        }
    }

    impl AsyncRowSource for InMemoryAsync {
        fn row_count(&self) -> usize {
            self.rows.len()
        }

        fn column_defs(&self) -> &[ColumnDef] {
            &self.cols
        }

        fn row_async(&self, index: usize) -> BoxFuture<'_, Result<Vec<Cell>, TableError>> {
            // Resolve the lookup eagerly; the future only hands the outcome back.
            let outcome = match self.rows.get(index) {
                Some(cells) => Ok(cells.clone()),
                None => Err(TableError::OutOfBounds { row: index, col: 0 }),
            };
            Box::pin(async move { outcome })
        }
    }

    /// Shorthand for a buffer over an `InMemoryAsync` source with `rows` rows.
    fn buffer(rows: usize, max_rows: usize, ahead: usize) -> PrefetchBuffer<InMemoryAsync> {
        PrefetchBuffer::new(InMemoryAsync::new(rows), max_rows, ahead)
    }

    #[test]
    fn async_source_row_count() {
        assert_eq!(InMemoryAsync::new(100).row_count(), 100);
    }

    #[test]
    fn async_source_row_async_returns_correct_cells() {
        let src = InMemoryAsync::new(5);
        let cells = block_on(src.row_async(2)).expect("row ok");
        assert!(matches!(cells[0], Cell::Int(2)));
        assert!(matches!(&cells[1], Cell::Text(s) if s == "row-2"));
    }

    #[test]
    fn async_source_out_of_bounds() {
        let src = InMemoryAsync::new(3);
        let failure = block_on(src.row_async(10)).expect_err("should be err");
        assert!(matches!(failure, TableError::OutOfBounds { row: 10, .. }));
    }

    #[test]
    fn prefetch_buffer_cache_miss_returns_placeholder() {
        let buf = buffer(50, 32, 4);
        let placeholder = buf.row(0);
        assert_eq!(placeholder.len(), 2);
        assert!(placeholder.iter().all(|cell| matches!(cell, Cell::Empty)));
        assert!(!buf.is_cached(0));
    }

    #[test]
    fn prefetch_buffer_store_and_retrieve_row() {
        let buf = buffer(50, 32, 4);
        buf.store_row(5, vec![Cell::Int(5), Cell::Text("row-5".to_string())]);
        assert!(buf.is_cached(5));
        let stored = buf.row(5);
        assert!(matches!(stored[0], Cell::Int(5)));
    }

    #[test]
    fn prefetch_buffer_flush_pending_fetches_rows() {
        let buf = buffer(20, 32, 0);
        // The miss is what queues row 3 for the flush below.
        let _ = buf.row(3);
        assert_eq!(block_on(buf.flush_pending()), 1);
        assert!(buf.is_cached(3));
        let fetched_row = buf.row(3);
        assert!(matches!(fetched_row[0], Cell::Int(3)));
    }

    #[test]
    fn prefetch_buffer_request_prefetch_enqueues_range() {
        let buf = buffer(100, 64, 5);
        buf.request_prefetch(0, 10);
        assert_eq!(block_on(buf.flush_pending()), 15);
        (0..15).for_each(|i| assert!(buf.is_cached(i), "row {i} should be cached"));
    }

    #[test]
    fn prefetch_buffer_lru_eviction() {
        let buf = buffer(10, 3, 0);
        (0..3_usize).for_each(|i| buf.store_row(i, vec![Cell::Int(i as i64), Cell::Bool(false)]));
        assert_eq!(buf.cached_count(), 3);
        // A fourth row pushes out the least recently used one (row 0).
        buf.store_row(3, vec![Cell::Int(3), Cell::Bool(false)]);
        assert_eq!(buf.cached_count(), 3);
        assert!(!buf.is_cached(0), "row 0 should be evicted");
        assert!(buf.is_cached(3), "row 3 should be cached");
    }

    #[test]
    fn prefetch_buffer_invalidate_clears_cache() {
        let buf = buffer(10, 32, 0);
        buf.store_row(0, vec![Cell::Int(0), Cell::Bool(false)]);
        assert!(buf.is_cached(0));
        buf.invalidate();
        assert!(!buf.is_cached(0));
        assert_eq!(buf.cached_count(), 0);
    }

    #[test]
    fn prefetch_buffer_implements_row_source() {
        fn assert_row_source<T: RowSource>(_: &T) {}
        assert_row_source(&buffer(5, 32, 0));
    }

    #[test]
    fn prefetch_buffer_row_count_and_column_defs() {
        let buf = buffer(42, 32, 0);
        assert_eq!(buf.row_count(), 42);
        assert_eq!(buf.column_defs().len(), 2);
    }

    #[test]
    fn prefetch_buffer_is_clone() {
        let original = buffer(5, 32, 0);
        original.store_row(1, vec![Cell::Int(1), Cell::Bool(false)]);
        let copy = original.clone();
        assert!(copy.is_cached(1));
    }

    #[test]
    fn async_source_default_row_height() {
        assert_eq!(InMemoryAsync::new(3).row_height(0), DEFAULT_ROW_HEIGHT);
    }
}