use super::get_page_from_blob;
use crate::{Blob, BufferPool, BufferPooler, Error, IoBuf, IoBufMut};
use commonware_utils::sync::RwLock;
use futures::{future::Shared, FutureExt};
use std::{
collections::{hash_map::Entry, HashMap},
future::Future,
num::{NonZeroU16, NonZeroUsize},
pin::Pin,
sync::{
atomic::{AtomicBool, AtomicU64, Ordering},
Arc,
},
};
use tracing::{debug, error, trace};
/// A shareable future resolving to one fetched page, or a shared error.
type PageFetchFuture = Shared<Pin<Box<dyn Future<Output = Result<IoBuf, Arc<Error>>> + Send>>>;
/// Reference-counted handle to an in-flight page fetch; pointer identity
/// distinguishes one fetch attempt from a later retry for the same key.
type PageFetch = Arc<PageFetchFuture>;

/// Book-keeping for a single in-flight page fetch: the shared future plus
/// the number of readers currently awaiting it.
struct PageFetchEntry {
    fetch: PageFetch,
    /// Number of tasks awaiting `fetch` (each holds a `PageFetchGuard`).
    waiters: usize,
}
/// Drop guard held by each waiter of an in-flight page fetch. If the waiter
/// is dropped before the fetch resolves (e.g. its task is aborted), `Drop`
/// releases its claim on the fetch entry; `disarm` suppresses that cleanup
/// once the fetch has completed normally.
struct PageFetchGuard {
    cache: Arc<RwLock<Cache>>,
    /// `(blob_id, page_num)` key into `Cache::page_fetches`.
    key: (u64, u64),
    /// The specific fetch attempt this guard belongs to (compared by pointer
    /// in `Drop` so a newer fetch under the same key is never touched).
    fetch: PageFetch,
    /// Cleared via `disarm` once the awaited fetch has resolved.
    armed: bool,
}
impl PageFetchGuard {
    /// Creates an armed guard tying one waiter to the fetch attempt `fetch`
    /// registered under `key`.
    const fn new(cache: Arc<RwLock<Cache>>, key: (u64, u64), fetch: PageFetch) -> Self {
        Self {
            cache,
            key,
            fetch,
            armed: true,
        }
    }

    /// Marks the awaited fetch as finished so `Drop` performs no cleanup
    /// (the fetch future itself removes the `page_fetches` entry).
    const fn disarm(&mut self) {
        self.armed = false;
    }
}
impl Drop for PageFetchGuard {
    /// Releases this waiter's claim on an in-flight fetch when the waiter is
    /// dropped before the fetch resolved (e.g. the reading task was aborted).
    ///
    /// If the guard was `disarm`ed the fetch completed normally and cleanup
    /// is the fetch future's responsibility. Otherwise the waiter count is
    /// decremented, and the entry is removed entirely when this was the last
    /// waiter, so a later read can start a fresh fetch instead of joining a
    /// future nobody is polling.
    fn drop(&mut self) {
        if !self.armed {
            return;
        }
        let mut cache = self.cache.write();
        let Entry::Occupied(mut current) = cache.page_fetches.entry(self.key) else {
            // Entry already gone: the fetch finished, or cleanup ran.
            return;
        };
        // Only touch the entry if it still belongs to *our* fetch attempt;
        // a newer fetch may have been installed under the same key after the
        // original one was cleaned up.
        if !Arc::ptr_eq(&current.get().fetch, &self.fetch) {
            return;
        }
        if current.get().waiters == 1 {
            current.remove();
        } else {
            current.get_mut().waiters -= 1;
        }
    }
}
/// Fixed-capacity page cache with CLOCK (second-chance) eviction.
struct Cache {
    /// Maps `(blob_id, page_num)` to a slot index into `entries`/`slots`.
    index: HashMap<(u64, u64), usize>,
    /// Per-slot metadata; `entries.len()` grows up to `capacity` and then
    /// stays full (slots are recycled by the clock sweep, never removed).
    entries: Vec<CacheEntry>,
    /// Pre-allocated page buffers, one per slot, each `page_size` bytes.
    slots: Vec<IoBufMut>,
    /// Logical page size in bytes.
    page_size: usize,
    /// Current position of the CLOCK hand into `entries`.
    clock: usize,
    /// Maximum number of cached pages.
    capacity: usize,
    /// In-flight single-flight page fetches keyed by `(blob_id, page_num)`.
    page_fetches: HashMap<(u64, u64), PageFetchEntry>,
}

