use std::{
collections::VecDeque,
fmt::Display,
sync::{
atomic::{AtomicU64, Ordering},
Arc, RwLock,
},
time::Duration,
};
use foyer::{
BlockEngineConfig, DeviceBuilder, Error, Event, EventListener, FsDeviceBuilder, HybridCache, HybridCacheBuilder,
HybridCachePolicy, HybridCacheProperties, Location, PsyncIoEngineConfig,
};
use rand::{rng, Rng};
const KB: usize = 1024;
const MB: usize = 1024 * KB;
const WRITERS: usize = 8;
const FETCHERS: usize = 16;
const READERS: usize = 8;
const WRITES: usize = 2000;
const FETCHES: usize = 2000;
const READS: usize = 2000;
const ERROR_PER_FETCHES: usize = 10;
const DUPLICATES: usize = 10;
const MISS_WAIT: Duration = Duration::from_millis(10);
const WRITE_WAIT: Duration = Duration::from_millis(1);
const FETCH_WAIT: Duration = Duration::from_millis(1);
const READ_WAIT: Duration = Duration::from_millis(1);
const INTERVAL: usize = 100;
#[derive(Debug)]
struct Trap;
impl Display for Trap {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "Trap")
}
}
impl std::error::Error for Trap {}
#[derive(Debug)]
struct RecentEvictionQueue {
queue: RwLock<VecDeque<u64>>,
capacity: usize,
}
impl EventListener for RecentEvictionQueue {
type Key = u64;
type Value = Vec<u8>;
fn on_leave(&self, _: Event, key: &Self::Key, _: &Self::Value) {
let mut queue = self.queue.write().unwrap();
if queue.len() < self.capacity {
queue.push_back(*key)
} else {
queue.pop_front();
queue.push_back(*key);
}
}
}
impl RecentEvictionQueue {
fn new(capacity: usize) -> Self {
Self {
queue: RwLock::new(VecDeque::with_capacity(capacity)),
capacity,
}
}
fn pick(&self) -> Option<u64> {
let queue = self.queue.read().unwrap();
if queue.is_empty() {
return None;
}
let index = rng().random_range(0..queue.len());
Some(queue[index])
}
}
#[test_log::test(tokio::test)]
async fn test_concurrent_insert_disk_cache_and_fetch() {
let dir = tempfile::tempdir().unwrap();
let recent = Arc::new(RecentEvictionQueue::new(10));
let hybrid: HybridCache<u64, Vec<u8>> = HybridCacheBuilder::new()
.with_name("test")
.with_policy(HybridCachePolicy::WriteOnEviction)
.with_event_listener(recent.clone())
.memory(MB)
.with_weighter(|_, v| 8 + v.len())
.storage()
.with_io_engine_config(PsyncIoEngineConfig::new())
.with_engine_config(
BlockEngineConfig::new(FsDeviceBuilder::new(dir).with_capacity(64 * MB).build().unwrap())
.with_block_size(4 * MB),
)
.build()
.await
.unwrap();
let idx = Arc::new(AtomicU64::new(0));
let mut handles = vec![];
for _ in 0..WRITERS {
let h = hybrid.clone();
let r = recent.clone();
let i = idx.clone();
handles.push(tokio::spawn(async move { write(h, r, i).await }));
}
for _ in 0..FETCHERS {
let h = hybrid.clone();
let r = recent.clone();
handles.push(tokio::spawn(async move { fetch(h, r).await }));
}
for _ in 0..READERS {
let h = hybrid.clone();
let r = recent.clone();
handles.push(tokio::spawn(async move { read(h, r).await }));
}
for h in handles {
h.await.unwrap();
}
}
fn value(key: u64) -> Vec<u8> {
vec![key as u8; 4 * KB]
}
async fn write(hybrid: HybridCache<u64, Vec<u8>>, _: Arc<RecentEvictionQueue>, idx: Arc<AtomicU64>) {
loop {
let key = idx.fetch_add(1, Ordering::Relaxed);
if key > WRITES as u64 {
break;
}
if key % INTERVAL as u64 == 0 {
tracing::info!("Inserted {key} items");
}
for k in key.saturating_sub(DUPLICATES as u64)..=key {
hybrid.insert_with_properties(
k,
value(k),
HybridCacheProperties::default().with_location(Location::OnDisk),
);
tokio::time::sleep(WRITE_WAIT).await;
}
}
}
async fn fetch(hybrid: HybridCache<u64, Vec<u8>>, recent: Arc<RecentEvictionQueue>) {
let mut cnt = 0;
loop {
tokio::time::sleep(FETCH_WAIT).await;
let key = match recent.pick() {
Some(v) => v,
None => continue,
};
let res = if cnt % ERROR_PER_FETCHES as u64 == 0 {
hybrid
.get_or_fetch(&key, || async move { Err::<Vec<u8>, _>(Trap) })
.await
} else {
hybrid
.get_or_fetch(&key, || async move {
tokio::time::sleep(MISS_WAIT).await;
Ok::<_, Error>(value(key))
})
.await
};
match res {
Ok(e) => assert_eq!(e.value(), &value(key)),
Err(e) => assert!(e.downcast_ref::<Trap>().is_some(), "Expected Trap error, got: {e}"),
}
cnt += 1;
if cnt % INTERVAL as u64 == 0 {
tracing::info!("Fetch {cnt} items");
}
if cnt >= FETCHES as u64 {
break;
}
}
}
async fn read(hybrid: HybridCache<u64, Vec<u8>>, recent: Arc<RecentEvictionQueue>) {
let mut cnt = 0;
loop {
tokio::time::sleep(READ_WAIT).await;
let key = match recent.pick() {
Some(v) => v,
None => continue,
};
let res = hybrid.get(&key).await;
match res {
Ok(e) => {
if let Some(e) = e {
assert_eq!(e.value(), &value(key))
}
}
Err(e) => assert!(e.downcast_ref::<Trap>().is_some(), "Expected Trap error, got: {e}"),
}
cnt += 1;
if cnt % INTERVAL as u64 == 0 {
tracing::info!("Read {cnt} items");
}
if cnt >= READS as u64 {
break;
}
}
}