use std::{
collections::HashSet,
env, fmt, fs, io,
io::SeekFrom,
os::unix::fs::FileExt,
path::{Path, PathBuf},
sync::{
Arc, OnceLock, Weak,
atomic::{AtomicU64, Ordering},
},
thread,
time::{Duration, Instant},
};
use bytes::Bytes;
use dashmap::{DashMap, mapref::entry::Entry};
use futures::{
future::try_join_all,
stream::{FuturesUnordered, StreamExt},
};
use memmap2::{Mmap, UncheckedAdvice};
use thiserror::Error;
use tokio::{
io::{AsyncSeekExt, AsyncWriteExt},
sync::{OnceCell, Semaphore, oneshot},
task::{JoinHandle, spawn_blocking},
};
use super::config::{ColdFetchMode, DiskCacheConfig, EvictionCandidate};
use crate::{
storage::{StorageError, StorageProvider},
superfile::{
LazyByteSource, PrefetchedSource,
reader::{OpenOptions, SuperfileReader},
},
supertable::{
StorageRangeSource,
manifest::{SubsectionOffsets, SuperfileUri},
},
};
/// Upper bound on the number of trailing bytes of a superfile that a hinted
/// lazy open prefetches (clamped to the file's total size).
const PARQUET_TAIL_SPEC_BYTES: u64 = 64 * 1024;
/// Bytes fetched from the start of the vector subsection when the manifest
/// provides no explicit vector open ranges.
const VECTOR_OPEN_HEADER_FALLBACK_BYTES: u64 = 32;
/// Bytes fetched from the start of the FTS subsection when the manifest
/// provides no explicit FTS open ranges.
const FTS_OPEN_HEADER_FALLBACK_BYTES: u64 = 48;
/// Sleep between polls in `wait_until_mmap_promoted`.
const MMAP_PROMOTION_POLL_INTERVAL: Duration = Duration::from_millis(10);
/// Sleep between polls while a background fill waits for the foreground
/// reader to be released.
const STORE_UPGRADE_RETRY_INTERVAL: Duration = Duration::from_millis(10);
/// Errors surfaced by the disk cache.
#[derive(Debug, Error)]
pub enum DiskCacheError {
/// The backing `StorageProvider` failed (head / range read).
#[error("storage error during cold fetch")]
Storage(#[from] StorageError),
/// A local filesystem operation (create, write, rename, mmap, ...) failed.
#[error("local filesystem error: {0}")]
Io(#[from] std::io::Error),
/// Free-form failure. In this file it also carries task-join failures,
/// closed semaphores, promotion timeouts and the `RangeOnly` misuse message.
#[error("superfile reader failed to open mmap'd bytes: {0}")]
SuperfileOpen(String),
/// The superfile reader rejected the bytes it was asked to open.
#[error("superfile reader failed to open bytes")]
SuperfileOpenRead(#[from] crate::superfile::ReadError),
/// Eviction could not find any victim to make room for a reservation.
#[error("disk cache budget exceeded with no eligible victims")]
BudgetExceeded,
}
/// One cached superfile as tracked by `DiskCacheStore::cached`.
struct CachedEntry {
/// Reader handed out to callers.
reader: Arc<SuperfileReader>,
/// Backing mmap; `None` while the entry is still served from memory or
/// lazy range reads and has not yet been promoted to a local file.
mmap: Option<Arc<Mmap>>,
/// Bytes this entry is charged against `current_bytes`.
size_bytes: u64,
/// Last access time, in microseconds since the store's `started_at`.
last_access_us: AtomicU64,
}
/// Per-URI once-cell shared by concurrent callers so that only one of them
/// runs the initial cold fetch; the cell keeps whatever result that fetch produced.
type Coordinator = Arc<OnceCell<Result<Arc<CachedEntry>, DiskCacheError>>>;
/// Point-in-time snapshot of the cache counters, as returned by `DiskCacheStore::stats`.
#[derive(Debug, Clone, Default)]
pub struct CacheStats {
/// Number of entries currently in the cache map.
pub n_entries: u64,
/// Bytes currently reserved against the budget.
pub current_bytes: u64,
/// Configured disk budget in bytes.
pub budget_bytes: u64,
/// Number of completed cold fetches.
pub n_cold_fetches: u64,
/// Number of entries evicted to make room.
pub n_evictions: u64,
/// Number of `madvise(DONTNEED)` calls issued by the sweeps.
pub n_madvise_calls: u64,
}
/// Local disk cache of superfiles fetched from a `StorageProvider`.
pub struct DiskCacheStore {
/// Remote storage the superfiles are fetched from.
storage: Arc<dyn StorageProvider>,
/// Cache configuration (root directory, budget, fetch mode, ...).
config: DiskCacheConfig,
/// Reference instant for all `last_access_us` timestamps.
started_at: Instant,
/// Entries currently served by the cache.
cached: DashMap<SuperfileUri, Arc<CachedEntry>>,
/// In-flight cold fetches, keyed by URI.
coordinators: DashMap<SuperfileUri, Coordinator>,
/// Bytes reserved against `config.disk_budget_bytes`.
current_bytes: AtomicU64,
/// Completed cold fetches.
n_cold_fetches: AtomicU64,
/// Evicted entries.
n_evictions: AtomicU64,
/// `madvise` calls issued by the sweeps.
n_madvise_calls: AtomicU64,
/// Number of callers currently inside `wait_until_mmap_promoted`.
n_promotion_waiters: AtomicU64,
/// Callback returning the URIs that must not be evicted; replaceable at runtime.
pinned_fn: std::sync::Mutex<Arc<dyn Fn() -> HashSet<SuperfileUri> + Send + Sync>>,
/// Bounds the number of concurrent lazy background fills.
prefetch_semaphore: Arc<Semaphore>,
}
/// Compact diagnostic view: cache root, budget, and the live counters.
impl fmt::Debug for DiskCacheStore {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Snapshot the counters up front so the builder calls stay short.
        let current_bytes = self.current_bytes.load(Ordering::Acquire);
        let n_entries = self.cached.len();
        let n_cold_fetches = self.n_cold_fetches.load(Ordering::Acquire);

        let mut out = f.debug_struct("DiskCacheStore");
        out.field("cache_root", &self.config.cache_root);
        out.field("budget_bytes", &self.config.disk_budget_bytes);
        out.field("current_bytes", &current_bytes);
        out.field("n_entries", &n_entries);
        out.field("n_cold_fetches", &n_cold_fetches);
        out.finish()
    }
}
impl DiskCacheStore {
/// Creates the cache root directory and builds the store.
///
/// When `config.mmap_cold_threshold_secs` is non-zero a background thread
/// named `infino-disk-cache-sweep` is started; it calls [`Self::sweep_once`]
/// every `mmap_sweep_interval_secs` (at least one second) and exits once the
/// store has been dropped. Failure to spawn that thread is ignored.
///
/// # Errors
/// Returns `DiskCacheError::Io` if the cache root cannot be created.
pub fn new(
    storage: Arc<dyn StorageProvider>,
    config: DiskCacheConfig,
    pinned_fn: Arc<dyn Fn() -> HashSet<SuperfileUri> + Send + Sync>,
) -> Result<Arc<Self>, DiskCacheError> {
    fs::create_dir_all(&config.cache_root)?;

    // Copy out what is needed later, before `config` moves into the store.
    let cold_threshold_secs = config.mmap_cold_threshold_secs;
    let sweep_every = Duration::from_secs(config.mmap_sweep_interval_secs.max(1));
    let prefetch_permits = config.prefetch_concurrency.max(1);

    let store = Arc::new(Self {
        storage,
        config,
        started_at: Instant::now(),
        cached: DashMap::new(),
        coordinators: DashMap::new(),
        current_bytes: AtomicU64::new(0),
        n_cold_fetches: AtomicU64::new(0),
        n_evictions: AtomicU64::new(0),
        n_madvise_calls: AtomicU64::new(0),
        n_promotion_waiters: AtomicU64::new(0),
        pinned_fn: std::sync::Mutex::new(pinned_fn),
        prefetch_semaphore: Arc::new(Semaphore::new(prefetch_permits)),
    });

    if cold_threshold_secs > 0 {
        // The sweeper only holds a weak handle so it never keeps the store alive.
        let weak = Arc::downgrade(&store);
        let sweeper = thread::Builder::new().name("infino-disk-cache-sweep".into());
        let _ = sweeper.spawn(move || loop {
            thread::sleep(sweep_every);
            let Some(strong) = weak.upgrade() else {
                break;
            };
            strong.sweep_once();
        });
    }
    Ok(store)
}
/// Advises the kernel (`DontNeed`) on every mmap whose entry has been idle
/// for at least `mmap_cold_threshold_secs`, and returns how many mappings
/// were advised. The same number is added to the `n_madvise_calls` counter.
pub fn sweep_once(&self) -> u64 {
    let cold_after_us = self
        .config
        .mmap_cold_threshold_secs
        .saturating_mul(1_000_000);
    let now = self.now_us();

    // Snapshot first so no map guard is held while issuing madvise.
    let mapped: Vec<(Arc<Mmap>, u64)> = self
        .cached
        .iter()
        .filter_map(|slot| {
            let entry = slot.value();
            entry
                .mmap
                .as_ref()
                .map(|m| (Arc::clone(m), entry.last_access_us.load(Ordering::Acquire)))
        })
        .collect();

    let mut advised = 0u64;
    for (mapping, last_us) in mapped {
        if now.saturating_sub(last_us) < cold_after_us {
            continue;
        }
        // NOTE(review): assumes dropping pages of this read-only file mapping is
        // harmless because they can be re-faulted from the file — confirm.
        let _ = unsafe { mapping.unchecked_advise(UncheckedAdvice::DontNeed) };
        advised += 1;
    }
    if advised != 0 {
        self.n_madvise_calls.fetch_add(advised, Ordering::AcqRel);
    }
    advised
}
pub fn new_unpinned(
storage: Arc<dyn StorageProvider>,
config: DiskCacheConfig,
) -> Result<Arc<Self>, DiskCacheError> {
Self::new(storage, config, Arc::new(HashSet::new))
}
/// Returns a reader for `uri` using the configured cold-fetch mode and no
/// manifest hints; shorthand for `reader_with_hints(uri, None)`.
pub async fn reader(
self: &Arc<Self>,
uri: &SuperfileUri,
) -> Result<Arc<SuperfileReader>, DiskCacheError> {
self.reader_with_hints(uri, None).await
}
/// Returns a reader for `uri`, dispatching on `config.cold_fetch_mode`.
///
/// `offsets` are only consulted in `LazyForegroundWithBackgroundFill` mode.
///
/// # Errors
/// `RangeOnly` mode is rejected with `DiskCacheError::SuperfileOpen`, since
/// that mode is not served through the cache; other modes propagate the
/// errors of their fetch path.
pub async fn reader_with_hints(
    self: &Arc<Self>,
    uri: &SuperfileUri,
    offsets: Option<&SubsectionOffsets>,
) -> Result<Arc<SuperfileReader>, DiskCacheError> {
    match self.config.cold_fetch_mode {
        ColdFetchMode::LazyForegroundWithBackgroundFill => {
            let hints = offsets.cloned();
            self.reader_lazy_with_bg_fill_hinted(uri, hints).await
        }
        ColdFetchMode::HybridWithPrefetch => self.reader_hybrid(uri).await,
        ColdFetchMode::RangeOnly => {
            let message = "ColdFetchMode::RangeOnly bypasses the disk cache; \
                           construct StorageRangeSource + open_lazy directly";
            Err(DiskCacheError::SuperfileOpen(message.into()))
        }
    }
}
/// Opens `uri` lazily straight from storage, without touching the cache map
/// or the disk budget. CRC verification is disabled for this open.
///
/// A positive `offsets.total_size` is passed to the range source as the known
/// object size; otherwise the source is created with an unknown size.
pub async fn open_range_only(
    self: &Arc<Self>,
    uri: &SuperfileUri,
    offsets: Option<&SubsectionOffsets>,
) -> Result<Arc<SuperfileReader>, DiskCacheError> {
    let path = Self::storage_path(uri);
    let storage = Arc::clone(&self.storage);
    let known_size = offsets.map(|o| o.total_size).filter(|&n| n > 0);

    let source: Arc<dyn LazyByteSource> = if let Some(total) = known_size {
        Arc::new(StorageRangeSource::with_known_size(storage, path, total))
    } else {
        Arc::new(StorageRangeSource::with_unknown_size(storage, path))
    };

    let options = OpenOptions { verify_crc: false };
    let opened = SuperfileReader::open_lazy_with(source, options).await?;
    Ok(Arc::new(opened))
}
/// Returns a reader for `uri`, downloading the whole object to disk and
/// mmapping it before returning when it is not already cached.
///
/// Concurrent callers for the same URI share a single coordinated fetch. The
/// coordinator stores the fetch result by reference and the error type is not
/// `Clone`, so when the coordinated fetch failed each caller retries once on
/// its own to obtain an owned result.
///
/// # Errors
/// Returns the error of the retry when both the coordinated fetch and the
/// retry fail.
pub async fn reader_synchronous(
    self: &Arc<Self>,
    uri: &SuperfileUri,
) -> Result<Arc<SuperfileReader>, DiskCacheError> {
    // Warm path: bump the access time and hand out the cached reader.
    if let Some(entry) = self.cached.get(uri) {
        entry.last_access_us.store(self.now_us(), Ordering::Release);
        return Ok(Arc::clone(&entry.reader));
    }
    let cell = self
        .coordinators
        .entry(*uri)
        .or_insert_with(|| Arc::new(OnceCell::new()))
        .clone();
    let result = cell
        .get_or_init(|| async { self.cold_fetch(uri).await })
        .await;
    // The fetch is complete either way; later misses must start a fresh one.
    self.coordinators.remove(uri);
    match result {
        Ok(entry) => Ok(Arc::clone(&entry.reader)),
        // A retry that succeeds yields a usable reader; previously it was
        // discarded and reported as a synthetic "cold fetch error".
        Err(_) => self
            .cold_fetch(uri)
            .await
            .map(|entry| Arc::clone(&entry.reader)),
    }
}
/// `HybridWithPrefetch` read path: serve from the cache when possible,
/// otherwise run (or join) a coordinated `cold_fetch_hybrid`.
///
/// On success the coordinator is left in place; on failure it is removed.
/// `BudgetExceeded` is returned as-is, any other failure triggers one
/// uncoordinated retry whose outcome is returned.
async fn reader_hybrid(
    self: &Arc<Self>,
    uri: &SuperfileUri,
) -> Result<Arc<SuperfileReader>, DiskCacheError> {
    if let Some(hit) = self.cached.get(uri) {
        hit.last_access_us.store(self.now_us(), Ordering::Release);
        return Ok(Arc::clone(&hit.reader));
    }

    let coordinator = self
        .coordinators
        .entry(*uri)
        .or_insert_with(|| Arc::new(OnceCell::new()))
        .clone();
    let outcome = coordinator
        .get_or_init(|| async { self.cold_fetch_hybrid(uri).await })
        .await;

    let failure = match outcome {
        Ok(entry) => return Ok(Arc::clone(&entry.reader)),
        Err(failure) => failure,
    };
    self.coordinators.remove(uri);
    if matches!(failure, DiskCacheError::BudgetExceeded) {
        return Err(DiskCacheError::BudgetExceeded);
    }
    let retried = self.cold_fetch_hybrid(uri).await;
    retried.map(|entry| Arc::clone(&entry.reader))
}
/// Returns `true` when `uri` is cached and its entry is backed by an mmap.
pub fn is_mmap_promoted(&self, uri: &SuperfileUri) -> bool {
    self.cached
        .get(uri)
        .is_some_and(|entry| entry.mmap.is_some())
}
/// Polls until `uri` is mmap-backed or `timeout` elapses.
///
/// While waiting, the caller is counted in `n_promotion_waiters`, which the
/// lazy background fill reads to decide to start without further delay.
///
/// The promotion state is checked before any sleep and once more after the
/// final sleep, so an already-promoted entry succeeds even with a zero
/// timeout. Each sleep is capped at the time remaining.
///
/// # Errors
/// Returns `DiskCacheError::SuperfileOpen` when the entry is still not
/// promoted once the timeout has elapsed.
pub async fn wait_until_mmap_promoted(
    self: &Arc<Self>,
    uri: &SuperfileUri,
    timeout: Duration,
) -> Result<(), DiskCacheError> {
    let _guard = PromotionWaitGuard::new(&self.n_promotion_waiters);
    let start = Instant::now();
    loop {
        if self.is_mmap_promoted(uri) {
            return Ok(());
        }
        let remaining = timeout
            .checked_sub(start.elapsed())
            .filter(|left| !left.is_zero());
        let Some(remaining) = remaining else {
            break;
        };
        tokio::time::sleep(remaining.min(MMAP_PROMOTION_POLL_INTERVAL)).await;
    }
    Err(DiskCacheError::SuperfileOpen(format!(
        "superfile {uri:?} not mmap-promoted within {timeout:?}"
    )))
}
/// Returns a snapshot of the cache counters. Each value is read
/// independently, so the snapshot is not atomic across fields.
pub fn stats(&self) -> CacheStats {
    let n_entries = self.cached.len() as u64;
    let current_bytes = self.current_bytes.load(Ordering::Acquire);
    let budget_bytes = self.config.disk_budget_bytes;
    let n_cold_fetches = self.n_cold_fetches.load(Ordering::Acquire);
    let n_evictions = self.n_evictions.load(Ordering::Acquire);
    let n_madvise_calls = self.n_madvise_calls.load(Ordering::Acquire);
    CacheStats {
        n_entries,
        current_bytes,
        budget_bytes,
        n_cold_fetches,
        n_evictions,
        n_madvise_calls,
    }
}
/// Replaces the callback that reports which URIs are pinned against eviction.
///
/// # Panics
/// Panics if the internal mutex is poisoned.
pub fn set_pinned_fn(&self, pinned_fn: Arc<dyn Fn() -> HashSet<SuperfileUri> + Send + Sync>) {
    *self.pinned_fn.lock().expect("pinned_fn mutex poisoned") = pinned_fn;
}
/// Sums the lengths of all mmaps currently held by cached entries.
pub fn current_mmap_size_bytes(&self) -> u64 {
    let mut total = 0u64;
    for slot in self.cached.iter() {
        if let Some(mapping) = slot.value().mmap.as_ref() {
            total += mapping.len() as u64;
        }
    }
    total
}
/// Advises the kernel (`DontNeed`) on mmaps, least-recently-accessed first,
/// until the estimated mapped total is at or below `budget_bytes`.
///
/// Returns the number of mappings advised; each one also increments
/// `n_madvise_calls`. Returns 0 without doing anything when the mapped total
/// is already within budget.
pub fn sweep_for_budget(&self, budget_bytes: u64) -> u64 {
let mut total = self.current_mmap_size_bytes();
if total <= budget_bytes {
return 0;
}
// Snapshot (uri, mmap, last access, size) so no map guard is held during madvise.
let mut candidates: Vec<(SuperfileUri, Arc<Mmap>, u64, u64)> = self
.cached
.iter()
.filter_map(|e| {
let mmap = e.value().mmap.clone()?;
Some((
*e.key(),
mmap,
e.value().last_access_us.load(Ordering::Acquire),
e.value().size_bytes,
))
})
.collect();
// Oldest access first.
candidates.sort_by_key(|(_, _, last, _)| *last);
let mut n_advised = 0u64;
for (_uri, mmap, _last, size) in candidates {
if total <= budget_bytes {
break;
}
// The advise result is ignored; the call is counted either way.
let _ = unsafe { mmap.unchecked_advise(UncheckedAdvice::DontNeed) };
self.n_madvise_calls.fetch_add(1, Ordering::AcqRel);
// NOTE(review): `total` starts from mmap lengths but is reduced by
// `size_bytes`; presumably these are equal for promoted entries — confirm.
total = total.saturating_sub(size);
n_advised += 1;
}
n_advised
}
/// Invokes the current pinned-set callback and returns its result.
/// The mutex is released before the callback runs.
///
/// # Panics
/// Panics if the internal mutex is poisoned.
pub fn current_pinned_uris(&self) -> HashSet<SuperfileUri> {
    // The guard is a temporary of this statement, so it is dropped before the call below.
    let pinned_fn = Arc::clone(&*self.pinned_fn.lock().expect("pinned_fn mutex poisoned"));
    pinned_fn()
}
/// Inserts already-available `bytes` for `uri` into the cache: reserves
/// budget, writes the bytes to a temp file, renames it into place, mmaps it
/// and opens a reader over the mapping.
///
/// A no-op when `uri` is already cached. If another task inserted the same
/// URI in the meantime, this call releases its reservation and removes the
/// file at the cache path.
///
/// # Errors
/// Propagates reservation, filesystem and reader-open failures; the
/// reservation is released on failure.
pub async fn insert_warm(
self: &Arc<Self>,
uri: &SuperfileUri,
bytes: Bytes,
) -> Result<(), DiskCacheError> {
if self.cached.contains_key(uri) {
return Ok(());
}
let size = bytes.len() as u64;
self.reserve_manual(size).await?;
// Everything fallible after the reservation runs inside this block so the
// reservation can be rolled back in one place.
let result: Result<Arc<CachedEntry>, DiskCacheError> = async {
let tmp = self.tmp_path(uri);
let final_path = self.cache_path(uri);
{
let mut file = tokio::fs::File::create(&tmp).await?;
file.write_all(&bytes).await?;
file.flush().await?;
file.sync_all().await?;
}
// Rename only after the data is synced, so the final path never holds a partial file.
tokio::fs::rename(&tmp, &final_path).await?;
let mmap = open_readonly_mmap(&final_path).map_err(DiskCacheError::Io)?;
let mmap_arc = Arc::new(mmap);
// The reader's bytes keep the mmap alive through `ArcMmapOwner`.
let reader_bytes = Bytes::from_owner(ArcMmapOwner(Arc::clone(&mmap_arc)));
let reader = SuperfileReader::open_with(
reader_bytes,
OpenOptions {
verify_crc: self.config.verify_crc_on_open,
},
)?;
let entry = Arc::new(CachedEntry {
reader: Arc::new(reader),
mmap: Some(mmap_arc),
size_bytes: size,
last_access_us: AtomicU64::new(self.now_us()),
});
Ok(entry)
}
.await;
let entry = match result {
Ok(e) => e,
Err(e) => {
self.current_bytes.fetch_sub(size, Ordering::Release);
return Err(e);
}
};
match self.cached.entry(*uri) {
Entry::Vacant(v) => {
v.insert(entry);
}
// Lost a race with another insert: give the reserved bytes back.
Entry::Occupied(_) => {
self.current_bytes.fetch_sub(size, Ordering::Release);
let _ = fs::remove_file(self.cache_path(uri));
}
}
Ok(())
}
/// Microseconds elapsed since the store was created.
fn now_us(&self) -> u64 {
    let since_start = self.started_at.elapsed();
    since_start.as_micros() as u64
}
/// Final on-disk location of `uri` inside the cache root.
fn cache_path(&self, uri: &SuperfileUri) -> PathBuf {
    let file_name = uri.cache_filename();
    self.config.cache_root.join(file_name)
}
/// Temporary download location of `uri` inside the cache root; the file is
/// renamed to `cache_path` once complete.
fn tmp_path(&self, uri: &SuperfileUri) -> PathBuf {
    let file_name = uri.cache_tmp_filename();
    self.config.cache_root.join(file_name)
}
/// Key under which `uri` is addressed in the `StorageProvider`.
fn storage_path(uri: &SuperfileUri) -> String {
uri.storage_path()
}
async fn cold_fetch_hybrid(
self: &Arc<Self>,
uri: &SuperfileUri,
) -> Result<Arc<CachedEntry>, DiskCacheError> {
let storage_uri = Self::storage_path(uri);
let head = self.storage.head(&storage_uri).await?;
let size = head.size;
self.reserve_manual(size).await?;
let reserved_bytes = size;
let tmp = self.tmp_path(uri);
let final_path = self.cache_path(uri);
let n_streams = self.config.cold_fetch_streams.max(1) as u64;
let chunk_size = self
.config
.cold_fetch_chunk_bytes
.max(size.div_ceil(n_streams));
let n_chunks = if size == 0 {
0
} else {
size.div_ceil(chunk_size)
};
let file = tokio::fs::File::create(&tmp).await?;
file.set_len(size).await?;
let file = Arc::new(tokio::sync::Mutex::new(file));
let chunks: Arc<tokio::sync::Mutex<Vec<Option<(u64, Bytes)>>>> =
Arc::new(tokio::sync::Mutex::new(vec![None; n_chunks as usize]));
let mut fetch_handles = Vec::with_capacity(n_chunks as usize);
let mut write_handles = Vec::with_capacity(n_chunks as usize);
for i in 0..n_chunks {
let start = i * chunk_size;
let end = (start + chunk_size).min(size);
let storage = Arc::clone(&self.storage);
let file = Arc::clone(&file);
let chunks = Arc::clone(&chunks);
let uri_s = storage_uri.clone();
let (write_tx, write_rx) = oneshot::channel::<JoinHandle<Result<(), DiskCacheError>>>();
write_handles.push(write_rx);
fetch_handles.push(tokio::spawn(async move {
let bytes = storage.get_range(&uri_s, start..end).await?;
{
let mut guard = chunks.lock().await;
guard[i as usize] = Some((start, bytes.clone()));
}
let pwrite_handle = tokio::spawn(async move {
let mut guard = file.lock().await;
guard.seek(SeekFrom::Start(start)).await?;
guard.write_all(&bytes).await?;
Ok::<(), DiskCacheError>(())
});
let _ = write_tx.send(pwrite_handle);
Ok::<(), DiskCacheError>(())
}));
}
for h in fetch_handles {
h.await
.map_err(|e| DiskCacheError::SuperfileOpen(format!("fetch join: {e}")))??;
}
let buffer = {
let chunks_guard = chunks.lock().await;
let mut buf = vec![0u8; size as usize];
for (start, bytes) in chunks_guard.iter().flatten() {
let s = *start as usize;
let e = s + bytes.len();
buf[s..e].copy_from_slice(bytes);
}
buf
};
let foreground_bytes = Bytes::from(buffer);
let foreground_reader = SuperfileReader::open_with(
foreground_bytes,
OpenOptions {
verify_crc: self.config.verify_crc_on_open,
},
)?;
let foreground_reader = Arc::new(foreground_reader);
let entry = Arc::new(CachedEntry {
reader: Arc::clone(&foreground_reader),
mmap: None, size_bytes: size,
last_access_us: AtomicU64::new(self.now_us()),
});
self.n_cold_fetches.fetch_add(1, Ordering::AcqRel);
self.cached.insert(*uri, Arc::clone(&entry));
let store = Arc::clone(self);
let uri_owned = *uri;
let tmp_owned = tmp.clone();
let final_owned = final_path.clone();
let file_owned = Arc::clone(&file);
tokio::spawn(async move {
let _ = finalize_to_mmap(
store,
uri_owned,
tmp_owned,
final_owned,
file_owned,
write_handles,
size,
reserved_bytes,
)
.await;
});
Ok(entry)
}
/// `LazyForegroundWithBackgroundFill` read path: serve from the cache when
/// possible, otherwise run (or join) a coordinated `cold_fetch_lazy`.
///
/// On success the coordinator is left in place (the background fill removes
/// it). When the coordinated fetch failed, the coordinator is removed and the
/// caller retries once on its own, because the stored error is only available
/// by reference and the error type is not `Clone`.
///
/// # Errors
/// Returns the retry's error when both the coordinated fetch and the retry fail.
async fn reader_lazy_with_bg_fill_hinted(
    self: &Arc<Self>,
    uri: &SuperfileUri,
    offsets: Option<SubsectionOffsets>,
) -> Result<Arc<SuperfileReader>, DiskCacheError> {
    if let Some(entry) = self.cached.get(uri) {
        entry.last_access_us.store(self.now_us(), Ordering::Release);
        return Ok(Arc::clone(&entry.reader));
    }
    let cell = self
        .coordinators
        .entry(*uri)
        .or_insert_with(|| Arc::new(OnceCell::new()))
        .clone();
    let result = cell
        .get_or_init(|| async { self.cold_fetch_lazy(uri, offsets.as_ref()).await })
        .await;
    match result {
        Ok(entry) => Ok(Arc::clone(&entry.reader)),
        Err(_) => {
            self.coordinators.remove(uri);
            // A retry that succeeds yields a usable reader; previously it was
            // discarded and reported as a synthetic "lazy cold fetch error".
            self.cold_fetch_lazy(uri, offsets.as_ref())
                .await
                .map(|entry| Arc::clone(&entry.reader))
        }
    }
}
/// Lazy cold fetch: opens a range-backed reader over remote storage right
/// away, registers it in the cache with `mmap: None`, and (unless disabled
/// via `skip_background_fill`) spawns `lazy_background_fill` to download the
/// whole object and promote the entry to an mmap later.
///
/// With `offsets`, the bytes needed to open the reader are supplied up front
/// through a `PrefetchedSource` overlay: either the manifest's `open_blob`
/// or three concurrent range reads (file tail, vector ranges, FTS ranges).
/// Without `offsets`, the reader is opened over a range source of unknown size.
///
/// # Errors
/// Propagates storage, reader-open and budget-reservation failures.
async fn cold_fetch_lazy(
self: &Arc<Self>,
uri: &SuperfileUri,
offsets: Option<&SubsectionOffsets>,
) -> Result<Arc<CachedEntry>, DiskCacheError> {
let storage_uri = Self::storage_path(uri);
let (lazy_reader, size) = if let Some(offsets) = offsets {
let total_size = offsets.total_size;
// Tail window: the last `PARQUET_TAIL_SPEC_BYTES` bytes, or the whole file if smaller.
let parquet_tail_len = PARQUET_TAIL_SPEC_BYTES.min(total_size);
let parquet_tail_start = total_size.saturating_sub(parquet_tail_len);
// Prefer explicit open ranges; otherwise fall back to a fixed-size header
// at the start of the subsection (if the subsection is non-empty).
let vec_ranges = if !offsets.vec_open_ranges.is_empty() {
offsets.vec_open_ranges.clone()
} else {
match offsets.vec {
Some((off, len)) if len > 0 => {
vec![(off, VECTOR_OPEN_HEADER_FALLBACK_BYTES.min(len))]
}
_ => Vec::new(),
}
};
let fts_ranges = if !offsets.fts_open_ranges.is_empty() {
offsets.fts_open_ranges.clone()
} else {
match offsets.fts {
Some((off, len)) if len > 0 => {
vec![(off, FTS_OPEN_HEADER_FALLBACK_BYTES.min(len))]
}
_ => Vec::new(),
}
};
let inner: Arc<dyn LazyByteSource> = Arc::new(StorageRangeSource::with_known_size(
Arc::clone(&self.storage),
storage_uri.clone(),
total_size,
));
let mut overlay = PrefetchedSource::new(inner);
if !offsets.open_blob.is_empty() {
// The manifest already carries the open bytes: no storage round trip needed.
for (off, bytes) in &offsets.open_blob {
overlay.install(*off, Bytes::copy_from_slice(bytes));
}
} else {
let storage_for_parquet = Arc::clone(&self.storage);
let storage_for_vec = Arc::clone(&self.storage);
let storage_for_fts = Arc::clone(&self.storage);
let parquet_uri = storage_uri.clone();
let vec_uri = storage_uri.clone();
let fts_uri = storage_uri.clone();
let parquet_fut = async move {
let end = total_size;
let start = parquet_tail_start;
// Empty file: nothing to fetch.
if end == start {
return Ok::<_, StorageError>(Bytes::new());
}
storage_for_parquet
.get_range(&parquet_uri, start..end)
.await
};
let vec_fut =
async move { fetch_hint_ranges(storage_for_vec, vec_uri, vec_ranges).await };
let fts_fut =
async move { fetch_hint_ranges(storage_for_fts, fts_uri, fts_ranges).await };
// The three prefetches run concurrently; the first failure aborts the open.
let (parquet_bytes, vec_pre, fts_pre) =
futures::try_join!(parquet_fut, vec_fut, fts_fut)?;
if !parquet_bytes.is_empty() {
overlay.install(parquet_tail_start, parquet_bytes);
}
for (off, bytes) in vec_pre {
overlay.install(off, bytes);
}
for (off, bytes) in fts_pre {
overlay.install(off, bytes);
}
}
let source: Arc<dyn LazyByteSource> = Arc::new(overlay);
let lazy_reader = SuperfileReader::open_lazy_with(
Arc::clone(&source),
OpenOptions { verify_crc: false },
)
.await?;
(lazy_reader, total_size)
} else {
let range_src: Arc<dyn LazyByteSource> =
Arc::new(StorageRangeSource::with_unknown_size(
Arc::clone(&self.storage),
storage_uri.clone(),
));
let lazy_reader = SuperfileReader::open_lazy_with(
Arc::clone(&range_src),
OpenOptions { verify_crc: false },
)
.await?;
// NOTE(review): presumably the source has learned the object size during
// the open above — confirm against `StorageRangeSource`.
let size = range_src.size();
(lazy_reader, size)
};
// Reserve budget for the eventual on-disk copy; nothing fallible follows.
self.reserve_manual(size).await?;
let reserved_bytes = size;
let lazy_reader = Arc::new(lazy_reader);
let entry = Arc::new(CachedEntry {
reader: Arc::clone(&lazy_reader),
mmap: None,
size_bytes: size,
last_access_us: AtomicU64::new(self.now_us()),
});
self.n_cold_fetches.fetch_add(1, Ordering::AcqRel);
self.cached.insert(*uri, Arc::clone(&entry));
if !skip_background_fill() {
// The fill task holds only weak handles so it cannot keep the store or
// the reader alive on its own.
let store = Arc::downgrade(self);
let reader = Arc::downgrade(&lazy_reader);
let uri_owned = *uri;
let storage_uri_owned = storage_uri;
tokio::spawn(async move {
let _ = lazy_background_fill(
store,
reader,
uri_owned,
storage_uri_owned,
size,
reserved_bytes,
)
.await;
});
}
Ok(entry)
}
/// Synchronous cold fetch: reserves budget, downloads the whole object to a
/// temp file, renames it into place, mmaps it, opens a reader over the
/// mapping and inserts the entry into the cache.
///
/// # Errors
/// Propagates storage, filesystem and reader-open failures. The reservation
/// is released automatically (via `Reservation`'s `Drop`) on any early return.
async fn cold_fetch(&self, uri: &SuperfileUri) -> Result<Arc<CachedEntry>, DiskCacheError> {
let storage_uri = Self::storage_path(uri);
let head = self.storage.head(&storage_uri).await?;
let size = head.size;
let reservation = self.reserve(size).await?;
let tmp = self.tmp_path(uri);
let final_path = self.cache_path(uri);
self.cold_fetch_to_disk(&storage_uri, &tmp, size).await?;
tokio::fs::rename(&tmp, &final_path).await?;
let mmap = open_readonly_mmap(&final_path).map_err(DiskCacheError::Io)?;
let mmap_arc = Arc::new(mmap);
// The reader's bytes keep the mmap alive through `ArcMmapOwner`.
let bytes = Bytes::from_owner(ArcMmapOwner(Arc::clone(&mmap_arc)));
let reader = SuperfileReader::open_with(
bytes,
OpenOptions {
verify_crc: self.config.verify_crc_on_open,
},
)?;
let entry = Arc::new(CachedEntry {
reader: Arc::new(reader),
mmap: Some(mmap_arc),
size_bytes: size,
last_access_us: AtomicU64::new(self.now_us()),
});
self.cached.insert(*uri, Arc::clone(&entry));
self.n_cold_fetches.fetch_add(1, Ordering::AcqRel);
// The entry now owns the bytes; keep them charged against the budget.
reservation.commit();
Ok(entry)
}
async fn reserve_manual(&self, bytes: u64) -> Result<(), DiskCacheError> {
loop {
let cur = self.current_bytes.load(Ordering::Acquire);
if cur + bytes <= self.config.disk_budget_bytes {
if self
.current_bytes
.compare_exchange_weak(cur, cur + bytes, Ordering::AcqRel, Ordering::Acquire)
.is_ok()
{
return Ok(());
}
continue;
}
let needed = (cur + bytes).saturating_sub(self.config.disk_budget_bytes);
self.evict_at_least(needed).await?;
}
}
async fn reserve(&self, bytes: u64) -> Result<Reservation<'_>, DiskCacheError> {
loop {
let cur = self.current_bytes.load(Ordering::Acquire);
if cur + bytes <= self.config.disk_budget_bytes {
if self
.current_bytes
.compare_exchange_weak(cur, cur + bytes, Ordering::AcqRel, Ordering::Acquire)
.is_ok()
{
return Ok(Reservation {
store: self,
bytes,
committed: false,
});
}
continue;
}
let needed = (cur + bytes).saturating_sub(self.config.disk_budget_bytes);
self.evict_at_least(needed).await?;
}
}
/// Evicts entries chosen by the configured eviction policy to free at least
/// `bytes_needed` bytes (as far as the policy can).
///
/// For each victim the entry is removed from the map, its cache file is
/// deleted (best effort), and its `size_bytes` are released from the budget.
///
/// # Errors
/// Returns `DiskCacheError::BudgetExceeded` when the policy selects no victims.
async fn evict_at_least(&self, bytes_needed: u64) -> Result<(), DiskCacheError> {
// Clone the callback out so the mutex is not held while it runs.
let pinned_fn = {
let g = self.pinned_fn.lock().expect("pinned_fn mutex poisoned");
Arc::clone(&g)
};
let pinned = pinned_fn();
let candidates: Vec<EvictionCandidate> = self
.cached
.iter()
.map(|e| EvictionCandidate {
uri: *e.key(),
size_bytes: e.value().size_bytes,
last_access_us: e.value().last_access_us.load(Ordering::Acquire),
})
.collect();
let victims = self
.config
.eviction
.select_for_eviction(&candidates, &pinned, bytes_needed);
if victims.is_empty() {
return Err(DiskCacheError::BudgetExceeded);
}
for uri in victims {
// A victim may have been removed concurrently; only account for entries
// this call actually removed.
if let Some((_, entry)) = self.cached.remove(&uri) {
let path = self.cache_path(&uri);
let _ = fs::remove_file(&path);
self.current_bytes
.fetch_sub(entry.size_bytes, Ordering::Release);
self.n_evictions.fetch_add(1, Ordering::AcqRel);
}
}
Ok(())
}
/// Downloads `size` bytes of `storage_uri` into `dest_path` in chunks of
/// `cold_fetch_chunk_bytes`, with at most `cold_fetch_streams` chunks in
/// flight, then fsyncs the file.
///
/// # Errors
/// Propagates storage and filesystem failures; task-join failures are
/// reported as `DiskCacheError::SuperfileOpen`.
async fn cold_fetch_to_disk(
&self,
storage_uri: &str,
dest_path: &Path,
size: u64,
) -> Result<(), DiskCacheError> {
let n_streams = self.config.cold_fetch_streams.max(1);
let chunk_size = self.config.cold_fetch_chunk_bytes.max(1);
// Pre-size the file so every chunk can be written at its own offset.
let file = {
let f = fs::OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.open(dest_path)?;
f.set_len(size)?;
Arc::new(f)
};
let n_chunks = if size == 0 {
0
} else {
size.div_ceil(chunk_size)
};
// One task is spawned per chunk; the semaphore caps how many run at once.
let stream_sem = Arc::new(tokio::sync::Semaphore::new(n_streams));
let mut joins = Vec::with_capacity(n_chunks as usize);
for i in 0..n_chunks {
let start = i * chunk_size;
let end = (start + chunk_size).min(size);
let storage = Arc::clone(&self.storage);
let file = Arc::clone(&file);
let uri = storage_uri.to_string();
let stream_sem = Arc::clone(&stream_sem);
joins.push(tokio::spawn(async move {
let _permit = stream_sem.acquire_owned().await.map_err(|e| {
DiskCacheError::SuperfileOpen(format!("stream semaphore closed: {e}"))
})?;
let bytes = storage.get_range(&uri, start..end).await?;
// Positional write on the blocking pool; no shared file cursor is involved.
spawn_blocking(move || file.write_all_at(&bytes, start))
.await
.map_err(|e| DiskCacheError::SuperfileOpen(format!("write join: {e}")))??;
Ok::<(), DiskCacheError>(())
}));
}
for h in joins {
h.await
.map_err(|e| DiskCacheError::SuperfileOpen(format!("join error: {e}")))??;
}
spawn_blocking(move || file.sync_all())
.await
.map_err(|e| DiskCacheError::SuperfileOpen(format!("fsync join: {e}")))??;
Ok(())
}
}
/// Guard for bytes reserved against the store's budget. Dropping it without
/// calling `commit` gives the bytes back.
struct Reservation<'a> {
/// Store whose `current_bytes` holds the reservation.
store: &'a DiskCacheStore,
/// Number of reserved bytes.
bytes: u64,
/// Set by `commit`; when `true`, drop leaves the budget untouched.
committed: bool,
}
impl<'a> Reservation<'a> {
/// Marks the reservation as permanently consumed.
fn commit(mut self) {
self.committed = true;
}
}
impl<'a> Drop for Reservation<'a> {
fn drop(&mut self) {
// Uncommitted: the data never made it into the cache, so release the bytes.
if !self.committed {
self.store
.current_bytes
.fetch_sub(self.bytes, Ordering::Release);
}
}
}
/// Scope guard that counts one waiter: increments the counter on creation
/// and decrements it again when dropped.
struct PromotionWaitGuard<'a>(&'a AtomicU64);

