use crate::cached_object_store::stats::CachedObjectStoreStats;
use crate::cached_object_store::storage::{LocalCacheEntry, LocalCacheHead, LocalCacheStorage};
use crate::rand::DbRand;
use crate::utils::format_bytes_si;
use bytes::Bytes;
use chrono::{DateTime, Utc};
use log::{debug, warn};
use object_store::path::Path;
use object_store::{Attributes, ObjectMeta};
use rand::{distr::Alphanumeric, Rng};
use slatedb_common::clock::SystemClock;
use std::collections::{HashMap, HashSet};
use std::fmt::Display;
use std::io::{Read, Seek, SeekFrom, Write};
use std::ops::Range;
use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{Mutex, OnceCell};
use walkdir::WalkDir;
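/// A [`LocalCacheStorage`] implementation that stores cached object parts and heads as files
/// under `root_folder`, optionally paired with a background [`FsCacheEvictor`] that keeps the
/// total cache size under a configured limit.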
#[derive(Debug)]
pub struct FsCacheStorage {
root_folder: std::path::PathBuf,
evictor: Option<Arc<FsCacheEvictor>>,
rand: Arc<DbRand>,
}
impl FsCacheStorage {
pub fn new(
root_folder: std::path::PathBuf,
max_cache_size_bytes: Option<usize>,
scan_interval: Option<Duration>,
stats: Arc<CachedObjectStoreStats>,
system_clock: Arc<dyn SystemClock>,
rand: Arc<DbRand>,
) -> Self {
let evictor = max_cache_size_bytes.map(|max_cache_size_bytes| {
Arc::new(FsCacheEvictor::new(
root_folder.clone(),
max_cache_size_bytes,
scan_interval,
stats,
system_clock,
rand.clone(),
))
});
Self {
root_folder,
evictor,
rand,
}
}
}
#[async_trait::async_trait]
impl LocalCacheStorage for FsCacheStorage {
fn entry(
&self,
location: &object_store::path::Path,
part_size: usize,
) -> Box<dyn LocalCacheEntry> {
Box::new(FsCacheEntry {
root_folder: self.root_folder.clone(),
location: location.clone(),
evictor: self.evictor.clone(),
part_size,
rand: self.rand.clone(),
})
}
async fn start_evictor(&self) {
if let Some(evictor) = &self.evictor {
evictor.start().await
}
}
}
impl Display for FsCacheStorage {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "FsCacheStorage({})", self.root_folder.display())
}
}
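/// A cached object located under `root_folder/<location>/`: the object's metadata is stored in
/// a `_head` file and its payload is split into fixed-size `_part*` files.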
#[derive(Debug)]
pub(crate) struct FsCacheEntry {
root_folder: std::path::PathBuf,
location: Path,
part_size: usize,
evictor: Option<Arc<FsCacheEvictor>>,
rand: Arc<DbRand>,
}
impl FsCacheEntry {
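/// Atomically persists `buf` at `path`: the bytes are written and fsync'd to a temporary file
/// with a random suffix, which is then renamed over the final path so concurrent readers never
/// observe a partially written file.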
async fn atomic_write(&self, path: std::path::PathBuf, buf: Bytes) -> object_store::Result<()> {
let tmp_path = path.with_extension(format!("_tmp{}", self.make_rand_suffix()));
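// Register the pending write with the evictor first; if its queue is saturated, skip caching
// this entry instead of letting the cache grow without accounting.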
if let Some(evictor) = &self.evictor {
if !evictor
.track_entry_accessed(path.clone(), buf.len(), true)
.await
{
return Ok(());
}
}
#[allow(clippy::disallowed_methods)]
tokio::task::spawn_blocking(move || {
let tmp_path = tmp_path.as_path();
if let Some(folder_path) = tmp_path.parent() {
std::fs::create_dir_all(folder_path).map_err(wrap_io_err)?;
}
let mut file = std::fs::OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.open(tmp_path)
.map_err(wrap_io_err)?;
file.write_all(&buf).map_err(wrap_io_err)?;
file.sync_all().map_err(wrap_io_err)?;
std::fs::rename(tmp_path, path).map_err(wrap_io_err)
})
.await
.map_err(wrap_io_err)??;
Ok(())
}
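/// Builds the on-disk path of a part file: `<root_folder>/<location>/_part{part_size}-{part_number:09}`,
/// where the part size is rendered in megabytes when it is MiB-aligned and in kilobytes otherwise.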
pub(crate) fn make_part_path(
root_folder: std::path::PathBuf,
location: &Path,
part_number: usize,
part_size: usize,
) -> std::path::PathBuf {
let part_size_name = if part_size.is_multiple_of(1024 * 1024) {
format!("{}mb", part_size / (1024 * 1024))
} else {
format!("{}kb", part_size / 1024)
};
let suffix = format!("_part{}-{:09}", part_size_name, part_number);
let mut path = root_folder.join(location.to_string());
path.push(suffix);
path
}
fn make_head_path(root_folder: std::path::PathBuf, location: &Path) -> std::path::PathBuf {
let suffix = "_head".to_string();
let mut path = root_folder.join(location.to_string());
path.push(suffix);
path
}
fn make_rand_suffix(&self) -> String {
let mut rng = self.rand.rng();
(0..24).map(|_| rng.sample(Alphanumeric) as char).collect()
}
}
#[async_trait::async_trait]
impl LocalCacheEntry for FsCacheEntry {
async fn save_part(&self, part_number: usize, buf: Bytes) -> object_store::Result<()> {
let part_path = Self::make_part_path(
self.root_folder.clone(),
&self.location,
part_number,
self.part_size,
);
self.atomic_write(part_path, buf).await
}
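/// Reads `range_in_part` from the cached part file, returning `Ok(None)` when the part is not
/// cached locally. A successful read is reported to the evictor as an access.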
async fn read_part(
&self,
part_number: usize,
range_in_part: Range<usize>,
) -> object_store::Result<Option<Bytes>> {
let part_path = Self::make_part_path(
self.root_folder.clone(),
&self.location,
part_number,
self.part_size,
);
let this_part_path = part_path.clone();
#[allow(clippy::disallowed_methods)]
let result = tokio::task::spawn_blocking(move || {
let mut file = match std::fs::File::open(&this_part_path) {
Ok(f) => f,
Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(None),
Err(err) => return Err(wrap_io_err(err)),
};
let mut buffer = vec![0; range_in_part.len()];
let pos = file
.seek(SeekFrom::Start(range_in_part.start as u64))
.map_err(wrap_io_err)?;
assert_eq!(pos, range_in_part.start as u64);
file.read_exact(&mut buffer).map_err(wrap_io_err)?;
Ok(Some(Bytes::from(buffer)))
})
.await
.map_err(wrap_io_err)??;
if result.is_some() {
if let Some(evictor) = &self.evictor {
evictor
.track_entry_accessed(part_path, self.part_size, false)
.await;
}
}
Ok(result)
}
#[cfg(test)]
async fn cached_parts(
&self,
) -> object_store::Result<Vec<crate::cached_object_store::storage::PartID>> {
let head_path = Self::make_head_path(self.root_folder.clone(), &self.location);
let directory_path = match head_path.parent() {
Some(directory_path) => directory_path.to_path_buf(),
None => return Ok(vec![]),
};
#[allow(clippy::disallowed_methods)]
tokio::task::spawn_blocking(move || {
let target_prefix = "_part";
let entries = match std::fs::read_dir(&directory_path) {
Ok(entries) => entries,
Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(vec![]),
Err(err) => return Err(wrap_io_err(err)),
};
let mut part_file_names = vec![];
for entry in entries {
let entry = entry.map_err(wrap_io_err)?;
let metadata = entry.metadata().map_err(wrap_io_err)?;
if metadata.is_dir() {
continue;
}
let file_name = entry.file_name();
let file_name_str = file_name.to_string_lossy();
if file_name_str.starts_with(target_prefix) {
part_file_names.push(file_name_str.to_string());
}
}
if part_file_names.is_empty() {
return Ok(vec![]);
}
part_file_names.sort();
let mut part_numbers = Vec::with_capacity(part_file_names.len());
for part_file_name in part_file_names.iter() {
let part_number = part_file_name
.split('-')
.next_back()
.and_then(|part_number| part_number.parse::<usize>().ok());
if let Some(part_number) = part_number {
part_numbers.push(part_number);
}
}
Ok(part_numbers)
})
.await
.map_err(wrap_io_err)?
}
async fn save_head(&self, head: (&ObjectMeta, &Attributes)) -> object_store::Result<()> {
match self.read_head().await {
// The head is already cached; leave it as is.
Ok(Some(_)) => return Ok(()),
Ok(None) => {}
// The cached head could not be read; fall through and rewrite it.
Err(_) => {}
}
let head: LocalCacheHead = head.into();
let buf: Bytes = serde_json::to_vec(&head).map_err(wrap_io_err)?.into();
let meta_path = Self::make_head_path(self.root_folder.clone(), &self.location);
self.atomic_write(meta_path, buf).await
}
async fn read_head(&self) -> object_store::Result<Option<(ObjectMeta, Attributes)>> {
let head_path = Self::make_head_path(self.root_folder.clone(), &self.location);
#[allow(clippy::disallowed_methods)]
let result = tokio::task::spawn_blocking(move || {
let metadata = match std::fs::metadata(&head_path) {
Ok(m) => m,
Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(None),
Err(err) => return Err(wrap_io_err(err)),
};
let head_size_bytes = metadata.len() as usize;
let mut file = std::fs::File::open(&head_path).map_err(wrap_io_err)?;
let mut content = String::new();
file.read_to_string(&mut content).map_err(wrap_io_err)?;
let head: LocalCacheHead = serde_json::from_str(&content).map_err(wrap_io_err)?;
Ok(Some((head.meta(), head.attributes(), head_size_bytes)))
})
.await
.map_err(wrap_io_err)??;
if let Some((meta, attributes, head_size_bytes)) = result {
if let Some(evictor) = &self.evictor {
let head_path = Self::make_head_path(self.root_folder.clone(), &self.location);
evictor
.track_entry_accessed(head_path, head_size_bytes, false)
.await;
}
Ok(Some((meta, attributes)))
} else {
Ok(None)
}
}
}
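/// Work item forwarded to the evictor: the accessed file's path, its size in bytes, and whether
/// this access is allowed to trigger an eviction pass.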
type FsCacheEvictorWork = (std::path::PathBuf, usize, bool);
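/// Minimum interval between "evictor queue full" warnings.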
const QUEUE_FULL_LOG_INTERVAL_MS: i64 = 30_000;
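/// Offloads cache bookkeeping from the read/write path: accesses are pushed onto a bounded
/// channel and consumed by a background task that maintains an in-memory index of cached files
/// and evicts entries once the cache exceeds `max_cache_size_bytes`. An optional periodic scan
/// reconciles the index with the files actually present on disk.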
#[derive(Debug)]
struct FsCacheEvictor {
root_folder: std::path::PathBuf,
max_cache_size_bytes: usize,
scan_interval: Option<Duration>,
tx: tokio::sync::mpsc::Sender<FsCacheEvictorWork>,
rx: Mutex<Option<tokio::sync::mpsc::Receiver<FsCacheEvictorWork>>>,
started: AtomicBool,
queue_full_count: AtomicU64,
last_queue_full_log_ms: AtomicI64,
background_evict_handle: OnceCell<tokio::task::JoinHandle<()>>,
background_scan_handle: OnceCell<tokio::task::JoinHandle<()>>,
stats: Arc<CachedObjectStoreStats>,
system_clock: Arc<dyn SystemClock>,
rand: Arc<DbRand>,
}
impl FsCacheEvictor {
fn new(
root_folder: std::path::PathBuf,
max_cache_size_bytes: usize,
scan_interval: Option<Duration>,
stats: Arc<CachedObjectStoreStats>,
system_clock: Arc<dyn SystemClock>,
rand: Arc<DbRand>,
) -> Self {
let (tx, rx) = tokio::sync::mpsc::channel(100);
Self {
root_folder,
scan_interval,
max_cache_size_bytes,
tx,
rx: Mutex::new(Some(rx)),
started: AtomicBool::new(false),
queue_full_count: AtomicU64::new(0),
last_queue_full_log_ms: AtomicI64::new(i64::MIN),
background_evict_handle: OnceCell::new(),
background_scan_handle: OnceCell::new(),
stats,
system_clock,
rand,
}
}
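/// Spawns the background scan and evict tasks. Expected to be called at most once: a second
/// call panics because the work receiver has already been taken.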
async fn start(&self) {
let inner = Arc::new(FsCacheEvictorInner::new(
self.root_folder.clone(),
self.max_cache_size_bytes,
self.stats.clone(),
self.rand.clone(),
));
let mut rx_guard = self.rx.lock().await;
let rx = rx_guard.take().expect("evictor already started");
self.started.store(true, Ordering::Release);
self.background_scan_handle
.set(tokio::spawn(Self::background_scan(
inner.clone(),
self.scan_interval,
self.system_clock.clone(),
)))
.ok();
self.background_evict_handle
.set(tokio::spawn(Self::background_evict(
inner,
rx,
self.system_clock.clone(),
)))
.ok();
}
fn started(&self) -> bool {
self.started.load(Ordering::Acquire)
}
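/// Consumes access events from the channel and applies them to the shared evictor state.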
async fn background_evict(
inner: Arc<FsCacheEvictorInner>,
mut rx: tokio::sync::mpsc::Receiver<FsCacheEvictorWork>,
system_clock: Arc<dyn SystemClock>,
) {
// Drain access events until every sender is dropped and the channel closes.
while let Some((path, bytes, evict)) = rx.recv().await {
inner
.track_entry_accessed(path, bytes, system_clock.now(), evict)
.await;
}
}
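/// Runs an initial full scan of the cache directory and, when `scan_interval` is set, keeps
/// rescanning on that interval to keep the in-memory index in sync with the files on disk.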
async fn background_scan(
inner: Arc<FsCacheEvictorInner>,
scan_interval: Option<Duration>,
system_clock: Arc<dyn SystemClock>,
) {
inner.clone().scan_entries(true).await;
if let Some(scan_interval) = scan_interval {
loop {
system_clock.clone().sleep(scan_interval).await;
inner.clone().scan_entries(true).await;
}
}
}
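/// Reports that `path` (of `bytes` bytes) was written or accessed. Returns `true` when the
/// event was enqueued (or the evictor has not started yet) and `false` when the queue is full
/// or closed, which callers use as a signal to skip caching under backpressure.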
#[allow(clippy::disallowed_methods)]
async fn track_entry_accessed(
&self,
path: std::path::PathBuf,
bytes: usize,
evict: bool,
) -> bool {
if !self.started() {
return true;
}
match self.tx.try_send((path, bytes, evict)) {
Ok(()) => true,
Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => {
self.queue_full_count.fetch_add(1, Ordering::AcqRel);
let now_ms = self.system_clock.now().timestamp_millis();
let last_log_ms = self.last_queue_full_log_ms.load(Ordering::Acquire);
if now_ms.saturating_sub(last_log_ms) >= QUEUE_FULL_LOG_INTERVAL_MS
&& self
.last_queue_full_log_ms
.compare_exchange(last_log_ms, now_ms, Ordering::AcqRel, Ordering::Acquire)
.is_ok()
{
let queue_full_count = self.queue_full_count.swap(0, Ordering::AcqRel);
warn!(
"evictor queue full; {} cache write/access events were dropped since the last report",
queue_full_count,
);
}
false
}
Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => false,
}
}
}
#[derive(Debug, Clone)]
struct CacheEntry {
access_time: DateTime<Utc>,
size_bytes: usize,
key_index: usize,
}
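/// In-memory index of cached files: `entries` maps each path to its access time, size, and the
/// position of that path in `keys`, which allows eviction to sample entries uniformly at random
/// in O(1).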
#[derive(Debug, Default)]
struct CacheState {
entries: HashMap<std::path::PathBuf, CacheEntry>,
keys: Vec<std::path::PathBuf>,
}
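/// Owns the cache index and performs the actual evictions; shared between the background evict
/// and scan tasks.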
#[derive(Debug)]
struct FsCacheEvictorInner {
root_folder: std::path::PathBuf,
max_cache_size_bytes: usize,
track_lock: Mutex<()>,
cache_state: Mutex<CacheState>,
cache_size_bytes: AtomicU64,
stats: Arc<CachedObjectStoreStats>,
rand: Arc<DbRand>,
}
impl FsCacheEvictorInner {
fn new(
root_folder: std::path::PathBuf,
max_cache_size_bytes: usize,
stats: Arc<CachedObjectStoreStats>,
rand: Arc<DbRand>,
) -> Self {
Self {
root_folder,
max_cache_size_bytes,
track_lock: Mutex::new(()),
cache_state: Mutex::new(CacheState::default()),
cache_size_bytes: AtomicU64::new(0_u64),
stats,
rand,
}
}
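/// Walks `root_folder` and registers every regular file with its size and last access time;
/// when `evict` is true, registering a file may immediately trigger an eviction pass.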
async fn scan_entries(self: Arc<Self>, evict: bool) {
let root_folder = self.root_folder.clone();
#[allow(clippy::disallowed_methods)]
let paths = tokio::task::spawn_blocking(move || {
WalkDir::new(&root_folder)
.into_iter()
.filter_map(|e| e.ok())
.filter(|e| e.file_type().is_file())
.map(|e| e.path().to_path_buf())
.collect::<Vec<_>>()
})
.await
.unwrap_or_default();
for path in paths {
let metadata = match tokio::fs::metadata(&path).await {
Ok(metadata) => metadata,
Err(err) => {
if err.kind() != std::io::ErrorKind::NotFound {
warn!(
"evictor failed to get the metadata of the cache file [path={:?}, error={}]",
path, err
);
}
continue;
}
};
#[allow(clippy::disallowed_types)]
let atime = metadata
.accessed()
.unwrap_or(std::time::SystemTime::UNIX_EPOCH)
.into();
let bytes = metadata.len() as usize;
self.track_entry_accessed(path, bytes, atime, evict).await;
}
}
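/// Records an access to `path`, inserting it into the index if it is new, and, when `evict` is
/// set and the cache has grown past `max_cache_size_bytes`, evicts entries down to roughly 90%
/// of the limit. Returns the number of bytes evicted.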
async fn track_entry_accessed(
&self,
path: std::path::PathBuf,
bytes: usize,
accessed_time: DateTime<Utc>,
evict: bool,
) -> usize {
let _track_guard = self.track_lock.lock().await;
let entry_count = {
let mut cache_state = self.cache_state.lock().await;
match cache_state.entries.get_mut(&path) {
Some(entry) => {
entry.access_time = accessed_time;
}
None => {
let key_index = cache_state.keys.len();
cache_state.keys.push(path.clone());
cache_state.entries.insert(
path.clone(),
CacheEntry {
access_time: accessed_time,
size_bytes: bytes,
key_index,
},
);
self.cache_size_bytes
.fetch_add(bytes as u64, Ordering::SeqCst);
}
}
cache_state.entries.len()
};
self.stats.object_store_cache_keys.set(entry_count as i64);
self.stats
.object_store_cache_bytes
.set(self.cache_size_bytes.load(Ordering::Relaxed) as i64);
if self.cache_size_bytes.load(Ordering::Relaxed) <= self.max_cache_size_bytes as u64 {
return 0;
}
if !evict {
return 0;
}
// Evict down to 90% of the limit so the very next insert does not immediately trigger
// another eviction pass.
let target_size = ((self.max_cache_size_bytes as f64) * 0.9) as u64;
self.evict_to_target_size(target_size).await
}
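/// Picks eviction candidates, removes them from disk, then updates the in-memory index and the
/// cache metrics. Files that are already gone are treated as successfully evicted. Returns the
/// total number of bytes evicted.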
async fn evict_to_target_size(&self, target_size: u64) -> usize {
let picked_targets = self.pick_evict_targets(target_size).await;
if picked_targets.is_empty() {
if self.cache_size_bytes.load(Ordering::Relaxed) > target_size {
warn!(
"cache_size_bytes still exceeds max_cache_size_bytes but no more entries can be evicted (cache_size_bytes={}, max_cache_size_bytes={})",
self.cache_size_bytes.load(Ordering::Relaxed),
self.max_cache_size_bytes
);
}
return 0;
}
let mut deleted_targets: Vec<(std::path::PathBuf, usize)> =
Vec::with_capacity(picked_targets.len());
for (target, target_bytes) in picked_targets {
match tokio::fs::remove_file(&target).await {
Ok(()) => {
debug!(
"evictor evicted cache file [path={:?}, size={}]",
target,
format_bytes_si(target_bytes as u64)
);
deleted_targets.push((target, target_bytes));
}
Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
deleted_targets.push((target, target_bytes));
}
Err(err) => {
warn!(
"evictor failed to remove the cache file [path={:?}, error={}]",
target, err
);
}
}
}
if deleted_targets.is_empty() {
return 0;
}
let (entry_count, total_evicted_bytes) = {
let mut cache_state = self.cache_state.lock().await;
let mut total_bytes: usize = 0;
for (target, target_bytes) in deleted_targets.iter() {
if let Some(removed) = cache_state.entries.remove(target) {
cache_state.keys.swap_remove(removed.key_index);
if removed.key_index < cache_state.keys.len() {
let swapped_key = cache_state.keys[removed.key_index].clone();
if let Some(swapped) = cache_state.entries.get_mut(&swapped_key) {
swapped.key_index = removed.key_index;
}
}
self.cache_size_bytes
.fetch_sub(*target_bytes as u64, Ordering::SeqCst);
total_bytes += target_bytes;
}
}
(cache_state.entries.len(), total_bytes)
};
self.stats
.object_store_cache_evicted_bytes
.increment(total_evicted_bytes as u64);
self.stats
.object_store_cache_evicted_keys
.increment(deleted_targets.len() as u64);
self.stats.object_store_cache_keys.set(entry_count as i64);
self.stats
.object_store_cache_bytes
.set(self.cache_size_bytes.load(Ordering::Relaxed) as i64);
total_evicted_bytes
}
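/// Approximate-LRU candidate selection using the "power of two choices": repeatedly sample two
/// distinct untouched entries at random and evict the one with the older access time, until the
/// simulated cache size falls to `target_size`. Requires at least two tracked entries.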
async fn pick_evict_targets(&self, target_size: u64) -> Vec<(std::path::PathBuf, usize)> {
let cache_state = self.cache_state.lock().await;
if cache_state.keys.len() < 2 {
return vec![];
}
let mut targets = Vec::new();
let mut simulated_size = self.cache_size_bytes.load(Ordering::Relaxed);
let mut picked_indices: HashSet<usize> = HashSet::new();
let mut rng = self.rand.rng();
while simulated_size > target_size {
let available_count = cache_state.keys.len() - picked_indices.len();
if available_count < 2 {
break;
}
let idx0 = match self.pick_random_available_index(
&mut rng,
&cache_state.keys,
&picked_indices,
None,
) {
Some(idx) => idx,
None => break,
};
let idx1 = match self.pick_random_available_index(
&mut rng,
&cache_state.keys,
&picked_indices,
Some(idx0),
) {
Some(idx) => idx,
None => break,
};
let path0 = &cache_state.keys[idx0];
let path1 = &cache_state.keys[idx1];
let entry0 = match cache_state.entries.get(path0) {
Some(e) => e,
None => break,
};
let entry1 = match cache_state.entries.get(path1) {
Some(e) => e,
None => break,
};
let (chosen_idx, chosen_path, chosen_bytes) =
if entry0.access_time <= entry1.access_time {
(idx0, path0.clone(), entry0.size_bytes)
} else {
(idx1, path1.clone(), entry1.size_bytes)
};
picked_indices.insert(chosen_idx);
simulated_size = simulated_size.saturating_sub(chosen_bytes as u64);
targets.push((chosen_path, chosen_bytes));
}
targets
}
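/// Picks a uniformly random index into `keys` that has not been picked yet and differs from
/// `exclude_idx`; returns `None` when no such index remains.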
fn pick_random_available_index(
&self,
rng: &mut impl rand::Rng,
keys: &[std::path::PathBuf],
picked: &HashSet<usize>,
exclude_idx: Option<usize>,
) -> Option<usize> {
let excluded_not_picked = exclude_idx.is_some_and(|idx| !picked.contains(&idx));
let available_count = keys
.len()
.saturating_sub(picked.len())
.saturating_sub(usize::from(excluded_not_picked));
if available_count == 0 {
return None;
}
loop {
let idx = rng.random_range(0..keys.len());
if !picked.contains(&idx) && Some(idx) != exclude_idx {
return Some(idx);
}
}
}
}
fn wrap_io_err(err: impl std::error::Error + Send + Sync + 'static) -> object_store::Error {
object_store::Error::Generic {
store: "cached_object_store",
source: Box::new(err),
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::cached_object_store::stats::{CACHE_BYTES, CACHE_KEYS, EVICTED_BYTES, EVICTED_KEYS};
use crate::test_utils::gen_rand_bytes;
use filetime::FileTime;
use slatedb_common::clock::DefaultSystemClock;
use slatedb_common::metrics::{lookup_metric, DefaultMetricsRecorder, MetricsRecorderHelper};
use std::{io::Write, sync::atomic::Ordering, time::SystemTime};
fn gen_rand_file(
folder_path: &std::path::Path,
file_name: &str,
n: usize,
) -> std::path::PathBuf {
let file_path = folder_path.join(file_name);
let bytes = gen_rand_bytes(n);
let mut file = std::fs::File::create(&file_path).unwrap();
file.write_all(&bytes).unwrap();
file_path
}
#[tokio::test]
async fn test_evictor() {
let temp_dir = tempfile::Builder::new()
.prefix("objstore_cache_test_evictor_")
.tempdir()
.unwrap();
let recorder = slatedb_common::metrics::MetricsRecorderHelper::noop();
let evictor = FsCacheEvictorInner::new(
temp_dir.path().to_path_buf(),
1024 * 2,
Arc::new(CachedObjectStoreStats::new(&recorder)),
Arc::new(DbRand::default()),
);
let path0 = gen_rand_file(temp_dir.path(), "file0", 1024);
let evicted = evictor
.track_entry_accessed(path0, 1024, DefaultSystemClock::default().now(), true)
.await;
assert_eq!(evicted, 0);
let path1 = gen_rand_file(temp_dir.path(), "file1", 1024);
let evicted = evictor
.track_entry_accessed(path1, 1024, DefaultSystemClock::default().now(), true)
.await;
assert_eq!(evicted, 0);
let path2 = gen_rand_file(temp_dir.path(), "file2", 1024);
let evicted = evictor
.track_entry_accessed(path2, 1024, DefaultSystemClock::default().now(), true)
.await;
assert_eq!(evicted, 2048);
let file_paths = walkdir::WalkDir::new(temp_dir.path())
.into_iter()
.map(|entry| entry.unwrap().file_name().to_string_lossy().to_string())
.collect::<Vec<_>>();
// WalkDir yields the root directory entry itself plus the single surviving file.
assert_eq!(file_paths.len(), 2);
}
#[tokio::test]
async fn test_evictor_track_entry_accessed_backpressure() {
let temp_dir = tempfile::Builder::new()
.prefix("objstore_cache_test_evictor_backpressure_")
.tempdir()
.unwrap();
let recorder = slatedb_common::metrics::MetricsRecorderHelper::noop();
let evictor = FsCacheEvictor::new(
temp_dir.path().to_path_buf(),
1024,
None,
Arc::new(CachedObjectStoreStats::new(&recorder)),
Arc::new(DefaultSystemClock::new()),
Arc::new(DbRand::default()),
);
evictor.started.store(true, Ordering::Release);
for idx in 0..100 {
let accepted = evictor
.track_entry_accessed(std::path::PathBuf::from(format!("file{idx}")), 1, true)
.await;
assert!(accepted);
}
let accepted = evictor
.track_entry_accessed(std::path::PathBuf::from("overflow"), 1, true)
.await;
assert!(!accepted);
}
#[tokio::test]
async fn test_evictor_pick() {
let temp_dir = tempfile::Builder::new()
.prefix("objstore_cache_test_evictor_")
.tempdir()
.unwrap();
let recorder = slatedb_common::metrics::MetricsRecorderHelper::noop();
let evictor = Arc::new(FsCacheEvictorInner::new(
temp_dir.path().to_path_buf(),
1024 * 2,
Arc::new(CachedObjectStoreStats::new(&recorder)),
Arc::new(DbRand::default()),
));
let path0 = gen_rand_file(temp_dir.path(), "file0", 1024);
gen_rand_file(temp_dir.path(), "file1", 1025);
filetime::set_file_atime(&path0, FileTime::from_system_time(SystemTime::UNIX_EPOCH))
.unwrap();
evictor.clone().scan_entries(false).await;
let targets = evictor.pick_evict_targets(1025).await;
assert_eq!(targets.len(), 1);
let (target_path, size) = &targets[0];
assert_eq!(*target_path, path0);
assert_eq!(*size, 1024);
}
#[tokio::test]
async fn test_evictor_rescan() {
let temp_dir = tempfile::Builder::new()
.prefix("objstore_cache_test_evictor_")
.tempdir()
.unwrap();
let recorder = slatedb_common::metrics::MetricsRecorderHelper::noop();
let evictor = Arc::new(FsCacheEvictorInner::new(
temp_dir.path().to_path_buf(),
1024 * 2,
Arc::new(CachedObjectStoreStats::new(&recorder)),
Arc::new(DbRand::default()),
));
gen_rand_file(temp_dir.path(), "file0", 1024);
gen_rand_file(temp_dir.path(), "file1", 1025);
evictor.clone().scan_entries(false).await;
let cache_size_bytes = evictor.cache_size_bytes.load(Ordering::SeqCst);
assert_eq!(cache_size_bytes, 2049);
evictor.clone().scan_entries(false).await;
let cache_size_bytes = evictor.cache_size_bytes.load(Ordering::SeqCst);
assert_eq!(cache_size_bytes, 2049);
}
#[rstest::rstest]
#[case(&[0, 1], &[], None, &[0, 1])]
#[case(&[0, 1], &[], Some(0), &[1])]
#[case(&[0, 1], &[], Some(1), &[0])]
#[case(&[0, 1, 2], &[0], None, &[1, 2])]
#[case(&[0, 1, 2], &[0], Some(1), &[2])]
#[case(&[0, 1, 2, 3], &[0, 1], None, &[2, 3])]
#[case(&[0, 1, 2, 3], &[0, 1], Some(2), &[3])]
#[case(&[0, 1], &[0], Some(0), &[1])]
#[case(&[0, 1], &[0], Some(1), &[])]
fn test_pick_random_available_index(
#[case] key_indices: &[usize],
#[case] picked_indices: &[usize],
#[case] exclude_idx: Option<usize>,
#[case] expected_possible: &[usize],
) {
let temp_dir = tempfile::Builder::new()
.prefix("objstore_cache_test_pick_")
.tempdir()
.unwrap();
let recorder = slatedb_common::metrics::MetricsRecorderHelper::noop();
let evictor = FsCacheEvictorInner::new(
temp_dir.path().to_path_buf(),
1024,
Arc::new(CachedObjectStoreStats::new(&recorder)),
Arc::new(DbRand::default()),
);
let keys: Vec<std::path::PathBuf> = key_indices
.iter()
.map(|i| std::path::PathBuf::from(format!("file{}", i)))
.collect();
let picked: HashSet<usize> = picked_indices.iter().copied().collect();
let mut rng = evictor.rand.rng();
if expected_possible.is_empty() {
let result = evictor.pick_random_available_index(&mut rng, &keys, &picked, exclude_idx);
assert!(
result.is_none(),
"pick_random_available_index should return None, got {:?}",
result
);
} else {
for _ in 0..100 {
let result =
evictor.pick_random_available_index(&mut rng, &keys, &picked, exclude_idx);
assert!(
result.is_some_and(|r| expected_possible.contains(&r)),
"pick_random_available_index returned {:?}, expected one of {:?}",
result,
expected_possible
);
}
}
}
#[tokio::test]
async fn test_should_record_cache_and_eviction_metrics() {
let temp_dir = tempfile::Builder::new()
.prefix("objstore_cache_test_metrics_")
.tempdir()
.unwrap();
let recorder = Arc::new(DefaultMetricsRecorder::new());
let helper = MetricsRecorderHelper::new(recorder.clone(), Default::default());
let evictor = FsCacheEvictorInner::new(
temp_dir.path().to_path_buf(),
1024 * 2,
Arc::new(CachedObjectStoreStats::new(&helper)),
Arc::new(DbRand::default()),
);
let path0 = gen_rand_file(temp_dir.path(), "file0", 1024);
evictor
.track_entry_accessed(path0, 1024, DefaultSystemClock::default().now(), true)
.await;
let path1 = gen_rand_file(temp_dir.path(), "file1", 1024);
evictor
.track_entry_accessed(path1, 1024, DefaultSystemClock::default().now(), true)
.await;
assert_eq!(lookup_metric(&recorder, CACHE_KEYS), Some(2));
assert_eq!(lookup_metric(&recorder, CACHE_BYTES), Some(2048));
let path2 = gen_rand_file(temp_dir.path(), "file2", 1024);
evictor
.track_entry_accessed(path2, 1024, DefaultSystemClock::default().now(), true)
.await;
let evicted_keys = lookup_metric(&recorder, EVICTED_KEYS).unwrap();
let evicted_bytes = lookup_metric(&recorder, EVICTED_BYTES).unwrap();
assert!(
evicted_keys >= 1,
"expected evicted_keys >= 1, got {evicted_keys}"
);
assert!(
evicted_bytes >= 1024,
"expected evicted_bytes >= 1024, got {evicted_bytes}"
);
let keys = lookup_metric(&recorder, CACHE_KEYS).unwrap();
assert!(keys >= 1, "expected cache_keys >= 1, got {keys}");
}
}