use std::{
sync::Arc,
time::{Duration, Instant},
};
use dashmap::DashMap;
use futures::future::join_all;
use roaring::RoaringBitmap;
use uuid::Uuid;
use crate::{
runtime_bridge::bridge_sync_to_async,
supertable::wal::{SealRecord, WalStore},
};
/// Default refresh TTL (1 second). The tests in this file pass it as the
/// `refresh_ttl` argument of `SidecarCache::new`, where it bounds how long a
/// cached non-empty bitmap is served before being re-fetched.
pub const DEFAULT_REFRESH_TTL: Duration = Duration::from_secs(1);
/// TTL (60 seconds) that `SidecarCache::new` assigns to `negative_ttl`. It
/// applies to cache entries whose bitmap is empty, which includes the entry
/// recorded when the WAL store reports no sidecar for a superfile.
pub const DEFAULT_NEGATIVE_TTL: Duration = Duration::from_secs(60);
/// Errors surfaced by the synchronous lookup methods of `SidecarCache`.
#[derive(Debug, thiserror::Error)]
pub enum SidecarCacheError {
/// Fetching the tombstone sidecar for `superfile_id` from the WAL store
/// failed; `message` holds the `Display` rendering of the underlying error.
#[error("tombstone sidecar refresh failed for {superfile_id}: {message}")]
RefreshFailed { superfile_id: Uuid, message: String },
}
/// TTL-based cache of per-superfile tombstone sidecars (bitmap plus optional
/// seal record), populated from a `WalStore`.
#[derive(Debug)]
pub struct SidecarCache {
/// Cached sidecars keyed by superfile id.
inner: DashMap<Uuid, CachedSidecar>,
/// How long an entry with a non-empty bitmap is served before re-fetching.
refresh_ttl: Duration,
/// How long an entry with an empty bitmap is served before re-fetching.
negative_ttl: Duration,
/// Store that `get_tombstones` is called on to (re)load a sidecar.
wal_store: WalStore,
}
/// One cached sidecar view, as stored in `SidecarCache::inner`.
#[derive(Debug, Clone)]
struct CachedSidecar {
/// Etag returned alongside the sidecar by the WAL store; `None` when the
/// store reported no sidecar.
// The field is written on every fetch but never read back in this file,
// hence the lint allowance.
// NOTE(review): presumably retained for a future etag-conditional refresh — confirm.
#[allow(dead_code)]
etag: Option<String>,
/// Tombstone bitmap, shared with callers through `Arc` so lookups do not
/// copy it. Empty when no sidecar was found.
bitmap: Arc<RoaringBitmap>,
/// Seal record carried by the sidecar, if any.
seal: Option<SealRecord>,
/// Instant this entry was last fetched; compared against the caller's
/// `now` to decide whether the entry has outlived its TTL.
last_checked: Instant,
}
impl SidecarCache {
pub fn new(wal_store: WalStore, refresh_ttl: Duration) -> Self {
Self {
inner: DashMap::new(),
refresh_ttl,
negative_ttl: DEFAULT_NEGATIVE_TTL,
wal_store,
}
}
fn ttl_for_empty(&self, is_empty: bool) -> Duration {
if is_empty {
self.negative_ttl
} else {
self.refresh_ttl
}
}
fn needs_refresh(&self, superfile_id: Uuid, now: Instant) -> bool {
match self.inner.get(&superfile_id) {
Some(entry) => {
now.duration_since(entry.last_checked)
>= self.ttl_for_empty(entry.bitmap.is_empty())
}
None => true,
}
}
pub async fn prefetch(&self, superfile_ids: &[Uuid], now: Instant) {
let stale: Vec<Uuid> = superfile_ids
.iter()
.copied()
.filter(|id| self.needs_refresh(*id, now))
.collect();
if stale.is_empty() {
return;
}
let fetches = stale.into_iter().map(|id| {
let wal_store = self.wal_store.clone();
async move { (id, wal_store.get_tombstones(id).await) }
});
let results = join_all(fetches).await;
for (id, result) in results {
let (bitmap, seal, etag) = match result {
Ok(Some((sidecar, etag))) => (Arc::new(sidecar.bitmap), sidecar.seal, Some(etag)),
Ok(None) => (Arc::new(RoaringBitmap::new()), None, None),
Err(_) => continue,
};
self.inner.insert(
id,
CachedSidecar {
etag,
bitmap,
seal,
last_checked: now,
},
);
}
}
fn fetch_sidecar(
&self,
superfile_id: Uuid,
now: Instant,
) -> Result<(Arc<RoaringBitmap>, Option<SealRecord>), SidecarCacheError> {
if !self.needs_refresh(superfile_id, now) {
if let Some(entry) = self.inner.get(&superfile_id) {
return Ok((Arc::clone(&entry.bitmap), entry.seal.clone()));
}
}
self.refresh_and_return_sidecar(superfile_id)
}
pub fn bitmap_for(
&self,
superfile_id: Uuid,
now: Instant,
) -> Result<Arc<RoaringBitmap>, SidecarCacheError> {
self.fetch_sidecar(superfile_id, now)
.map(|(bitmap, _)| bitmap)
}
pub fn seal_for(
&self,
superfile_id: Uuid,
now: Instant,
) -> Result<Option<SealRecord>, SidecarCacheError> {
self.fetch_sidecar(superfile_id, now).map(|(_, seal)| seal)
}
pub fn sidecar_for(
&self,
superfile_id: Uuid,
now: Instant,
) -> Result<(Arc<RoaringBitmap>, Option<SealRecord>), SidecarCacheError> {
self.fetch_sidecar(superfile_id, now)
}
pub fn invalidate(&self, superfile_id: Uuid) {
self.inner.remove(&superfile_id);
}
#[cfg(test)]
pub fn clear(&self) {
self.inner.clear();
}
pub fn len(&self) -> usize {
self.inner.len()
}
pub fn is_empty(&self) -> bool {
self.inner.is_empty()
}
fn refresh_and_return_sidecar(
&self,
superfile_id: Uuid,
) -> Result<(Arc<RoaringBitmap>, Option<SealRecord>), SidecarCacheError> {
let wal_store = self.wal_store.clone();
let result =
bridge_sync_to_async(async move { wal_store.get_tombstones(superfile_id).await });
let (bitmap, seal, etag) = match result {
Ok(Some((sidecar, etag))) => (Arc::new(sidecar.bitmap), sidecar.seal, Some(etag)),
Ok(None) => (Arc::new(RoaringBitmap::new()), None, None),
Err(e) => {
return Err(SidecarCacheError::RefreshFailed {
superfile_id,
message: format!("{e}"),
});
}
};
let entry = CachedSidecar {
etag,
bitmap: Arc::clone(&bitmap),
seal: seal.clone(),
last_checked: Instant::now(),
};
self.inner.insert(superfile_id, entry);
Ok((bitmap, seal))
}
}
#[cfg(test)]
mod tests {
    use std::iter::once;