/// Metadata for one occupied cache slot.
struct CacheEntry {
    /// `(blob_id, page_num)` this slot currently holds.
    key: (u64, u64),
    /// CLOCK reference bit; set on every hit, cleared by the sweep.
    /// Atomic so hits can be recorded under the *read* lock in `read_at`.
    referenced: AtomicBool,
}
/// Cloneable handle to a shared page cache. Clones share the same cache,
/// blob-id counter, and buffer pool.
#[derive(Clone)]
pub struct CacheRef {
    /// Logical page size in bytes (kept as `u64` for offset arithmetic).
    page_size: u64,
    /// Source of unique blob ids handed out by `next_id`.
    next_id: Arc<AtomicU64>,
    cache: Arc<RwLock<Cache>>,
    pool: BufferPool,
}
impl CacheRef {
/// Creates a cache handle that holds up to `capacity` pages of `page_size`
/// bytes, with page buffers allocated from `pool`.
pub fn new(pool: BufferPool, page_size: NonZeroU16, capacity: NonZeroUsize) -> Self {
    let page_size_u64 = page_size.get() as u64;
    Self {
        page_size: page_size_u64,
        next_id: Arc::new(AtomicU64::new(0)),
        cache: Arc::new(RwLock::new(Cache::new(pool.clone(), page_size, capacity))),
        pool,
    }
}
/// Convenience constructor that takes the storage buffer pool from any
/// [`BufferPooler`] (e.g. a runtime context).
pub fn from_pooler(
    pooler: &impl BufferPooler,
    page_size: NonZeroU16,
    capacity: NonZeroUsize,
) -> Self {
    Self::new(pooler.storage_buffer_pool().clone(), page_size, capacity)
}
/// Logical page size in bytes.
#[inline]
pub const fn page_size(&self) -> u64 {
    self.page_size
}

/// The buffer pool backing the cache's page slots.
#[inline]
pub const fn pool(&self) -> &BufferPool {
    &self.pool
}

/// Returns the next unique blob id (monotonically increasing across all
/// clones of this `CacheRef`).
pub fn next_id(&self) -> u64 {
    self.next_id.fetch_add(1, Ordering::Relaxed)
}

/// Splits a byte offset into `(page_num, offset_within_page)`.
pub const fn offset_to_page(&self, offset: u64) -> (u64, u64) {
    Cache::offset_to_page(self.page_size, offset)
}
/// Copies as many bytes as the cache can serve for `blob_id` starting at
/// `logical_offset` into the front of `buf`.
///
/// Stops at the first uncached page and returns the number of bytes copied
/// (possibly 0). Never touches the underlying blob.
pub(super) fn read_cached(&self, blob_id: u64, buf: &mut [u8], logical_offset: u64) -> usize {
    let page_cache = self.cache.read();
    let mut copied = 0;
    while copied < buf.len() {
        // `read_at` serves at most one page per call, so keep advancing a
        // cursor until it reports a miss (returns 0) or `buf` is full.
        let count =
            page_cache.read_at(blob_id, &mut buf[copied..], logical_offset + copied as u64);
        if count == 0 {
            break;
        }
        copied += count;
    }
    copied
}
/// Fills `buf` with `buf.len()` bytes at `offset`, serving from the cache
/// when possible and faulting missing pages in from `blob` otherwise.
///
/// # Errors
///
/// Returns `Error::ReadFailed` if a page fetch from `blob` fails.
pub(super) async fn read<B: Blob>(
    &self,
    blob: &B,
    blob_id: u64,
    mut buf: &mut [u8],
    mut offset: u64,
) -> Result<(), Error> {
    while !buf.is_empty() {
        {
            // Fast path under the read lock; scoped so the lock is released
            // before awaiting the slow path below.
            let page_cache = self.cache.read();
            let count = page_cache.read_at(blob_id, buf, offset);
            if count != 0 {
                offset += count as u64;
                buf = &mut buf[count..];
                continue;
            }
        }
        // Cache miss: fetch the faulted page (or join an in-flight fetch).
        let count = self
            .read_after_page_fault(blob, blob_id, buf, offset)
            .await?;
        offset += count as u64;
        buf = &mut buf[count..];
    }
    Ok(())
}
/// Slow path after a cache miss: fetches the page containing `offset` from
/// `blob` (joining an already in-flight fetch for the same page when one
/// exists — single-flight), caches it, and copies the requested bytes into
/// the front of `buf`.
///
/// Returns the number of bytes copied (at most up to the end of the page).
///
/// # Errors
///
/// Returns `Error::ReadFailed` if the page fetch fails.
///
/// # Panics
///
/// Panics if `buf` is empty.
pub(super) async fn read_after_page_fault<B: Blob>(
    &self,
    blob: &B,
    blob_id: u64,
    buf: &mut [u8],
    offset: u64,
) -> Result<usize, Error> {
    assert!(!buf.is_empty());
    let (page_num, offset_in_page) = Cache::offset_to_page(self.page_size, offset);
    let offset_in_page = offset_in_page as usize;
    trace!(page_num, blob_id, "page fault");
    let (fetch_future, mut fetch_guard) = {
        let mut cache = self.cache.write();
        // Re-check under the write lock: another task may have cached the
        // page between our read-lock miss and acquiring the write lock.
        let count = cache.read_at(blob_id, buf, offset);
        if count != 0 {
            return Ok(count);
        }
        let key = (blob_id, page_num);
        match cache.page_fetches.entry(key) {
            // A fetch for this page is already in flight: join it.
            Entry::Occupied(o) => {
                let entry = o.into_mut();
                entry.waiters += 1;
                let fetch_future = entry.fetch.as_ref().clone();
                let fetch = Arc::clone(&entry.fetch);
                (
                    fetch_future,
                    // Guard undoes the waiter increment if we are cancelled.
                    PageFetchGuard::new(Arc::clone(&self.cache), key, fetch),
                )
            }
            // No fetch in flight: start one and register it so later
            // faulting readers can share it.
            Entry::Vacant(v) => {
                let blob = blob.clone();
                let cache = Arc::clone(&self.cache);
                let page_size = self.page_size;
                let future = async move {
                    let result = fetch_cacheable_page(&blob, page_num, page_size).await;
                    if let Err(err) = &result {
                        error!(page_num, ?err, "Page fetch failed");
                    }
                    // On success, publish the page and drop the in-flight
                    // entry atomically under the write lock.
                    let mut cache = cache.write();
                    if let Ok(page) = &result {
                        cache.cache(blob_id, page.as_ref(), page_num);
                    }
                    let _ = cache.page_fetches.remove(&key);
                    result
                };
                // `shared()` lets every waiter await (a clone of) the same
                // future; the Arc gives it a stable identity for the guard.
                let fetch_future = future.boxed().shared();
                let fetch = Arc::new(fetch_future.clone());
                v.insert(PageFetchEntry {
                    fetch: Arc::clone(&fetch),
                    waiters: 1,
                });
                (
                    fetch_future,
                    PageFetchGuard::new(Arc::clone(&self.cache), key, fetch),
                )
            }
        }
    };
    let fetch_result = fetch_future.await;
    // Fetch resolved: the future removed the entry itself, so disarm the
    // cancellation guard.
    fetch_guard.disarm();
    let page_buf = match fetch_result {
        Ok(page_buf) => page_buf,
        Err(_) => return Err(Error::ReadFailed),
    };
    // Copy from the in-page offset up to the end of the page (or of `buf`).
    let bytes_to_copy = std::cmp::min(buf.len(), page_buf.len() - offset_in_page);
    buf[..bytes_to_copy]
        .copy_from_slice(&page_buf.as_ref()[offset_in_page..offset_in_page + bytes_to_copy]);
    Ok(bytes_to_copy)
}
/// Caches every *complete* page contained in `buf`, starting at `offset`
/// (which must be page-aligned), and returns the number of trailing bytes
/// that were not cached (a partial final page, or everything left after the
/// page number would overflow).
///
/// # Panics
///
/// Panics if `offset` is not a multiple of the page size.
pub fn cache(&self, blob_id: u64, buf: &[u8], offset: u64) -> usize {
    let (first_page, offset_in_page) = self.offset_to_page(offset);
    assert_eq!(offset_in_page, 0);
    let page_size = self.page_size as usize;
    let mut page_cache = self.cache.write();
    let mut page_num = first_page;
    let mut consumed = 0;
    // `chunks_exact` yields only full pages; the partial remainder (if any)
    // is reported back to the caller via the return value.
    for chunk in buf.chunks_exact(page_size) {
        page_cache.cache(blob_id, chunk, page_num);
        consumed += page_size;
        // Stop if the next page number would overflow u64.
        match page_num.checked_add(1) {
            Some(next) => page_num = next,
            None => break,
        }
    }
    buf.len() - consumed
}
}
impl Cache {
/// Creates a cache with `capacity` slots of `page_size` bytes each, all
/// pre-allocated (zeroed) from `pool` up front.
pub fn new(pool: BufferPool, page_size: NonZeroU16, capacity: NonZeroUsize) -> Self {
    let page_size = page_size.get() as usize;
    let capacity = capacity.get();
    // Allocate every slot eagerly so caching never allocates later.
    let mut slots = Vec::with_capacity(capacity);
    for _ in 0..capacity {
        let slot = pool.alloc_zeroed(page_size);
        slots.push(slot);
    }
    Self {
        index: HashMap::new(),
        entries: Vec::with_capacity(capacity),
        slots,
        page_size,
        clock: 0,
        capacity,
        page_fetches: HashMap::new(),
    }
}
/// Borrows the page bytes stored in `slot`.
#[inline]
fn page_slice(&self, slot: usize) -> &[u8] {
    assert!(slot < self.capacity);
    self.slots[slot].as_ref()
}

/// Mutably borrows the page bytes stored in `slot`.
#[inline]
fn page_slice_mut(&mut self, slot: usize) -> &mut [u8] {
    assert!(slot < self.capacity);
    self.slots[slot].as_mut()
}

/// Splits a byte offset into `(page_num, offset_within_page)`.
const fn offset_to_page(page_size: u64, offset: u64) -> (u64, u64) {
    (offset / page_size, offset % page_size)
}
/// Copies cached bytes for `blob_id` at `logical_offset` into `buf`.
///
/// Returns 0 on a cache miss; otherwise the number of bytes copied, capped
/// at the end of the containing page (callers loop to span pages). A hit
/// sets the entry's CLOCK reference bit (atomically, so this works under
/// the shared read lock).
fn read_at(&self, blob_id: u64, buf: &mut [u8], logical_offset: u64) -> usize {
    let (page_num, offset_in_page) =
        Self::offset_to_page(self.page_size as u64, logical_offset);
    let Some(&slot) = self.index.get(&(blob_id, page_num)) else {
        return 0;
    };
    let entry = &self.entries[slot];
    // Invariant: index and entries always agree on the slot's key.
    assert_eq!(entry.key, (blob_id, page_num));
    entry.referenced.store(true, Ordering::Relaxed);
    let page = self.page_slice(slot);
    let bytes_to_copy = std::cmp::min(buf.len(), self.page_size - offset_in_page as usize);
    buf[..bytes_to_copy].copy_from_slice(
        &page[offset_in_page as usize..offset_in_page as usize + bytes_to_copy],
    );
    bytes_to_copy
}
/// Inserts (or refreshes) the full page `page_num` of `blob_id`, evicting
/// via the CLOCK algorithm when the cache is full.
///
/// # Panics
///
/// Panics if `page.len() != self.page_size`.
fn cache(&mut self, blob_id: u64, page: &[u8], page_num: u64) {
    assert_eq!(page.len(), self.page_size);
    let key = (blob_id, page_num);
    // Already cached: overwrite contents in place and mark referenced.
    if let Some(&slot) = self.index.get(&key) {
        debug!(blob_id, page_num, "updating duplicate page");
        let entry = &self.entries[slot];
        assert_eq!(entry.key, key);
        entry.referenced.store(true, Ordering::Relaxed);
        self.page_slice_mut(slot).copy_from_slice(page);
        return;
    }
    // Cold start: fill unused slots before evicting anything.
    if self.entries.len() < self.capacity {
        let slot = self.entries.len();
        self.index.insert(key, slot);
        self.entries.push(CacheEntry {
            key,
            referenced: AtomicBool::new(true),
        });
        self.page_slice_mut(slot).copy_from_slice(page);
        return;
    }
    // CLOCK sweep: advance the hand, clearing reference bits, until an
    // unreferenced victim is found. Terminates because each visit clears
    // a bit, so at most one full revolution is needed.
    while self.entries[self.clock].referenced.load(Ordering::Relaxed) {
        self.entries[self.clock]
            .referenced
            .store(false, Ordering::Relaxed);
        self.clock = (self.clock + 1) % self.entries.len();
    }
    // Evict the victim and reuse its slot for the new page.
    let slot = self.clock;
    let entry = &mut self.entries[slot];
    assert!(self.index.remove(&entry.key).is_some());
    self.index.insert(key, slot);
    entry.key = key;
    entry.referenced.store(true, Ordering::Relaxed);
    self.page_slice_mut(slot).copy_from_slice(page);
    self.clock = (self.clock + 1) % self.entries.len();
}
}
/// Fetches page `page_num` from `blob` and verifies it is a full page.
///
/// Errors are wrapped in `Arc` so the result can be cloned to every waiter
/// of the shared fetch future. A short read is reported as
/// `Error::InvalidChecksum` — NOTE(review): this treats a partial page as
/// corruption; confirm it matches `get_page_from_blob`'s contract.
async fn fetch_cacheable_page(
    blob: &impl Blob,
    page_num: u64,
    page_size: u64,
) -> Result<IoBuf, Arc<Error>> {
    let page = get_page_from_blob(blob, page_num, page_size)
        .await
        .map_err(Arc::new)?;
    let len = page.len();
    if len != page_size as usize {
        error!(
            page_num,
            expected = page_size,
            actual = len,
            "attempted to fetch partial page from blob"
        );
        return Err(Arc::new(Error::InvalidChecksum));
    }
    Ok(page)
}
#[cfg(test)]
mod tests {
    use super::{super::Checksum, *};
    use crate::{
        buffer::paged::CHECKSUM_SIZE, deterministic, BufferPool, BufferPoolConfig, Clock as _,
        IoBufsMut, Runner as _, Spawner as _, Storage as _,
    };
    use commonware_cryptography::Crc32;
    use commonware_macros::test_traced;
    use commonware_utils::{channel::oneshot, sync::Mutex, NZUsize, NZU16};
    use futures::future::pending;
    use prometheus_client::registry::Registry;
    use std::{
        num::NonZeroU16,
        sync::{
            atomic::{AtomicUsize, Ordering},
            Arc,
        },
        time::Duration,
    };