impl<'a> PromotionWaitGuard<'a> {
    /// Registers a waiter on `counter` for the lifetime of the returned guard.
    fn new(counter: &'a AtomicU64) -> Self {
        let guard = Self(counter);
        guard.0.fetch_add(1, Ordering::AcqRel);
        guard
    }
}

impl Drop for PromotionWaitGuard<'_> {
    fn drop(&mut self) {
        let Self(counter) = self;
        counter.fetch_sub(1, Ordering::AcqRel);
    }
}
/// Background half of the hybrid cold fetch: waits for all chunk writes,
/// syncs and renames the temp file, mmaps it, and swaps the cache entry for
/// an mmap-backed one.
///
/// If the entry was removed in the meantime, the finished file is deleted
/// instead. On any failure the entry is removed from the cache and its bytes
/// are released from the budget. The coordinator for `uri` is removed in both
/// cases. `reserved_bytes` is currently unused.
async fn finalize_to_mmap(
store: Arc<DiskCacheStore>,
uri: SuperfileUri,
tmp_path: PathBuf,
final_path: PathBuf,
file: Arc<tokio::sync::Mutex<tokio::fs::File>>,
pwrite_handles: Vec<oneshot::Receiver<JoinHandle<Result<(), DiskCacheError>>>>,
size: u64,
reserved_bytes: u64,
) -> Result<(), DiskCacheError> {
let res: Result<(), DiskCacheError> = async {
// Each receiver yields the join handle of one chunk's disk write.
for recv in pwrite_handles {
let handle = recv
.await
.map_err(|e| DiskCacheError::SuperfileOpen(format!("pwrite handle: {e}")))?;
handle
.await
.map_err(|e| DiskCacheError::SuperfileOpen(format!("pwrite join: {e}")))??;
}
{
let mut guard = file.lock().await;
guard.flush().await?;
guard.sync_all().await?;
}
drop(file);
// Rename only after the data is synced, so the final path never holds a partial file.
tokio::fs::rename(&tmp_path, &final_path).await?;
let mmap = open_readonly_mmap(&final_path)?;
let mmap_arc = Arc::new(mmap);
let bytes = Bytes::from_owner(ArcMmapOwner(Arc::clone(&mmap_arc)));
let reader = SuperfileReader::open_with(
bytes,
OpenOptions {
verify_crc: store.config.verify_crc_on_open,
},
)?;
match store.cached.entry(uri) {
// Still cached: replace the in-memory entry with the mmap-backed one.
Entry::Occupied(mut occ) => {
*occ.get_mut() = Arc::new(CachedEntry {
reader: Arc::new(reader),
mmap: Some(mmap_arc),
size_bytes: size,
last_access_us: AtomicU64::new(store.started_at.elapsed().as_micros() as u64),
});
}
// Removed while finalizing: the file is no longer wanted.
Entry::Vacant(_) => {
let _ = fs::remove_file(&final_path);
}
}
store.coordinators.remove(&uri);
Ok::<(), DiskCacheError>(())
}
.await;
if res.is_err() {
// Roll back: drop the foreground entry and give its bytes back to the budget.
if let Some((_, entry)) = store.cached.remove(&uri) {
store
.current_bytes
.fetch_sub(entry.size_bytes, Ordering::Release);
}
store.coordinators.remove(&uri);
}
let _ = reserved_bytes;
res
}
/// Fetches every non-empty `(offset, length)` range of `storage_uri`
/// concurrently and returns `(offset, bytes)` pairs in input order.
///
/// # Errors
/// Fails with the first storage error encountered.
async fn fetch_hint_ranges(
    storage: Arc<dyn StorageProvider>,
    storage_uri: String,
    ranges: Vec<(u64, u64)>,
) -> Result<Vec<(u64, Bytes)>, StorageError> {
    let mut fetches = Vec::with_capacity(ranges.len());
    for (off, len) in ranges {
        // Zero-length ranges carry no data; skip the round trip.
        if len == 0 {
            continue;
        }
        let provider = Arc::clone(&storage);
        let object = storage_uri.clone();
        fetches.push(async move {
            let bytes = provider.get_range(&object, off..off + len).await?;
            Ok::<_, StorageError>((off, bytes))
        });
    }
    try_join_all(fetches).await
}
/// Returns `true` when `store` is the only remaining strong handle, i.e. the
/// background task calling this is the last owner of the store.
fn background_store_abandoned(store: &Arc<DiskCacheStore>) -> bool {
    let holders = Arc::strong_count(store);
    holders == 1
}
/// Delays a lazy background fill until it looks safe to start, polling every
/// `STORE_UPGRADE_RETRY_INTERVAL`.
///
/// Returns `None` when the store or the lazy reader has been dropped.
/// Returns the upgraded store when either a caller is waiting in
/// `wait_until_mmap_promoted`, or the reader's strong count is at most one on
/// two consecutive checks one interval apart.
async fn wait_for_lazy_foreground_release(
store: &Weak<DiskCacheStore>,
reader: &Weak<SuperfileReader>,
) -> Option<Arc<DiskCacheStore>> {
loop {
// Nothing left to fill for.
if store.strong_count() == 0 || reader.strong_count() == 0 {
return None;
}
// Someone is explicitly waiting for promotion: start right away.
if let Some(strong) = store.upgrade()
&& strong.n_promotion_waiters.load(Ordering::Acquire) > 0
{
return Some(strong);
}
// NOTE(review): a count of one presumably means only the cache entry still
// holds the reader — confirm.
if reader.strong_count() <= 1 {
tokio::time::sleep(STORE_UPGRADE_RETRY_INTERVAL).await;
// Re-check after a grace sleep so a reader that was re-acquired in
// the meantime keeps the fill waiting.
if reader.strong_count() <= 1 {
return store.upgrade();
}
continue;
}
tokio::time::sleep(STORE_UPGRADE_RETRY_INTERVAL).await;
}
}
/// Like `DiskCacheStore::cold_fetch_to_disk`, but gives up early when the
/// store has been abandoned (see `background_store_abandoned`).
///
/// Returns `Ok(true)` when the whole object was written and fsynced, and
/// `Ok(false)` when the download was cancelled because the store was abandoned.
///
/// # Errors
/// Propagates storage and filesystem failures; task-join failures are
/// reported as `DiskCacheError::SuperfileOpen`.
async fn cold_fetch_to_disk_cancelable(
store: &Arc<DiskCacheStore>,
storage_uri: &str,
dest_path: &Path,
size: u64,
) -> Result<bool, DiskCacheError> {
let n_streams = store.config.cold_fetch_streams.max(1);
let chunk_size = store.config.cold_fetch_chunk_bytes.max(1);
// Pre-size the file so every chunk can be written at its own offset.
let file = {
let f = fs::OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.open(dest_path)?;
f.set_len(size)?;
Arc::new(f)
};
let n_chunks = if size == 0 {
0
} else {
size.div_ceil(chunk_size)
};
let mut next_chunk = 0u64;
let mut in_flight = FuturesUnordered::new();
loop {
// Top up the in-flight set to `n_streams` chunk downloads.
while next_chunk < n_chunks && in_flight.len() < n_streams {
if background_store_abandoned(store) {
return Ok(false);
}
let start = next_chunk * chunk_size;
let end = (start + chunk_size).min(size);
let storage = Arc::clone(&store.storage);
let file = Arc::clone(&file);
let uri = storage_uri.to_string();
in_flight.push(async move {
let bytes = storage.get_range(&uri, start..end).await?;
spawn_blocking(move || file.write_all_at(&bytes, start))
.await
.map_err(|e| DiskCacheError::SuperfileOpen(format!("write join: {e}")))??;
Ok::<(), DiskCacheError>(())
});
next_chunk += 1;
}
// Wait for one chunk to finish; `None` means everything is done.
match in_flight.next().await {
Some(res) => res?,
None => break,
}
if background_store_abandoned(store) {
return Ok(false);
}
}
if background_store_abandoned(store) {
return Ok(false);
}
spawn_blocking(move || file.sync_all())
.await
.map_err(|e| DiskCacheError::SuperfileOpen(format!("fsync join: {e}")))??;
Ok(true)
}
/// Undoes a lazy fill: removes the cache entry (releasing its bytes from the
/// budget), drops the coordinator, and deletes the temp file (best effort).
fn rollback_lazy_background_fill(store: &Arc<DiskCacheStore>, uri: &SuperfileUri, tmp: &Path) {
    let removed = store.cached.remove(uri);
    if let Some((_, stale)) = removed {
        store
            .current_bytes
            .fetch_sub(stale.size_bytes, Ordering::Release);
    }
    store.coordinators.remove(uri);
    let _ = fs::remove_file(tmp);
}
/// Returns `true` when the `INFINO_DISABLE_BG_FILL` environment variable is
/// `1` or `true` (case-insensitive). The variable is read once; the answer is
/// cached for the lifetime of the process.
pub(crate) fn skip_background_fill() -> bool {
    static SKIP: OnceLock<bool> = OnceLock::new();
    *SKIP.get_or_init(|| match env::var("INFINO_DISABLE_BG_FILL") {
        Ok(raw) => raw == "1" || raw.eq_ignore_ascii_case("true"),
        // Unset or not valid Unicode: background fill stays enabled.
        Err(_) => false,
    })
}
/// Background half of the lazy cold fetch: once the foreground has released
/// the lazy reader (or a promotion waiter exists), downloads the whole object
/// to disk, mmaps it and swaps the cache entry for an mmap-backed one.
///
/// Returns `Ok(())` without doing anything when the store or reader is gone.
/// If the store is abandoned or any step fails, the entry is rolled back via
/// `rollback_lazy_background_fill`. `reserved_bytes` is currently unused.
async fn lazy_background_fill(
store: Weak<DiskCacheStore>,
reader: Weak<SuperfileReader>,
uri: SuperfileUri,
storage_uri: String,
size: u64,
reserved_bytes: u64,
) -> Result<(), DiskCacheError> {
let Some(store) = wait_for_lazy_foreground_release(&store, &reader).await else {
return Ok(());
};
let tmp = store.tmp_path(&uri);
let final_path = store.cache_path(&uri);
if background_store_abandoned(&store) {
rollback_lazy_background_fill(&store, &uri, &tmp);
let _ = reserved_bytes;
return Ok(());
}
// Limit how many background fills run at once.
let _prefetch_permit = match Arc::clone(&store.prefetch_semaphore).acquire_owned().await {
Ok(p) => p,
Err(e) => {
store.coordinators.remove(&uri);
if let Some((_, entry)) = store.cached.remove(&uri) {
store
.current_bytes
.fetch_sub(entry.size_bytes, Ordering::Release);
}
return Err(DiskCacheError::SuperfileOpen(format!(
"prefetch semaphore closed: {e}"
)));
}
};
let res: Result<(), DiskCacheError> = async {
// Early `Ok(())` returns below are cancellations; the abandoned check
// after this block performs the rollback for them.
if background_store_abandoned(&store) {
return Ok(());
}
if !cold_fetch_to_disk_cancelable(&store, &storage_uri, &tmp, size).await? {
return Ok(());
}
if background_store_abandoned(&store) {
return Ok(());
}
tokio::fs::rename(&tmp, &final_path).await?;
let mmap = open_readonly_mmap(&final_path)?;
let mmap_arc = Arc::new(mmap);
let bytes = Bytes::from_owner(ArcMmapOwner(Arc::clone(&mmap_arc)));
let reader = SuperfileReader::open_with(
bytes,
OpenOptions {
verify_crc: store.config.verify_crc_on_open,
},
)?;
match store.cached.entry(uri) {
// Still cached: replace the lazy entry with the mmap-backed one.
Entry::Occupied(mut occ) => {
*occ.get_mut() = Arc::new(CachedEntry {
reader: Arc::new(reader),
mmap: Some(mmap_arc),
size_bytes: size,
last_access_us: AtomicU64::new(store.now_us()),
});
}
// Removed while filling: the file is no longer wanted.
Entry::Vacant(_) => {
let _ = fs::remove_file(&final_path);
}
}
store.coordinators.remove(&uri);
Ok::<(), DiskCacheError>(())
}
.await;
if res.is_err() || background_store_abandoned(&store) {
rollback_lazy_background_fill(&store, &uri, &tmp);
// Redundant with the rollback above, which already removes `tmp`; harmless.
let _ = fs::remove_file(&tmp);
}
let _ = reserved_bytes; res
}
/// Adapter that lets a shared mmap be used as the owner of a `Bytes` value
/// (via `Bytes::from_owner`), keeping the mapping alive while the bytes are in use.
struct ArcMmapOwner(Arc<Mmap>);
impl AsRef<[u8]> for ArcMmapOwner {
fn as_ref(&self) -> &[u8] {
self.0.as_ref()
}
}
/// Opens `path` read-only and maps the whole file into memory.
///
/// # Errors
/// Returns the I/O error from opening or mapping the file.
fn open_readonly_mmap(path: &Path) -> io::Result<Mmap> {
let file = fs::File::open(path)?;
// NOTE(review): soundness relies on the file not being modified or truncated
// while mapped; callers here map files only after their final rename — confirm
// nothing rewrites cache files in place.
unsafe { Mmap::map(&file) }
}
#[cfg(test)]
mod tests {
use std::io::Error as IoError;
use arrow_array::{LargeStringArray, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
use tempfile::TempDir;
use super::*;
use crate::{
storage::LocalFsStorageProvider,
superfile::{
SuperfileReader,
builder::{BuilderOptions, SuperfileBuilder},
},
test_helpers::{decimal128_id_field, decimal128_ids},
};
/// Number of retry intervals the test keeps an extra reader handle alive.
const HELD_POLL_TURNS: u32 = 5;
/// Timeout intended for waiting on mmap promotion in tests.
const PROMOTE_TIMEOUT: Duration = Duration::from_secs(10);
/// Builds the bytes of a minimal superfile with a single row
/// (`doc_id = 1`, `title = "alpha"`).
fn tiny_superfile_bytes() -> Bytes {
let schema = Arc::new(Schema::new(vec![
decimal128_id_field("doc_id"),
Field::new("title", DataType::LargeUtf8, false),
]));
let opts = BuilderOptions::new(schema.clone(), "doc_id", vec![], vec![], None);
let mut b = SuperfileBuilder::new(opts).expect("builder");
let ids = decimal128_ids(vec![1u64]);
let titles = LargeStringArray::from(vec!["alpha"]);
let batch =
RecordBatch::try_new(schema, vec![Arc::new(ids), Arc::new(titles)]).expect("batch");
b.add_batch(&batch, &[]).expect("add_batch");
Bytes::from(b.finish().expect("finish"))
}
/// Opens a reader over the bytes produced by `tiny_superfile_bytes`.
fn tiny_reader() -> Arc<SuperfileReader> {
Arc::new(SuperfileReader::open(tiny_superfile_bytes()).expect("open"))
}
/// Store over a fresh temp directory with the sweep thread disabled
/// (`mmap_cold_threshold_secs = 0`).
fn test_store() -> (TempDir, Arc<DiskCacheStore>) {
test_store_with(|cfg| {
cfg.mmap_cold_threshold_secs = 0;
})
}
/// Builds an unpinned store whose storage root is a fresh temp directory and
/// whose cache root is `<tempdir>/cache`; `mutate` may adjust the config
/// before the store is created. The `TempDir` is returned so the caller
/// keeps the directory alive.
fn test_store_with(
mutate: impl FnOnce(&mut DiskCacheConfig),
) -> (TempDir, Arc<DiskCacheStore>) {
let dir = TempDir::new().expect("tempdir");
let storage: Arc<dyn StorageProvider> =
Arc::new(LocalFsStorageProvider::new(dir.path()).expect("localfs"));
let mut cfg = DiskCacheConfig {
cache_root: dir.path().join("cache"),
mmap_cold_threshold_secs: 0,
..Default::default()
};
mutate(&mut cfg);
let store = DiskCacheStore::new_unpinned(storage, cfg).expect("store");
(dir, store)
}
/// Uploads `bytes` to the store's backing storage at the path derived from `uri`.
///
/// Panics if the upload fails.
async fn put_superfile(store: &Arc<DiskCacheStore>, uri: &SuperfileUri, bytes: Bytes) {
    let object_path = uri.storage_path();
    let outcome = store.storage.put_atomic(&object_path, bytes).await;
    outcome.expect("put superfile");
}
// Exercises `wait_for_lazy_foreground_release` with paused tokio time: the
// waiter must not complete while an extra strong reference to the reader is
// held, and must yield `Some` once that reference is dropped.
#[tokio::test(start_paused = true)]
async fn wait_for_release_rechecks_reader_after_grace_sleep() {
let (_dir, store) = test_store();
let reader = tiny_reader();
let weak_store = Arc::downgrade(&store);
let weak_reader = Arc::downgrade(&reader);
// Run the waiter concurrently; it only sees weak handles to store and reader.
let waiter = tokio::spawn(async move {
wait_for_lazy_foreground_release(&weak_store, &weak_reader).await
});
// Give the spawned waiter a chance to start before taking the extra hold.
tokio::time::sleep(Duration::from_millis(1)).await;
// Simulated foreground hold: a second strong reference to the reader.
let held = Arc::clone(&reader);
// Advance through several retry intervals while the hold is in place.
tokio::time::sleep(STORE_UPGRADE_RETRY_INTERVAL * HELD_POLL_TURNS).await;
assert!(
!waiter.is_finished(),
"background fill must keep waiting while the foreground reader is held"
);
drop(held);
let got = waiter.await.expect("waiter join");
assert!(
got.is_some(),
"wait must yield the store once the foreground hold releases"
);
// NOTE(review): `reader` itself stays alive until here, so the waiter
// presumably treats "only the original reference remains" as released —
// confirm against `wait_for_lazy_foreground_release`.
drop(reader);
}
/// Constructing a store creates the `cache` directory, and the store's `Debug`
/// output names the type and the cold-fetch counter.
#[tokio::test]
async fn new_creates_cache_root() {
    let (dir, store) = test_store();

    let cache_root = dir.path().join("cache");
    assert!(cache_root.is_dir(), "cache_root created");

    let rendered = format!("{store:?}");
    for needle in ["DiskCacheStore", "n_cold_fetches"] {
        assert!(rendered.contains(needle));
    }
}
#[tokio::test]
async fn new_with_sweep_thread_enabled_spawns_and_drops_cleanly() {
let (_dir, store) = test_store_with(|cfg| {
cfg.mmap_cold_threshold_secs = 1;
cfg.mmap_sweep_interval_secs = 0; });
drop(store); }
/// A store built via `new_unpinned` reports no pinned URIs.
#[tokio::test]
async fn new_unpinned_installs_empty_pinned_set() {
    let (_dir, store) = test_store();
    let pinned = store.current_pinned_uris();
    assert!(pinned.is_empty());
}
/// A fresh store reports the configured budget and zero for every counter;
/// `CacheStats` is also exercised through `Clone`, `Debug` and `Default`.
#[tokio::test]
async fn stats_reflect_config_and_counters() {
    let (_dir, store) = test_store_with(|cfg| cfg.disk_budget_bytes = 12345);

    let snapshot = store.stats();
    assert_eq!(snapshot.budget_bytes, 12345);

    // Nothing has been inserted, fetched, evicted or advised yet.
    let counters = [
        snapshot.n_entries,
        snapshot.current_bytes,
        snapshot.n_cold_fetches,
        snapshot.n_evictions,
        snapshot.n_madvise_calls,
    ];
    for counter in counters {
        assert_eq!(counter, 0);
    }

    let _ = format!("{:?}", snapshot.clone());
    assert_eq!(CacheStats::default().n_entries, 0);
}
/// After installing a pinned-set callback, `current_pinned_uris` returns
/// exactly the set the callback produces.
#[tokio::test]
async fn set_and_read_pinned_fn() {
    let (_dir, store) = test_store();
    let uri = SuperfileUri::new_v4();

    // The callback yields a one-element set containing `uri`.
    store.set_pinned_fn(Arc::new(move || HashSet::from([uri])));

    let pinned = store.current_pinned_uris();
    assert_eq!(pinned.len(), 1);
    assert!(pinned.contains(&uri));
}
/// A URI the store has never seen is not reported as mmap-promoted.
#[tokio::test]
async fn is_mmap_promoted_false_for_unknown_uri() {
    let (_dir, store) = test_store();
    let unknown = SuperfileUri::new_v4();
    assert!(!store.is_mmap_promoted(&unknown));
}
/// `insert_warm` registers an mmap-promoted entry, accounts for its bytes,
/// writes the cache file, and lets a later `reader` call succeed without a
/// cold fetch.
#[tokio::test]
async fn insert_warm_caches_and_serves_reader() {
    let (_dir, store) = test_store();
    let uri = SuperfileUri::new_v4();
    let payload = tiny_superfile_bytes();
    let expected_bytes = payload.len() as u64;

    store.insert_warm(&uri, payload).await.expect("insert_warm");
    assert!(store.is_mmap_promoted(&uri));

    let after_insert = store.stats();
    assert_eq!(after_insert.n_entries, 1);
    assert_eq!(after_insert.current_bytes, expected_bytes);
    assert_eq!(after_insert.n_cold_fetches, 0);
    assert_eq!(store.current_mmap_size_bytes(), expected_bytes);
    assert!(store.cache_path(&uri).is_file());

    // Served from the warm entry: the cold-fetch counter must stay at zero.
    let _reader = store.reader(&uri).await.expect("reader");
    assert_eq!(store.stats().n_cold_fetches, 0);
}
/// Warm-inserting the same URI twice leaves the byte accounting and the entry
/// count unchanged.
#[tokio::test]
async fn insert_warm_is_idempotent() {
    let (_dir, store) = test_store();
    let uri = SuperfileUri::new_v4();

    let first = store.insert_warm(&uri, tiny_superfile_bytes()).await;
    first.expect("first");
    let bytes_after_first = store.stats().current_bytes;

    let second = store.insert_warm(&uri, tiny_superfile_bytes()).await;
    second.expect("second");

    assert_eq!(store.stats().current_bytes, bytes_after_first);
    assert_eq!(store.stats().n_entries, 1);
}
/// Bytes that are not a superfile make `insert_warm` fail and leave the store
/// empty; the error is also rendered through `Display` and `Debug`.
#[tokio::test]
async fn insert_warm_rejects_unparseable_bytes() {
    let (_dir, store) = test_store();
    let uri = SuperfileUri::new_v4();
    let garbage = Bytes::from_static(b"not a superfile");

    let failure = store
        .insert_warm(&uri, garbage)
        .await
        .expect_err("garbage must fail to open");

    // The failed insert must not leave bytes or entries behind.
    assert_eq!(store.stats().current_bytes, 0);
    assert_eq!(store.stats().n_entries, 0);

    let _ = format!("{failure}");
    let _ = format!("{failure:?}");
}
/// With a 4-byte disk budget, warm-inserting the tiny superfile fails with
/// `BudgetExceeded` and no bytes stay accounted.
#[tokio::test]
async fn insert_warm_budget_exceeded_when_too_big() {
    let (_dir, store) = test_store_with(|cfg| cfg.disk_budget_bytes = 4);
    let uri = SuperfileUri::new_v4();

    let outcome = store.insert_warm(&uri, tiny_superfile_bytes()).await;
    let failure = outcome.expect_err("must exceed budget");

    assert!(matches!(failure, DiskCacheError::BudgetExceeded));
    assert_eq!(store.stats().current_bytes, 0);
}
/// The first `reader_synchronous` call cold-fetches and caches the object; the
/// second is served without another cold fetch.
#[tokio::test]
async fn reader_synchronous_cold_then_warm_hit() {
    let (_dir, store) = test_store();
    let uri = SuperfileUri::new_v4();
    let payload = tiny_superfile_bytes();
    let expected_bytes = payload.len() as u64;
    put_superfile(&store, &uri, payload).await;

    // Cold path: one fetch, one entry, full size accounted, mmap-promoted.
    let _cold = store.reader_synchronous(&uri).await.expect("cold");
    let after_cold = store.stats();
    assert_eq!(after_cold.n_cold_fetches, 1);
    assert_eq!(after_cold.n_entries, 1);
    assert_eq!(after_cold.current_bytes, expected_bytes);
    assert!(store.is_mmap_promoted(&uri));

    // Warm path: the cold-fetch counter does not move.
    let _warm = store.reader_synchronous(&uri).await.expect("warm");
    assert_eq!(store.stats().n_cold_fetches, 1);
}
/// Requesting an object that was never uploaded fails, and no coordinator
/// entry is left behind for it.
#[tokio::test]
async fn reader_synchronous_missing_object_errors() {
    let (_dir, store) = test_store();
    let missing = SuperfileUri::new_v4();

    let outcome = store.reader_synchronous(&missing).await;
    let failure = outcome.expect_err("no object");
    let _ = format!("{failure}");

    assert!(store.coordinators.is_empty());
}
/// With the default cold-fetch mode, `reader` serves a cold object, the entry
/// is later promoted to mmap, and a second read needs no further cold fetch.
#[tokio::test]
async fn reader_hybrid_cold_then_promotes_to_mmap() {
    let (_dir, store) = test_store();
    let uri = SuperfileUri::new_v4();
    put_superfile(&store, &uri, tiny_superfile_bytes()).await;

    // The cold reader stays alive for the rest of the test.
    let cold_reader = store.reader(&uri).await.expect("cold hybrid");
    assert_eq!(cold_reader.n_docs(), 1);
    assert_eq!(store.stats().n_cold_fetches, 1);
    assert_eq!(store.stats().n_entries, 1);

    let promotion = store.wait_until_mmap_promoted(&uri, PROMOTE_TIMEOUT).await;
    promotion.expect("promotion");
    assert!(store.is_mmap_promoted(&uri));

    let _warm_reader = store.reader(&uri).await.expect("warm");
    assert_eq!(store.stats().n_cold_fetches, 1);
}
/// A zero-length stored object cannot be opened: `reader` must return an error.
#[tokio::test]
async fn reader_hybrid_empty_object_zero_chunks() {
    let (_dir, store) = test_store();
    let uri = SuperfileUri::new_v4();
    put_superfile(&store, &uri, Bytes::new()).await;

    let outcome = store.reader(&uri).await;
    let failure = outcome.expect_err("empty not a superfile");
    let _ = format!("{failure}");
}
#[tokio::test]
async fn reader_range_only_mode_is_rejected() {
let (_dir, store) = test_store_with(|cfg| {
cfg.cold_fetch_mode = ColdFetchMode::RangeOnly;
});
let uri = SuperfileUri::new_v4();
put_superfile(&store, &uri, tiny_superfile_bytes()).await;
let err = store.reader(&uri).await.expect_err("RangeOnly rejected");
assert!(matches!(err, DiskCacheError::SuperfileOpen(_)));
}
/// `open_range_only` without offset hints returns a working reader and adds
/// nothing to the cache accounting.
#[tokio::test]
async fn open_range_only_unknown_size_reads_directly() {
    let (_dir, store) = test_store();
    let uri = SuperfileUri::new_v4();
    put_superfile(&store, &uri, tiny_superfile_bytes()).await;

    let direct = store.open_range_only(&uri, None).await.expect("range open");
    assert_eq!(direct.n_docs(), 1);

    // No cache entry and no bytes were recorded for the direct read.
    assert_eq!(store.stats().n_entries, 0);
    assert_eq!(store.stats().current_bytes, 0);
}
/// `open_range_only` with offsets that carry only the total object size still
/// yields a working reader.
#[tokio::test]
async fn open_range_only_known_size_reads_directly() {
    let (_dir, store) = test_store();
    let uri = SuperfileUri::new_v4();
    let payload = tiny_superfile_bytes();
    let total_size = payload.len() as u64;
    put_superfile(&store, &uri, payload).await;

    // Only the total size is known; every subsection hint is left empty.
    let hints = SubsectionOffsets {
        total_size,
        vec: None,
        fts: None,
        vec_open_ranges: Vec::new(),
        fts_open_ranges: Vec::new(),
        open_blob: Vec::new(),
    };

    let outcome = store.open_range_only(&uri, Some(&hints)).await;
    let direct = outcome.expect("known-size range open");
    assert_eq!(direct.n_docs(), 1);
}
#[tokio::test]
async fn reader_lazy_unknown_size_cold_then_promotes() {
let (_dir, store) = test_store_with(|cfg| {
cfg.cold_fetch_mode = ColdFetchMode::LazyForegroundWithBackgroundFill;
});
let uri = SuperfileUri::new_v4();
put_superfile(&store, &uri, tiny_superfile_bytes()).await;
let r = store.reader(&uri).await.expect("lazy cold");
assert_eq!(r.n_docs(), 1);
assert_eq!(store.stats().n_cold_fetches, 1);
drop(r);
store
.wait_until_mmap_promoted(&uri, PROMOTE_TIMEOUT)
.await
.expect("lazy promotion");
assert!(store.is_mmap_promoted(&uri));
}
#[tokio::test]
async fn reader_lazy_with_hints_known_size_promotes() {
let (_dir, store) = test_store_with(|cfg| {
cfg.cold_fetch_mode = ColdFetchMode::LazyForegroundWithBackgroundFill;
});
let uri = SuperfileUri::new_v4();
let bytes = tiny_superfile_bytes();
let total = bytes.len() as u64;
put_superfile(&store, &uri, bytes).await;
let offsets = SubsectionOffsets {
total_size: total,
vec: None,
fts: None,
vec_open_ranges: Vec::new(),
fts_open_ranges: Vec::new(),
open_blob: Vec::new(),
};
let r = store
.reader_with_hints(&uri, Some(&offsets))
.await
.expect("lazy hinted cold");
assert_eq!(r.n_docs(), 1);
assert_eq!(store.stats().n_cold_fetches, 1);
drop(r);
store
.wait_until_mmap_promoted(&uri, PROMOTE_TIMEOUT)
.await
.expect("lazy hinted promotion");
assert!(store.is_mmap_promoted(&uri));
}
/// Waiting on a URI that is never promoted fails with `SuperfileOpen` after the
/// timeout, and the promotion-waiter counter is back at zero afterwards.
#[tokio::test]
async fn wait_until_mmap_promoted_times_out_for_unpromoted() {
    let (_dir, store) = test_store();
    let never_promoted = SuperfileUri::new_v4();
    let short_timeout = Duration::from_millis(30);

    let outcome = store
        .wait_until_mmap_promoted(&never_promoted, short_timeout)
        .await;
    let failure = outcome.expect_err("must time out");
    assert!(matches!(failure, DiskCacheError::SuperfileOpen(_)));

    let waiters = store.n_promotion_waiters.load(Ordering::Acquire);
    assert_eq!(waiters, 0);
}
/// With a budget of 1.5 entries, fetching a second object evicts the first:
/// one eviction is counted, only `uri_b` stays cached, and `uri_a`'s cache
/// file is removed.
#[tokio::test]
async fn cold_fetch_evicts_lru_when_over_budget() {
    let entry_size = tiny_superfile_bytes().len() as u64;
    // Room for one entry, but not two.
    let budget = entry_size + entry_size / 2;
    let (_dir, store) = test_store_with(move |cfg| cfg.disk_budget_bytes = budget);

    let uri_a = SuperfileUri::new_v4();
    let uri_b = SuperfileUri::new_v4();
    for uri in [&uri_a, &uri_b] {
        put_superfile(&store, uri, tiny_superfile_bytes()).await;
    }

    // Each returned reader is dropped at the end of its statement.
    store.reader_synchronous(&uri_a).await.expect("a");
    store.reader_synchronous(&uri_b).await.expect("b");

    assert_eq!(store.stats().n_evictions, 1);
    assert!(store.cached.contains_key(&uri_b));
    assert!(!store.cached.contains_key(&uri_a));
    assert!(!store.cache_path(&uri_a).exists());
    assert_eq!(store.stats().current_bytes, entry_size);
}
/// When the only cached entry is pinned and the budget cannot hold a second
/// one, the next cold fetch fails with `BudgetExceeded` and the pinned entry
/// stays cached.
#[tokio::test]
async fn cold_fetch_budget_exceeded_with_all_pinned() {
    let entry_size = tiny_superfile_bytes().len() as u64;
    // Room for one entry, but not two.
    let budget = entry_size + entry_size / 2;
    let (_dir, store) = test_store_with(move |cfg| cfg.disk_budget_bytes = budget);

    let uri_a = SuperfileUri::new_v4();
    let uri_b = SuperfileUri::new_v4();
    for uri in [&uri_a, &uri_b] {
        put_superfile(&store, uri, tiny_superfile_bytes()).await;
    }

    // Cache `uri_a` first, then pin it so it is the only entry and not evictable.
    store.reader_synchronous(&uri_a).await.expect("a");
    store.set_pinned_fn(Arc::new(move || HashSet::from([uri_a])));

    let outcome = store.reader_synchronous(&uri_b).await;
    let failure = outcome.expect_err("no eligible victims");
    assert!(matches!(failure, DiskCacheError::BudgetExceeded));
    assert!(store.cached.contains_key(&uri_a));
}
/// With a zero cold threshold, every `sweep_once` pass advises the single warm
/// entry and bumps the madvise counter by one.
#[tokio::test]
async fn sweep_once_advises_idle_mmap_entries() {
    let (_dir, store) = test_store();
    let uri = SuperfileUri::new_v4();
    let inserted = store.insert_warm(&uri, tiny_superfile_bytes()).await;
    inserted.expect("warm");

    // Two consecutive passes: each advises one entry; the counter accumulates.
    for pass in 1..=2u64 {
        assert_eq!(store.sweep_once(), 1);
        assert_eq!(store.stats().n_madvise_calls, pass);
    }
}
#[tokio::test]
async fn sweep_once_skips_when_threshold_not_reached() {
let (_dir, store) = test_store_with(|cfg| {
cfg.mmap_cold_threshold_secs = 1_000_000;
});
let uri = SuperfileUri::new_v4();
store
.insert_warm(&uri, tiny_superfile_bytes())
.await
.expect("warm");
assert_eq!(store.sweep_once(), 0);
assert_eq!(store.stats().n_madvise_calls, 0);
}
/// `sweep_for_budget` with an effectively unlimited budget advises nothing.
#[tokio::test]
async fn sweep_for_budget_noop_under_budget() {
    let (_dir, store) = test_store();
    let uri = SuperfileUri::new_v4();
    let inserted = store.insert_warm(&uri, tiny_superfile_bytes()).await;
    inserted.expect("warm");

    let advised = store.sweep_for_budget(u64::MAX);
    assert_eq!(advised, 0);
    assert_eq!(store.stats().n_madvise_calls, 0);
}
/// `sweep_for_budget(0)` on a store holding one warm entry advises that entry
/// and records one madvise call.
#[tokio::test]
async fn sweep_for_budget_reclaims_oldest_first() {
    let (_dir, store) = test_store();
    let uri = SuperfileUri::new_v4();
    let inserted = store.insert_warm(&uri, tiny_superfile_bytes()).await;
    inserted.expect("warm");

    // Precondition: the warm entry contributes mapped bytes.
    assert!(store.current_mmap_size_bytes() > 0);

    assert_eq!(store.sweep_for_budget(0), 1);
    assert_eq!(store.stats().n_madvise_calls, 1);
}
/// An empty store reports zero mapped bytes.
#[tokio::test]
async fn current_mmap_size_bytes_zero_when_empty() {
    let (_dir, store) = test_store();
    let mapped = store.current_mmap_size_bytes();
    assert_eq!(mapped, 0);
}
/// `Display` and `Debug` produce non-empty output for the `SuperfileOpen`,
/// `BudgetExceeded` and `Io` variants.
///
/// The `Storage` and `SuperfileOpenRead` variants are not exercised here.
#[tokio::test]
async fn disk_cache_error_displays_all_variants() {
    let samples = [
        DiskCacheError::SuperfileOpen(String::from("x")),
        DiskCacheError::BudgetExceeded,
        DiskCacheError::Io(IoError::other("boom")),
    ];
    samples.into_iter().for_each(|sample| {
        let display = format!("{sample}");
        let debug = format!("{sample:?}");
        assert!(!display.is_empty());
        assert!(!debug.is_empty());
    });
}
}