    use tempfile::TempDir;

    use super::*;
    use crate::{
        storage::{LocalFsStorageProvider, StorageProvider},
        supertable::wal::tombstones_codec::TombstonesSidecar,
    };

    /// Builds a cache with the given refresh TTL over a WAL store rooted in a
    /// fresh temp directory. The `TempDir` is returned so the directory
    /// outlives the test body.
    fn fixture_with_ttl(refresh_ttl: Duration) -> (TempDir, WalStore, SidecarCache) {
        let dir = TempDir::new().expect("tempdir");
        let storage: Arc<dyn StorageProvider> =
            Arc::new(LocalFsStorageProvider::new(dir.path()).expect("provider"));
        let ws = WalStore::new(Arc::clone(&storage));
        let cache = SidecarCache::new(ws.clone(), refresh_ttl);
        (dir, ws, cache)
    }

    /// Fixture using `DEFAULT_REFRESH_TTL`.
    fn fixture() -> (TempDir, WalStore, SidecarCache) {
        fixture_with_ttl(DEFAULT_REFRESH_TTL)
    }

    /// Builds an unsealed sidecar whose bitmap holds exactly `values`.
    fn unsealed_sidecar(values: &[u32]) -> TombstonesSidecar {
        let mut bitmap = RoaringBitmap::new();
        for &value in values {
            bitmap.insert(value);
        }
        TombstonesSidecar { seal: None, bitmap }
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn first_lookup_against_absent_sidecar_returns_empty_bitmap() {
        let (_dir, _ws, cache) = fixture();
        let now = Instant::now();
        let bitmap = cache
            .bitmap_for(Uuid::from_u128(0xAB), now)
            .expect("lookup");
        assert!(bitmap.is_empty());
        // The absent sidecar is still cached, as an empty view.
        assert_eq!(cache.len(), 1);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn lookup_reflects_persisted_sidecar() {
        let (_dir, ws, cache) = fixture();
        let sf_id = Uuid::from_u128(0xCAFE);
        let sidecar = unsealed_sidecar(&[1, 3, 5]);
        ws.put_tombstones(sf_id, None, &sidecar).await.expect("put");
        let cached = cache.bitmap_for(sf_id, Instant::now()).expect("lookup");
        let collected: Vec<u32> = cached.iter().collect();
        assert_eq!(collected, vec![1u32, 3, 5]);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn second_lookup_within_ttl_skips_refresh() {
        let (_dir, ws, cache) = fixture();
        let sf_id = Uuid::from_u128(0xDEAD);
        let now = Instant::now();
        let _ = cache.bitmap_for(sf_id, now).expect("warm");
        // Written after the cache was warmed; must not be visible within the TTL.
        ws.put_tombstones(sf_id, None, &unsealed_sidecar(&[42]))
            .await
            .expect("put");
        let cached = cache.bitmap_for(sf_id, now).expect("warm read");
        assert!(cached.is_empty(), "cache must hold the pre-write view");
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn invalidate_forces_next_lookup_to_refresh() {
        let (_dir, ws, cache) = fixture();
        let sf_id = Uuid::from_u128(0xBEEF);
        let now = Instant::now();
        let _ = cache.bitmap_for(sf_id, now).expect("warm");
        ws.put_tombstones(sf_id, None, &unsealed_sidecar(&[7]))
            .await
            .expect("put");
        cache.invalidate(sf_id);
        let cached = cache.bitmap_for(sf_id, now).expect("re-read");
        let collected: Vec<u32> = cached.iter().collect();
        assert_eq!(collected, vec![7u32]);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
    async fn prefetch_populates_all_ids_in_one_batch() {
        let (_dir, ws, cache) = fixture();
        let present = Uuid::from_u128(0x01);
        ws.put_tombstones(present, None, &unsealed_sidecar(&[9]))
            .await
            .expect("put");
        let ids: Vec<Uuid> = once(present)
            .chain((2..32u128).map(Uuid::from_u128))
            .collect();
        let now = Instant::now();
        cache.prefetch(&ids, now).await;
        assert_eq!(cache.len(), ids.len());
        for &id in &ids {
            assert!(!cache.needs_refresh(id, now), "id {id} should be fresh");
        }
        assert_eq!(
            cache
                .bitmap_for(present, now)
                .expect("present")
                .iter()
                .collect::<Vec<_>>(),
            vec![9u32]
        );
        assert!(cache.bitmap_for(ids[1], now).expect("absent").is_empty());
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn prefetch_with_duplicate_ids_caches_each_id_once() {
        let (_dir, ws, cache) = fixture();
        let sf_id = Uuid::from_u128(0xD0);
        ws.put_tombstones(sf_id, None, &unsealed_sidecar(&[4]))
            .await
            .expect("put");
        let now = Instant::now();
        cache.prefetch(&[sf_id, sf_id, sf_id], now).await;
        assert_eq!(cache.len(), 1);
        assert!(!cache.needs_refresh(sf_id, now));
        assert_eq!(
            cache
                .bitmap_for(sf_id, now)
                .expect("lookup")
                .iter()
                .collect::<Vec<_>>(),
            vec![4u32]
        );
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn prefetch_skips_entries_that_are_still_fresh() {
        let (_dir, ws, cache) = fixture();
        let sf_id = Uuid::from_u128(0xF5);
        let now = Instant::now();
        // Warm the cache while the sidecar is absent: an empty view is stored.
        cache.prefetch(&[sf_id], now).await;
        ws.put_tombstones(sf_id, None, &unsealed_sidecar(&[11]))
            .await
            .expect("put");
        // Same `now`, so the entry is within its TTL and must not be re-fetched.
        cache.prefetch(&[sf_id], now).await;
        assert!(cache.bitmap_for(sf_id, now).expect("cached").is_empty());
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn seal_for_returns_cached_seal() {
        let (_dir, ws, cache) = fixture();
        let sf_id = Uuid::from_u128(0xFFFF);
        ws.put_tombstones(sf_id, None, &unsealed_sidecar(&[42]))
            .await
            .expect("put");
        let now = Instant::now();
        let seal = cache.seal_for(sf_id, now).expect("lookup");
        assert!(seal.is_none(), "initially unsealed");
        let seal_2 = cache.seal_for(sf_id, now).expect("cached");
        assert!(seal_2.is_none());
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn sidecar_for_returns_both_bitmap_and_seal() {
        let (_dir, ws, cache) = fixture();
        let sf_id = Uuid::from_u128(0xABCD);
        ws.put_tombstones(sf_id, None, &unsealed_sidecar(&[1, 2]))
            .await
            .expect("put");
        let now = Instant::now();
        let (cached_bitmap, seal) = cache.sidecar_for(sf_id, now).expect("lookup");
        let collected: Vec<u32> = cached_bitmap.iter().collect();
        assert_eq!(collected, vec![1u32, 2]);
        assert!(seal.is_none());
    }

    #[test]
    fn empty_view_uses_long_negative_ttl() {
        let (_dir, _ws, cache) = fixture_with_ttl(Duration::from_millis(1));
        assert_eq!(cache.ttl_for_empty(true), DEFAULT_NEGATIVE_TTL);
        assert_eq!(cache.ttl_for_empty(false), Duration::from_millis(1));
    }

    #[test]
    fn cache_is_empty_on_construction() {
        let (_dir, _ws, cache) = fixture();
        assert!(cache.is_empty());
        assert_eq!(cache.len(), 0);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn clear_empties_the_cache() {
        let (_dir, _ws, cache) = fixture();
        let _ = cache
            .bitmap_for(Uuid::from_u128(0x1234), Instant::now())
            .expect("lookup");
        assert_eq!(cache.len(), 1);
        cache.clear();
        assert!(cache.is_empty(), "clear drops all entries");
        assert_eq!(cache.len(), 0);
    }
}