    // Logical page size used by every test below.
    const PAGE_SIZE: NonZeroU16 = NZU16!(1024);
    const PAGE_SIZE_U64: u64 = PAGE_SIZE.get() as u64;

    // Blob whose reads never complete: signals `started` once, then pends
    // forever. Used to park a fetch so a test can abort the fetching task.
    #[derive(Clone)]
    struct BlockingBlob {
        started: Arc<Mutex<Option<oneshot::Sender<()>>>>,
    }
    impl Blob for BlockingBlob {
        async fn read_at(&self, offset: u64, len: usize) -> Result<IoBufsMut, Error> {
            self.read_at_buf(offset, len, IoBufsMut::default()).await
        }
        async fn read_at_buf(
            &self,
            _offset: u64,
            _len: usize,
            _bufs: impl Into<IoBufsMut> + Send,
        ) -> Result<IoBufsMut, Error> {
            // Notify the test that the read started, then block forever.
            let sender = self
                .started
                .lock()
                .take()
                .expect("blocking blob read started more than once");
            let _ = sender.send(());
            pending::<()>().await;
            unreachable!()
        }
        async fn write_at(
            &self,
            _offset: u64,
            _bufs: impl Into<crate::IoBufs> + Send,
        ) -> Result<(), Error> {
            Ok(())
        }
        async fn resize(&self, _len: u64) -> Result<(), Error> {
            Ok(())
        }
        async fn sync(&self) -> Result<(), Error> {
            Ok(())
        }
    }

    // Outcome every ControlledBlob read resolves to once released.
    #[derive(Clone)]
    enum ControlledBlobResult {
        Success(Arc<Vec<u8>>),
        Error,
    }

    // Blob whose *first* read blocks until `release` fires (signalling
    // `started` when it begins) and counts every read; later reads resolve
    // immediately. Used to hold a single-flight fetch open deterministically.
    #[derive(Clone)]
    struct ControlledBlob {
        started: Arc<Mutex<Option<oneshot::Sender<()>>>>,
        release: Arc<Mutex<Option<oneshot::Receiver<()>>>>,
        reads: Arc<AtomicUsize>,
        result: ControlledBlobResult,
    }
    impl Blob for ControlledBlob {
        async fn read_at(&self, offset: u64, len: usize) -> Result<IoBufsMut, Error> {
            self.read_at_buf(offset, len, IoBufsMut::default()).await
        }
        async fn read_at_buf(
            &self,
            _offset: u64,
            _len: usize,
            _bufs: impl Into<IoBufsMut> + Send,
        ) -> Result<IoBufsMut, Error> {
            // Only the first read blocks; subsequent reads complete at once.
            if self.reads.fetch_add(1, Ordering::Relaxed) == 0 {
                let sender = self
                    .started
                    .lock()
                    .take()
                    .expect("controlled blob start signal consumed more than once");
                let _ = sender.send(());
                let release = self
                    .release
                    .lock()
                    .take()
                    .expect("controlled blob release receiver consumed more than once");
                release.await.expect("release signal dropped");
            }
            match &self.result {
                ControlledBlobResult::Success(page) => Ok(IoBufsMut::from(page.as_ref().clone())),
                ControlledBlobResult::Error => Err(Error::ReadFailed),
            }
        }
        async fn write_at(
            &self,
            _offset: u64,
            _bufs: impl Into<crate::IoBufs> + Send,
        ) -> Result<(), Error> {
            Ok(())
        }
        async fn resize(&self, _len: u64) -> Result<(), Error> {
            Ok(())
        }
        async fn sync(&self) -> Result<(), Error> {
            Ok(())
        }
    }

    // Exercises Cache directly: miss, hit, in-place update, and CLOCK
    // eviction of the oldest page when capacity is exceeded.
    #[test_traced]
    fn test_cache_basic() {
        let mut registry = Registry::default();
        let pool = BufferPool::new(BufferPoolConfig::for_storage(), &mut registry);
        let mut cache: Cache = Cache::new(pool, PAGE_SIZE, NZUsize!(10));
        let mut buf = vec![0; PAGE_SIZE.get() as usize];
        // Empty cache: miss.
        let bytes_read = cache.read_at(0, &mut buf, 0);
        assert_eq!(bytes_read, 0);
        cache.cache(0, &[1; PAGE_SIZE.get() as usize], 0);
        let bytes_read = cache.read_at(0, &mut buf, 0);
        assert_eq!(bytes_read, PAGE_SIZE.get() as usize);
        assert_eq!(buf, [1; PAGE_SIZE.get() as usize]);
        // Re-caching the same page overwrites its contents in place.
        cache.cache(0, &[2; PAGE_SIZE.get() as usize], 0);
        let bytes_read = cache.read_at(0, &mut buf, 0);
        assert_eq!(bytes_read, PAGE_SIZE.get() as usize);
        assert_eq!(buf, [2; PAGE_SIZE.get() as usize]);
        // Insert 11 pages into a 10-slot cache: page 0 is evicted.
        for i in 0u64..11 {
            cache.cache(0, &[i as u8; PAGE_SIZE.get() as usize], i);
        }
        let bytes_read = cache.read_at(0, &mut buf, 0);
        assert_eq!(bytes_read, 0);
        for i in 1u64..11 {
            let bytes_read = cache.read_at(0, &mut buf, i * PAGE_SIZE_U64);
            assert_eq!(bytes_read, PAGE_SIZE.get() as usize);
            assert_eq!(buf, [i as u8; PAGE_SIZE.get() as usize]);
        }
        // Unaligned read: returns only up to the end of the page.
        let mut buf = vec![0; PAGE_SIZE.get() as usize];
        let bytes_read = cache.read_at(0, &mut buf, PAGE_SIZE_U64 + 2);
        assert_eq!(bytes_read, PAGE_SIZE.get() as usize - 2);
        assert_eq!(
            &buf[..PAGE_SIZE.get() as usize - 2],
            [1; PAGE_SIZE.get() as usize - 2]
        );
    }

    // End-to-end: writes checksummed pages to a real blob, then reads them
    // back through CacheRef (first pass faults pages in, second pass hits).
    #[test_traced]
    fn test_cache_read_with_blob() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let physical_page_size = PAGE_SIZE_U64 + CHECKSUM_SIZE;
            let (blob, size) = context
                .open("test", "blob".as_bytes())
                .await
                .expect("Failed to open blob");
            assert_eq!(size, 0);
            // Write 11 pages, each payload followed by its checksum record.
            for i in 0..11 {
                let logical_data = vec![i as u8; PAGE_SIZE.get() as usize];
                let crc = Crc32::checksum(&logical_data);
                let record = Checksum::new(PAGE_SIZE.get(), crc);
                let mut page_data = logical_data;
                page_data.extend_from_slice(&record.to_bytes());
                blob.write_at(i * physical_page_size, page_data)
                    .await
                    .unwrap();
            }
            let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(10));
            assert_eq!(cache_ref.next_id(), 0);
            assert_eq!(cache_ref.next_id(), 1);
            for i in 0..11 {
                let mut buf = vec![0; PAGE_SIZE.get() as usize];
                cache_ref
                    .read(&blob, 0, &mut buf, i * PAGE_SIZE_U64)
                    .await
                    .unwrap();
                assert_eq!(buf, [i as u8; PAGE_SIZE.get() as usize]);
            }
            for i in 1..11 {
                let mut buf = vec![0; PAGE_SIZE.get() as usize];
                cache_ref
                    .read(&blob, 0, &mut buf, i * PAGE_SIZE_U64)
                    .await
                    .unwrap();
                assert_eq!(buf, [i as u8; PAGE_SIZE.get() as usize]);
            }
            blob.sync().await.unwrap();
        });
    }

    // Caching at the largest page-aligned u64 offset must not overflow.
    #[test_traced]
    fn test_cache_max_page() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(2));
            let aligned_max_offset = u64::MAX - (u64::MAX % PAGE_SIZE_U64);
            let logical_data = vec![42u8; PAGE_SIZE.get() as usize];
            let remaining = cache_ref.cache(0, logical_data.as_slice(), aligned_max_offset);
            assert_eq!(remaining, 0);
            let mut buf = vec![0u8; PAGE_SIZE.get() as usize];
            let page_cache = cache_ref.cache.read();
            let bytes_read = page_cache.read_at(0, &mut buf, aligned_max_offset);
            assert_eq!(bytes_read, PAGE_SIZE.get() as usize);
            assert!(buf.iter().all(|b| *b == 42));
        });
    }

    // Multi-page cache insert near the top of the u64 offset range, using
    // the smallest legal page size.
    #[test_traced]
    fn test_cache_at_high_offset() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            const MIN_PAGE_SIZE: u64 = CHECKSUM_SIZE + 1;
            let cache_ref =
                CacheRef::from_pooler(&context, NZU16!(MIN_PAGE_SIZE as u16), NZUsize!(2));
            let data = vec![1u8; MIN_PAGE_SIZE as usize * 2];
            let aligned_max_offset = u64::MAX - (u64::MAX % MIN_PAGE_SIZE);
            let high_offset = aligned_max_offset - (MIN_PAGE_SIZE * 2);
            let remaining = cache_ref.cache(0, &data, high_offset);
            assert_eq!(remaining, 0);
            let mut buf = vec![0u8; MIN_PAGE_SIZE as usize];
            let page_cache = cache_ref.cache.read();
            assert_eq!(
                page_cache.read_at(0, &mut buf, high_offset),
                MIN_PAGE_SIZE as usize
            );
            assert!(buf.iter().all(|b| *b == 1));
            assert_eq!(
                page_cache.read_at(0, &mut buf, high_offset + MIN_PAGE_SIZE),
                MIN_PAGE_SIZE as usize
            );
            assert!(buf.iter().all(|b| *b == 1));
        });
    }

    // Aborting the sole fetching task must clean up its `page_fetches`
    // entry (via PageFetchGuard::drop) so no stale entry lingers.
    #[test_traced]
    fn test_page_fetches_entry_removed_when_first_fetcher_cancelled() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let blob_id = 0;
            let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(10));
            let (started_tx, started_rx) = oneshot::channel();
            let blob = BlockingBlob {
                started: Arc::new(Mutex::new(Some(started_tx))),
            };
            let mut read_buf = vec![0u8; PAGE_SIZE.get() as usize];
            let cache_ref_for_task = cache_ref.clone();
            let blob_for_task = blob.clone();
            let handle = context.spawn(move |_| async move {
                let _ = cache_ref_for_task
                    .read(&blob_for_task, blob_id, &mut read_buf, 0)
                    .await;
            });
            started_rx.await.expect("blocking read never started");
            {
                let page_cache = cache_ref.cache.read();
                assert!(page_cache.page_fetches.contains_key(&(blob_id, 0)));
            }
            handle.abort();
            assert!(matches!(handle.await, Err(Error::Closed)));
            let page_cache = cache_ref.cache.read();
            assert!(
                !page_cache.page_fetches.contains_key(&(blob_id, 0)),
                "cancelled first fetcher should not leave stale page_fetches entry"
            );
        });
    }

    // After the task that *started* a fetch is aborted, joined followers
    // must still complete from the same single in-flight read (reads == 1).
    #[test_traced]
    fn test_followers_keep_single_flight_after_first_fetcher_cancellation() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let blob_id = 0;
            let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(10));
            let logical_page = vec![7u8; PAGE_SIZE.get() as usize];
            let crc = Crc32::checksum(&logical_page);
            let mut physical_page = logical_page.clone();
            physical_page.extend_from_slice(&Checksum::new(PAGE_SIZE.get(), crc).to_bytes());
            let (started_tx, started_rx) = oneshot::channel();
            let (release_tx, release_rx) = oneshot::channel();
            let reads = Arc::new(AtomicUsize::new(0));
            let blob = ControlledBlob {
                started: Arc::new(Mutex::new(Some(started_tx))),
                release: Arc::new(Mutex::new(Some(release_rx))),
                reads: reads.clone(),
                result: ControlledBlobResult::Success(Arc::new(physical_page)),
            };
            // First reader starts the fetch (and will be aborted).
            let mut first_buf = vec![0u8; PAGE_SIZE.get() as usize];
            let cache_ref_for_first = cache_ref.clone();
            let blob_for_first = blob.clone();
            let first = context.clone().spawn(move |_| async move {
                let _ = cache_ref_for_first
                    .read(&blob_for_first, blob_id, &mut first_buf, 0)
                    .await;
            });
            started_rx.await.expect("first read never started");
            // Second reader joins the in-flight fetch.
            let mut second_buf = vec![0u8; PAGE_SIZE.get() as usize];
            let cache_ref_for_second = cache_ref.clone();
            let blob_for_second = blob.clone();
            let second = context.clone().spawn(move |_| async move {
                cache_ref_for_second
                    .read(&blob_for_second, blob_id, &mut second_buf, 0)
                    .await
                    .expect("second read failed");
                second_buf
            });
            // Wait until both readers are registered as waiters.
            loop {
                let joined = {
                    let page_cache = cache_ref.cache.read();
                    page_cache
                        .page_fetches
                        .get(&(blob_id, 0))
                        .map(|fetch| fetch.waiters == 2)
                        .unwrap_or(false)
                };
                if joined {
                    break;
                }
                context.sleep(Duration::from_millis(1)).await;
            }
            first.abort();
            assert!(matches!(first.await, Err(Error::Closed)));
            // Third reader joins after the cancellation.
            let mut third_buf = vec![0u8; PAGE_SIZE.get() as usize];
            let cache_ref_for_third = cache_ref.clone();
            let blob_for_third = blob.clone();
            let third = context.clone().spawn(move |_| async move {
                cache_ref_for_third
                    .read(&blob_for_third, blob_id, &mut third_buf, 0)
                    .await
                    .expect("third read failed");
                third_buf
            });
            // Wait until the third reader has either joined the fetch or
            // (if it raced past) started a fresh read of its own.
            loop {
                let third_entered = {
                    let page_cache = cache_ref.cache.read();
                    reads.load(Ordering::Relaxed) > 1
                        || page_cache
                            .page_fetches
                            .get(&(blob_id, 0))
                            .map(|fetch| fetch.waiters == 2)
                            .unwrap_or(false)
                };
                if third_entered {
                    break;
                }
                context.sleep(Duration::from_millis(1)).await;
            }
            let _ = release_tx.send(());
            let second_buf = second.await.expect("second task failed");
            let third_buf = third.await.expect("third task failed");
            assert_eq!(second_buf, logical_page);
            assert_eq!(third_buf, logical_page);
            // Single-flight held: only one blob read despite three readers.
            assert_eq!(reads.load(Ordering::Relaxed), 1);
            let mut cached = vec![0u8; PAGE_SIZE.get() as usize];
            assert_eq!(
                cache_ref.read_cached(blob_id, &mut cached, 0),
                PAGE_SIZE.get() as usize
            );
            assert_eq!(cached, logical_page);
            // A later read is served from cache (no further blob reads).
            let mut fourth_buf = vec![0u8; PAGE_SIZE.get() as usize];
            cache_ref
                .read(&blob, blob_id, &mut fourth_buf, 0)
                .await
                .unwrap();
            assert_eq!(fourth_buf, logical_page);
            assert_eq!(reads.load(Ordering::Relaxed), 1);
            let page_cache = cache_ref.cache.read();
            assert!(
                !page_cache.page_fetches.contains_key(&(blob_id, 0)),
                "completed fetch should leave no stale page_fetches entry"
            );
        });
    }

    // A failed fetch must propagate the error to every waiter, remove the
    // in-flight entry, cache nothing, and allow a fresh retry afterwards.
    #[test_traced]
    fn test_page_fetch_error_removes_entry_for_all_waiters() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let blob_id = 0;
            let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(10));
            let (started_tx, started_rx) = oneshot::channel();
            let (release_tx, release_rx) = oneshot::channel();
            let reads = Arc::new(AtomicUsize::new(0));
            let blob = ControlledBlob {
                started: Arc::new(Mutex::new(Some(started_tx))),
                release: Arc::new(Mutex::new(Some(release_rx))),
                reads: reads.clone(),
                result: ControlledBlobResult::Error,
            };
            let mut first_buf = vec![0u8; PAGE_SIZE.get() as usize];
            let cache_ref_for_first = cache_ref.clone();
            let blob_for_first = blob.clone();
            let first = context.clone().spawn(move |_| async move {
                cache_ref_for_first
                    .read(&blob_for_first, blob_id, &mut first_buf, 0)
                    .await
            });
            started_rx.await.expect("first erroring read never started");
            let mut second_buf = vec![0u8; PAGE_SIZE.get() as usize];
            let cache_ref_for_second = cache_ref.clone();
            let blob_for_second = blob.clone();
            let second = context.clone().spawn(move |_| async move {
                cache_ref_for_second
                    .read(&blob_for_second, blob_id, &mut second_buf, 0)
                    .await
            });
            // Wait until both readers share the single in-flight fetch.
            loop {
                let joined = {
                    let page_cache = cache_ref.cache.read();
                    page_cache
                        .page_fetches
                        .get(&(blob_id, 0))
                        .map(|fetch| fetch.waiters == 2)
                        .unwrap_or(false)
                };
                if joined {
                    break;
                }
                context.sleep(Duration::from_millis(1)).await;
            }
            let _ = release_tx.send(());
            // Both waiters observe the shared failure from the single read.
            assert!(matches!(first.await, Ok(Err(Error::ReadFailed))));
            assert!(matches!(second.await, Ok(Err(Error::ReadFailed))));
            assert_eq!(reads.load(Ordering::Relaxed), 1);
            {
                let page_cache = cache_ref.cache.read();
                assert!(
                    !page_cache.page_fetches.contains_key(&(blob_id, 0)),
                    "erroring fetch should leave no stale page_fetches entry"
                );
            }
            // Failure cached nothing; a retry performs a second blob read.
            let mut cached = vec![0u8; PAGE_SIZE.get() as usize];
            assert_eq!(cache_ref.read_cached(blob_id, &mut cached, 0), 0);
            let mut third_buf = vec![0u8; PAGE_SIZE.get() as usize];
            assert!(matches!(
                cache_ref.read(&blob, blob_id, &mut third_buf, 0).await,
                Err(Error::ReadFailed)
            ));
            assert_eq!(reads.load(Ordering::Relaxed), 2);
        });
    }
}