// commonware_runtime/utils/buffer/paged/cache.rs

//! A page cache for caching _logical_ pages of [Blob] data in memory. The cache is unaware of the
//! physical page format used by the blob, which is left to the blob implementation.

4use super::get_page_from_blob;
5use crate::{Blob, BufferPool, BufferPooler, Error, IoBuf, IoBufMut};
6use commonware_utils::sync::RwLock;
7use futures::{future::Shared, FutureExt};
8use std::{
9    collections::{hash_map::Entry, HashMap},
10    future::Future,
11    num::{NonZeroU16, NonZeroUsize},
12    pin::Pin,
13    sync::{
14        atomic::{AtomicBool, AtomicU64, Ordering},
15        Arc,
16    },
17};
18use tracing::{debug, error, trace};
19
/// Shared future for one logical page fetch. The output uses `Arc<Error>` because `Shared`
/// requires cloneable results. The `IoBuf` contains only the logical, validated page bytes
/// (no physical framing such as checksums).
type PageFetchFuture = Shared<Pin<Box<dyn Future<Output = Result<IoBuf, Arc<Error>>> + Send>>>;

/// Shared handle to one in-flight fetch generation. The cache keeps one copy in `page_fetches`,
/// and each waiter clones the `Arc` while it is still interested in the result. Pointer identity
/// of this `Arc` distinguishes fetch generations for the same key.
type PageFetch = Arc<PageFetchFuture>;
27
/// One in-flight fetch generation for a single `(blob_id, page_num)`.
///
/// `fetch` is shared by every waiter that joined this generation. `waiters` counts the still
/// armed waiters whose drop path may need to remove this entry if they become the last
/// unresolved waiter. If `page_fetches[key]` is later replaced by a newer generation, stale
/// waiters from the old generation must ignore it and rely on `Arc::ptr_eq` against their saved
/// `fetch`.
struct PageFetchEntry {
    /// Shared page fetch future that reads and validates the logical page exactly once.
    fetch: PageFetch,
    /// Count of waiters that still need cancellation cleanup for this fetch generation.
    waiters: usize,
}
41
/// Removes a stale in-flight page fetch when the last unresolved waiter is dropped.
struct PageFetchGuard {
    /// The shared page cache whose `page_fetches` map this guard may clean up on drop.
    cache: Arc<RwLock<Cache>>,
    /// The `(blob_id, page_num)` key of the fetch this guard is watching.
    key: (u64, u64),
    /// The fetch generation this waiter joined; compared via `Arc::ptr_eq` on drop so a stale
    /// guard never touches a newer generation installed under the same key.
    fetch: PageFetch,
    /// True while the waiter is still unresolved; cleared by `disarm` once the fetch completed.
    armed: bool,
}
49
50impl PageFetchGuard {
51    const fn new(cache: Arc<RwLock<Cache>>, key: (u64, u64), fetch: PageFetch) -> Self {
52        Self {
53            cache,
54            key,
55            fetch,
56            armed: true,
57        }
58    }
59
60    const fn disarm(&mut self) {
61        self.armed = false;
62    }
63}
64
impl Drop for PageFetchGuard {
    fn drop(&mut self) {
        // The waiter already observed the fetch result; nothing to clean up.
        if !self.armed {
            return;
        }

        // A resolved fetch removes `page_fetches[key]` before waiters resume and disarm their
        // guards. If that fetch failed, the page remains uncached, so a new reader can install a
        // new fetch for the same key before an old waiter is cancelled. Ignore drops from stale
        // waiters so they cannot decrement or remove a newer generation. A surviving waiter keeps
        // the current generation installed, which lets the shared future finish and cache the page
        // on success.
        let mut cache = self.cache.write();
        let Entry::Occupied(mut current) = cache.page_fetches.entry(self.key) else {
            return;
        };
        if !Arc::ptr_eq(&current.get().fetch, &self.fetch) {
            return;
        }
        if current.get().waiters == 1 {
            // Last armed waiter: remove the generation so a later reader can start a fresh fetch.
            current.remove();
        } else {
            current.get_mut().waiters -= 1;
        }
    }
}
91
/// A [Cache] caches pages of [Blob] data in memory after verifying the integrity of each.
///
/// A single page cache can be used to cache data from multiple blobs by assigning a unique id to
/// each.
///
/// Implements the [Clock](https://en.wikipedia.org/wiki/Page_replacement_algorithm#Clock)
/// replacement policy, which is a lightweight approximation of LRU. The page `cache` is a circular
/// list of recently accessed pages, and `clock` is the index of the next page within it to examine
/// for replacement. When a page needs to be evicted, we start the search at `clock` within `cache`,
/// searching for the first page with a false reference bit, and setting any skipped page's
/// reference bit to false along the way.
struct Cache {
    /// The page cache index, with a key composed of (blob id, page number), that maps each cached
    /// page to the index of its slot in `entries` and `slots`.
    ///
    /// # Invariants
    ///
    /// Each `index` entry maps to exactly one `entries` slot, and that entry always has a
    /// matching key.
    index: HashMap<(u64, u64), usize>,

    /// Metadata for each cache slot.
    ///
    /// Each `entries` slot has exactly one corresponding `index` entry.
    entries: Vec<CacheEntry>,

    /// Per-slot page buffers allocated from the pool.
    ///
    /// `slots[i]` stores one logical page for `entries[i]`.
    slots: Vec<IoBufMut>,

    /// Size of each page in bytes.
    page_size: usize,

    /// The Clock replacement policy's clock hand index into `entries`.
    clock: usize,

    /// The maximum number of pages that will be cached.
    capacity: usize,

    /// A map of currently executing page fetches to ensure only one task at a time is trying to
    /// fetch a specific page. See [PageFetchEntry] for the generation/waiter semantics.
    page_fetches: HashMap<(u64, u64), PageFetchEntry>,
}
136
/// Metadata for a single cache entry (page data stored in per-slot buffers).
struct CacheEntry {
    /// The cache key which is composed of the blob id and page number of the page.
    key: (u64, u64),

    /// A bit indicating whether this page was recently referenced; atomic so readers holding only
    /// the cache read lock can set it.
    referenced: AtomicBool,
}
145
/// A reference to a page cache that can be shared across threads via cloning, along with the page
/// size that will be used with it. Provides the API for interacting with the page cache in a
/// thread-safe manner.
#[derive(Clone)]
pub struct CacheRef {
    /// The size of each page in the underlying blobs managed by this page cache.
    ///
    /// # Warning
    ///
    /// You cannot change the page size once data has been written without invalidating it. (Reads
    /// on blobs that were written with a different page size will fail their integrity check.)
    page_size: u64,

    /// The next id to assign to a blob that will be managed by this cache.
    next_id: Arc<AtomicU64>,

    /// Shareable reference to the page cache.
    cache: Arc<RwLock<Cache>>,

    /// Pool used for page-cache and associated buffer allocations.
    pool: BufferPool,
}
168
169impl CacheRef {
170    /// Create a shared page-cache handle backed by `pool`.
171    ///
172    /// The cache stores at most `capacity` pages, each exactly `page_size` bytes.
173    /// Initialization eagerly allocates and zeroes all cache slots from `pool`.
174    pub fn new(pool: BufferPool, page_size: NonZeroU16, capacity: NonZeroUsize) -> Self {
175        let page_size_u64 = page_size.get() as u64;
176
177        Self {
178            page_size: page_size_u64,
179            next_id: Arc::new(AtomicU64::new(0)),
180            cache: Arc::new(RwLock::new(Cache::new(pool.clone(), page_size, capacity))),
181            pool,
182        }
183    }
184
    /// Create a shared page-cache handle, extracting the storage [BufferPool] from a
    /// [BufferPooler]. Equivalent to [Self::new] with `pooler.storage_buffer_pool()`.
    pub fn from_pooler(
        pooler: &impl BufferPooler,
        page_size: NonZeroU16,
        capacity: NonZeroUsize,
    ) -> Self {
        Self::new(pooler.storage_buffer_pool().clone(), page_size, capacity)
    }
194
    /// The page size (in bytes) used by this page cache.
    #[inline]
    pub const fn page_size(&self) -> u64 {
        self.page_size
    }
200
    /// Returns the storage buffer pool associated with this cache.
    #[inline]
    pub const fn pool(&self) -> &BufferPool {
        &self.pool
    }
206
    /// Returns a unique id for the next blob that will use this page cache.
    ///
    /// Relaxed ordering suffices here: only the uniqueness of the returned ids matters, which the
    /// atomic `fetch_add` guarantees regardless of ordering.
    pub fn next_id(&self) -> u64 {
        self.next_id.fetch_add(1, Ordering::Relaxed)
    }
211
    /// Convert a logical offset into the number of the page it belongs to and the offset within
    /// that page. Delegates to [Cache::offset_to_page] with this cache's page size.
    pub const fn offset_to_page(&self, offset: u64) -> (u64, u64) {
        Cache::offset_to_page(self.page_size, offset)
    }
217
218    /// Try to read the specified bytes from the page cache only. Returns the number of bytes
219    /// successfully read from cache and copied to `buf` before a page fault, if any.
220    pub(super) fn read_cached(
221        &self,
222        blob_id: u64,
223        mut buf: &mut [u8],
224        mut logical_offset: u64,
225    ) -> usize {
226        let original_len = buf.len();
227        let page_cache = self.cache.read();
228        while !buf.is_empty() {
229            let count = page_cache.read_at(blob_id, buf, logical_offset);
230            if count == 0 {
231                // Cache miss - return how many bytes we successfully read
232                break;
233            }
234            logical_offset += count as u64;
235            buf = &mut buf[count..];
236        }
237        original_len - buf.len()
238    }
239
    /// Read the specified bytes, preferentially from the page cache. Bytes not found in the cache
    /// will be read from the provided `blob` and cached for future reads.
    ///
    /// # Errors
    ///
    /// Propagates any error from [Self::read_after_page_fault] when an uncached page cannot be
    /// fetched from `blob`.
    pub(super) async fn read<B: Blob>(
        &self,
        blob: &B,
        blob_id: u64,
        mut buf: &mut [u8],
        mut offset: u64,
    ) -> Result<(), Error> {
        // Read up to a page worth of data at a time from either the page cache or the `blob`,
        // until the requested data is fully read.
        while !buf.is_empty() {
            // Read lock the page cache and see if we can get (some of) the data from it. The
            // block scope releases the lock before any await below.
            {
                let page_cache = self.cache.read();
                let count = page_cache.read_at(blob_id, buf, offset);
                if count != 0 {
                    offset += count as u64;
                    buf = &mut buf[count..];
                    continue;
                }
            }

            // Handle page fault.
            let count = self
                .read_after_page_fault(blob, blob_id, buf, offset)
                .await?;
            offset += count as u64;
            buf = &mut buf[count..];
        }

        Ok(())
    }
273
    /// Fetch the requested page after encountering a page fault, which may involve retrieving it
    /// from `blob` & caching the result in the page cache. Returns the number of bytes read, which
    /// should always be non-zero.
    ///
    /// # Errors
    ///
    /// Returns [Error::ReadFailed] if the (possibly shared) page fetch resolved with an error.
    ///
    /// # Panics
    ///
    /// Panics if `buf` is empty.
    pub(super) async fn read_after_page_fault<B: Blob>(
        &self,
        blob: &B,
        blob_id: u64,
        buf: &mut [u8],
        offset: u64,
    ) -> Result<usize, Error> {
        assert!(!buf.is_empty());

        let (page_num, offset_in_page) = Cache::offset_to_page(self.page_size, offset);
        let offset_in_page = offset_in_page as usize;
        trace!(page_num, blob_id, "page fault");

        // Create or clone a future that retrieves the desired page from the underlying blob. This
        // requires a write lock on the page cache since we may need to modify `page_fetches` if
        // this task is the first fetcher.
        let (fetch_future, mut fetch_guard) = {
            let mut cache = self.cache.write();

            // There's a (small) chance the page was fetched & buffered by another task before we
            // were able to acquire the write lock, so check the cache before doing anything else.
            let count = cache.read_at(blob_id, buf, offset);
            if count != 0 {
                return Ok(count);
            }

            let key = (blob_id, page_num);
            match cache.page_fetches.entry(key) {
                Entry::Occupied(o) => {
                    // Another thread is already fetching this page, so clone its existing future.
                    let entry = o.into_mut();
                    entry.waiters += 1;
                    let fetch_future = entry.fetch.as_ref().clone();
                    let fetch = Arc::clone(&entry.fetch);
                    (
                        fetch_future,
                        PageFetchGuard::new(Arc::clone(&self.cache), key, fetch),
                    )
                }
                Entry::Vacant(v) => {
                    // Nobody is currently fetching this page, so create a future that will do the
                    // work. get_page_from_blob handles CRC validation and returns only logical bytes.
                    let blob = blob.clone();
                    let cache = Arc::clone(&self.cache);
                    let page_size = self.page_size;
                    let future = async move {
                        let result = fetch_cacheable_page(&blob, page_num, page_size).await;
                        if let Err(err) = &result {
                            error!(page_num, ?err, "Page fetch failed");
                        }

                        // This shared future still owns `page_fetches[key]`. As long as at least
                        // one waiter remains armed, that entry pins this generation in place, so a
                        // replacement fetch for the same page cannot be inserted before we cache
                        // the successful result below. Only when every waiter cancels can the last
                        // guard remove the entry and let a later reader start a new generation.
                        let mut cache = cache.write();
                        if let Ok(page) = &result {
                            cache.cache(blob_id, page.as_ref(), page_num);
                        }
                        let _ = cache.page_fetches.remove(&key);
                        result
                    };

                    // Make the future shareable and insert it into the map.
                    let fetch_future = future.boxed().shared();
                    let fetch = Arc::new(fetch_future.clone());
                    v.insert(PageFetchEntry {
                        fetch: Arc::clone(&fetch),
                        waiters: 1,
                    });

                    (
                        fetch_future,
                        PageFetchGuard::new(Arc::clone(&self.cache), key, fetch),
                    )
                }
            }
        };

        // Await the shared fetch. The future itself logs failures, caches the resolved page, and
        // removes the in-flight marker before it returns, so waiters only need cancellation
        // cleanup while the fetch is still unresolved.
        let fetch_result = fetch_future.await;
        fetch_guard.disarm();
        let page_buf = match fetch_result {
            Ok(page_buf) => page_buf,
            Err(_) => return Err(Error::ReadFailed),
        };

        // Copy the requested portion of the page into the buffer. `page_buf` is always a full
        // logical page (enforced by fetch_cacheable_page), so this subtraction cannot underflow.
        let bytes_to_copy = std::cmp::min(buf.len(), page_buf.len() - offset_in_page);
        buf[..bytes_to_copy]
            .copy_from_slice(&page_buf.as_ref()[offset_in_page..offset_in_page + bytes_to_copy]);

        Ok(bytes_to_copy)
    }
374
    /// Cache the provided pages of data in the page cache, returning the number of trailing bytes
    /// that didn't fill a whole page (and therefore were not cached). `offset` must be page
    /// aligned, but `buf` may be any length. If the page number space is exhausted mid-way, the
    /// returned remainder may include whole (uncached) pages.
    ///
    /// # Panics
    ///
    /// Panics if `offset` is not page aligned.
    pub fn cache(&self, blob_id: u64, mut buf: &[u8], offset: u64) -> usize {
        let (mut page_num, offset_in_page) = self.offset_to_page(offset);
        assert_eq!(offset_in_page, 0);
        {
            // Write lock the page cache.
            let page_size = self.page_size as usize;
            let mut page_cache = self.cache.write();
            while buf.len() >= page_size {
                page_cache.cache(blob_id, &buf[..page_size], page_num);
                buf = &buf[page_size..];
                page_num = match page_num.checked_add(1) {
                    Some(next) => next,
                    // Page number space exhausted: stop caching and report the rest as remainder.
                    None => break,
                };
            }
        }

        buf.len()
    }
401}
402
403impl Cache {
404    /// Return a new empty page cache with an initial next-blob id of 0, and a max cache capacity
405    /// of `capacity` pages, each of size `page_size` bytes.
406    pub fn new(pool: BufferPool, page_size: NonZeroU16, capacity: NonZeroUsize) -> Self {
407        let page_size = page_size.get() as usize;
408        let capacity = capacity.get();
409        let mut slots = Vec::with_capacity(capacity);
410        for _ in 0..capacity {
411            let slot = pool.alloc_zeroed(page_size);
412            slots.push(slot);
413        }
414        Self {
415            index: HashMap::new(),
416            entries: Vec::with_capacity(capacity),
417            slots,
418            page_size,
419            clock: 0,
420            capacity,
421            page_fetches: HashMap::new(),
422        }
423    }
424
    /// Returns a slice to the page data for the given slot index.
    ///
    /// # Panics
    ///
    /// Panics if `slot` is not less than the cache capacity.
    #[inline]
    fn page_slice(&self, slot: usize) -> &[u8] {
        assert!(slot < self.capacity);
        self.slots[slot].as_ref()
    }
431
    /// Returns a mutable slice to the page data for the given slot index.
    ///
    /// # Panics
    ///
    /// Panics if `slot` is not less than the cache capacity.
    #[inline]
    fn page_slice_mut(&mut self, slot: usize) -> &mut [u8] {
        assert!(slot < self.capacity);
        self.slots[slot].as_mut()
    }
438
439    /// Convert an offset into the number of the page it belongs to and the offset within that page.
440    const fn offset_to_page(page_size: u64, offset: u64) -> (u64, u64) {
441        (offset / page_size, offset % page_size)
442    }
443
444    /// Attempt to fetch blob data starting at `offset` from the page cache. Returns the number of
445    /// bytes read, which could be 0 if the first page in the requested range isn't buffered, and is
446    /// never more than `self.page_size` or the length of `buf`. The returned bytes won't cross a
447    /// page boundary, so multiple reads may be required even if all data in the desired range is
448    /// buffered.
449    fn read_at(&self, blob_id: u64, buf: &mut [u8], logical_offset: u64) -> usize {
450        let (page_num, offset_in_page) =
451            Self::offset_to_page(self.page_size as u64, logical_offset);
452        let Some(&slot) = self.index.get(&(blob_id, page_num)) else {
453            return 0;
454        };
455        let entry = &self.entries[slot];
456        assert_eq!(entry.key, (blob_id, page_num));
457        entry.referenced.store(true, Ordering::Relaxed);
458
459        let page = self.page_slice(slot);
460        let bytes_to_copy = std::cmp::min(buf.len(), self.page_size - offset_in_page as usize);
461        buf[..bytes_to_copy].copy_from_slice(
462            &page[offset_in_page as usize..offset_in_page as usize + bytes_to_copy],
463        );
464
465        bytes_to_copy
466    }
467
    /// Put the given `page` into the page cache, evicting another page via the Clock policy if
    /// the cache is at capacity.
    ///
    /// # Panics
    ///
    /// Panics if `page` is not exactly `page_size` bytes long.
    fn cache(&mut self, blob_id: u64, page: &[u8], page_num: u64) {
        assert_eq!(page.len(), self.page_size);
        let key = (blob_id, page_num);

        // Check for existing entry (update case)
        if let Some(&slot) = self.index.get(&key) {
            // This case can result when a blob is truncated across a page boundary, and later grows
            // back to (beyond) its original size. It will also become expected behavior once we
            // allow cached pages to be writable.
            debug!(blob_id, page_num, "updating duplicate page");

            // Update the stale data with the new page.
            let entry = &self.entries[slot];
            assert_eq!(entry.key, key);
            entry.referenced.store(true, Ordering::Relaxed);
            self.page_slice_mut(slot).copy_from_slice(page);
            return;
        }

        // New entry - check if we need to evict
        if self.entries.len() < self.capacity {
            // Still growing: use next available slot
            let slot = self.entries.len();
            self.index.insert(key, slot);
            self.entries.push(CacheEntry {
                key,
                referenced: AtomicBool::new(true),
            });
            self.page_slice_mut(slot).copy_from_slice(page);
            return;
        }

        // Cache full: find slot to evict using Clock algorithm. Clear reference bits as we skip
        // recently used pages; the loop terminates because a cleared page is eventually reached.
        while self.entries[self.clock].referenced.load(Ordering::Relaxed) {
            self.entries[self.clock]
                .referenced
                .store(false, Ordering::Relaxed);
            self.clock = (self.clock + 1) % self.entries.len();
        }

        // Evict and replace
        let slot = self.clock;
        let entry = &mut self.entries[slot];
        assert!(self.index.remove(&entry.key).is_some());
        self.index.insert(key, slot);
        entry.key = key;
        entry.referenced.store(true, Ordering::Relaxed);
        self.page_slice_mut(slot).copy_from_slice(page);

        // Move the clock forward.
        self.clock = (self.clock + 1) % self.entries.len();
    }
521}
522
523/// Fetch one logical page for insertion into the page cache, rejecting partial pages because cache
524/// entries must always contain a full logical page.
525async fn fetch_cacheable_page(
526    blob: &impl Blob,
527    page_num: u64,
528    page_size: u64,
529) -> Result<IoBuf, Arc<Error>> {
530    let page = get_page_from_blob(blob, page_num, page_size)
531        .await
532        .map_err(Arc::new)?;
533
534    // We should never be fetching partial pages through the page cache. This can happen if a
535    // non-last page is corrupted and falls back to a partial CRC.
536    let len = page.len();
537    if len != page_size as usize {
538        error!(
539            page_num,
540            expected = page_size,
541            actual = len,
542            "attempted to fetch partial page from blob"
543        );
544        return Err(Arc::new(Error::InvalidChecksum));
545    }
546
547    Ok(page)
548}
549
550#[cfg(test)]
551mod tests {
552    use super::{super::Checksum, *};
553    use crate::{
554        buffer::paged::CHECKSUM_SIZE, deterministic, BufferPool, BufferPoolConfig, Clock as _,
555        IoBufsMut, Runner as _, Spawner as _, Storage as _,
556    };
557    use commonware_cryptography::Crc32;
558    use commonware_macros::test_traced;
559    use commonware_utils::{channel::oneshot, sync::Mutex, NZUsize, NZU16};
560    use futures::future::pending;
561    use prometheus_client::registry::Registry;
562    use std::{
563        num::NonZeroU16,
564        sync::{
565            atomic::{AtomicUsize, Ordering},
566            Arc,
567        },
568        time::Duration,
569    };
570
    // Logical page size (what CacheRef uses and what gets cached).
    const PAGE_SIZE: NonZeroU16 = NZU16!(1024);
    // PAGE_SIZE widened to u64 for offset arithmetic.
    const PAGE_SIZE_U64: u64 = PAGE_SIZE.get() as u64;
574
    /// A blob that signals once a read starts and then never returns.
    #[derive(Clone)]
    struct BlockingBlob {
        /// One-shot sender fired when the first (and only expected) read begins.
        started: Arc<Mutex<Option<oneshot::Sender<()>>>>,
    }
580
    impl Blob for BlockingBlob {
        async fn read_at(&self, offset: u64, len: usize) -> Result<IoBufsMut, Error> {
            // Delegate to the buffered variant with no caller-provided buffers.
            self.read_at_buf(offset, len, IoBufsMut::default()).await
        }

        async fn read_at_buf(
            &self,
            _offset: u64,
            _len: usize,
            _bufs: impl Into<IoBufsMut> + Send,
        ) -> Result<IoBufsMut, Error> {
            // Announce that the read started, then block forever so tests can observe an
            // in-flight fetch that never resolves.
            let sender = self
                .started
                .lock()
                .take()
                .expect("blocking blob read started more than once");
            let _ = sender.send(());
            pending::<()>().await;
            unreachable!()
        }

        // Writes, resizes, and syncs are irrelevant for these tests; accept and discard.
        async fn write_at(
            &self,
            _offset: u64,
            _bufs: impl Into<crate::IoBufs> + Send,
        ) -> Result<(), Error> {
            Ok(())
        }

        async fn resize(&self, _len: u64) -> Result<(), Error> {
            Ok(())
        }

        async fn sync(&self) -> Result<(), Error> {
            Ok(())
        }
    }
618
    /// Outcome a [ControlledBlob] produces for every read once released.
    #[derive(Clone)]
    enum ControlledBlobResult {
        /// Every read succeeds with a clone of these bytes.
        Success(Arc<Vec<u8>>),
        /// Every read fails with [Error::ReadFailed].
        Error,
    }
624
    /// A blob that blocks its first physical page read until released and counts total reads.
    #[derive(Clone)]
    struct ControlledBlob {
        /// One-shot sender fired when the first read begins.
        started: Arc<Mutex<Option<oneshot::Sender<()>>>>,
        /// One-shot receiver the first read awaits before proceeding.
        release: Arc<Mutex<Option<oneshot::Receiver<()>>>>,
        /// Total number of read calls observed.
        reads: Arc<AtomicUsize>,
        /// Result every read resolves to once unblocked.
        result: ControlledBlobResult,
    }
633
    impl Blob for ControlledBlob {
        async fn read_at(&self, offset: u64, len: usize) -> Result<IoBufsMut, Error> {
            // Delegate to the buffered variant with no caller-provided buffers.
            self.read_at_buf(offset, len, IoBufsMut::default()).await
        }

        async fn read_at_buf(
            &self,
            _offset: u64,
            _len: usize,
            _bufs: impl Into<IoBufsMut> + Send,
        ) -> Result<IoBufsMut, Error> {
            // Only the very first read blocks: signal it started, then wait for the release
            // channel so the test controls exactly when the fetch completes.
            if self.reads.fetch_add(1, Ordering::Relaxed) == 0 {
                let sender = self
                    .started
                    .lock()
                    .take()
                    .expect("controlled blob start signal consumed more than once");
                let _ = sender.send(());

                let release = self
                    .release
                    .lock()
                    .take()
                    .expect("controlled blob release receiver consumed more than once");
                release.await.expect("release signal dropped");
            }

            match &self.result {
                ControlledBlobResult::Success(page) => Ok(IoBufsMut::from(page.as_ref().clone())),
                ControlledBlobResult::Error => Err(Error::ReadFailed),
            }
        }

        // Writes, resizes, and syncs are irrelevant for these tests; accept and discard.
        async fn write_at(
            &self,
            _offset: u64,
            _bufs: impl Into<crate::IoBufs> + Send,
        ) -> Result<(), Error> {
            Ok(())
        }

        async fn resize(&self, _len: u64) -> Result<(), Error> {
            Ok(())
        }

        async fn sync(&self) -> Result<(), Error> {
            Ok(())
        }
    }
683
    // Exercises Cache directly (no blob): miss, insert, update-in-place, Clock eviction, and an
    // unaligned read that stops at the page boundary.
    #[test_traced]
    fn test_cache_basic() {
        let mut registry = Registry::default();
        let pool = BufferPool::new(BufferPoolConfig::for_storage(), &mut registry);
        let mut cache: Cache = Cache::new(pool, PAGE_SIZE, NZUsize!(10));

        // Cache stores logical-sized pages.
        let mut buf = vec![0; PAGE_SIZE.get() as usize];
        let bytes_read = cache.read_at(0, &mut buf, 0);
        assert_eq!(bytes_read, 0);

        cache.cache(0, &[1; PAGE_SIZE.get() as usize], 0);
        let bytes_read = cache.read_at(0, &mut buf, 0);
        assert_eq!(bytes_read, PAGE_SIZE.get() as usize);
        assert_eq!(buf, [1; PAGE_SIZE.get() as usize]);

        // Test replacement -- should log a duplicate page warning but still work.
        cache.cache(0, &[2; PAGE_SIZE.get() as usize], 0);
        let bytes_read = cache.read_at(0, &mut buf, 0);
        assert_eq!(bytes_read, PAGE_SIZE.get() as usize);
        assert_eq!(buf, [2; PAGE_SIZE.get() as usize]);

        // Test exceeding the cache capacity.
        for i in 0u64..11 {
            cache.cache(0, &[i as u8; PAGE_SIZE.get() as usize], i);
        }
        // Page 0 should have been evicted.
        let bytes_read = cache.read_at(0, &mut buf, 0);
        assert_eq!(bytes_read, 0);
        // Page 1-10 should be in the cache.
        for i in 1u64..11 {
            let bytes_read = cache.read_at(0, &mut buf, i * PAGE_SIZE_U64);
            assert_eq!(bytes_read, PAGE_SIZE.get() as usize);
            assert_eq!(buf, [i as u8; PAGE_SIZE.get() as usize]);
        }

        // Test reading from an unaligned offset by adding 2 to an aligned offset. The read
        // should be 2 bytes short of a full logical page.
        let mut buf = vec![0; PAGE_SIZE.get() as usize];
        let bytes_read = cache.read_at(0, &mut buf, PAGE_SIZE_U64 + 2);
        assert_eq!(bytes_read, PAGE_SIZE.get() as usize - 2);
        assert_eq!(
            &buf[..PAGE_SIZE.get() as usize - 2],
            [1; PAGE_SIZE.get() as usize - 2]
        );
    }
730
    // End-to-end: write CRC-framed pages to a real blob, then read through CacheRef so misses hit
    // the blob and subsequent reads come from the cache.
    #[test_traced]
    fn test_cache_read_with_blob() {
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();
        // Start the test within the executor
        executor.start(|context| async move {
            // Physical page size = logical + CRC record.
            let physical_page_size = PAGE_SIZE_U64 + CHECKSUM_SIZE;

            // Populate a blob with 11 consecutive pages of CRC-protected data.
            let (blob, size) = context
                .open("test", "blob".as_bytes())
                .await
                .expect("Failed to open blob");
            assert_eq!(size, 0);
            for i in 0..11 {
                // Write logical data followed by Checksum.
                let logical_data = vec![i as u8; PAGE_SIZE.get() as usize];
                let crc = Crc32::checksum(&logical_data);
                let record = Checksum::new(PAGE_SIZE.get(), crc);
                let mut page_data = logical_data;
                page_data.extend_from_slice(&record.to_bytes());
                blob.write_at(i * physical_page_size, page_data)
                    .await
                    .unwrap();
            }

            // Fill the page cache with the blob's data via CacheRef::read.
            let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(10));
            assert_eq!(cache_ref.next_id(), 0);
            assert_eq!(cache_ref.next_id(), 1);
            for i in 0..11 {
                // Read expects logical bytes only (CRCs are stripped).
                let mut buf = vec![0; PAGE_SIZE.get() as usize];
                cache_ref
                    .read(&blob, 0, &mut buf, i * PAGE_SIZE_U64)
                    .await
                    .unwrap();
                assert_eq!(buf, [i as u8; PAGE_SIZE.get() as usize]);
            }

            // Repeat the read to exercise reading from the page cache. Must start at 1 because
            // page 0 should be evicted.
            for i in 1..11 {
                let mut buf = vec![0; PAGE_SIZE.get() as usize];
                cache_ref
                    .read(&blob, 0, &mut buf, i * PAGE_SIZE_U64)
                    .await
                    .unwrap();
                assert_eq!(buf, [i as u8; PAGE_SIZE.get() as usize]);
            }

            // Cleanup.
            blob.sync().await.unwrap();
        });
    }
787
788    #[test_traced]
789    fn test_cache_max_page() {
790        let executor = deterministic::Runner::default();
791        executor.start(|context| async move {
792            let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(2));
793
794            // Use the largest page-aligned offset representable for the configured PAGE_SIZE.
795            let aligned_max_offset = u64::MAX - (u64::MAX % PAGE_SIZE_U64);
796
797            // CacheRef::cache expects only logical bytes (no CRC).
798            let logical_data = vec![42u8; PAGE_SIZE.get() as usize];
799
800            // Caching exactly one page at the maximum offset should succeed.
801            let remaining = cache_ref.cache(0, logical_data.as_slice(), aligned_max_offset);
802            assert_eq!(remaining, 0);
803
804            // Reading from the cache should return the logical bytes.
805            let mut buf = vec![0u8; PAGE_SIZE.get() as usize];
806            let page_cache = cache_ref.cache.read();
807            let bytes_read = page_cache.read_at(0, &mut buf, aligned_max_offset);
808            assert_eq!(bytes_read, PAGE_SIZE.get() as usize);
809            assert!(buf.iter().all(|b| *b == 42));
810        });
811    }
812
813    #[test_traced]
814    fn test_cache_at_high_offset() {
815        let executor = deterministic::Runner::default();
816        executor.start(|context| async move {
817            // Use the minimum page size (CHECKSUM_SIZE + 1 = 13) with high offset.
818            const MIN_PAGE_SIZE: u64 = CHECKSUM_SIZE + 1;
819            let cache_ref =
820                CacheRef::from_pooler(&context, NZU16!(MIN_PAGE_SIZE as u16), NZUsize!(2));
821
822            // Create two pages worth of logical data (no CRCs - CacheRef::cache expects logical
823            // only).
824            let data = vec![1u8; MIN_PAGE_SIZE as usize * 2];
825
826            // Cache pages at a high (but not max) aligned offset so we can verify both pages.
827            // Use an offset that's a few pages below max to avoid overflow when verifying.
828            let aligned_max_offset = u64::MAX - (u64::MAX % MIN_PAGE_SIZE);
829            let high_offset = aligned_max_offset - (MIN_PAGE_SIZE * 2);
830            let remaining = cache_ref.cache(0, &data, high_offset);
831            // Both pages should be cached.
832            assert_eq!(remaining, 0);
833
834            // Verify the first page was cached correctly.
835            let mut buf = vec![0u8; MIN_PAGE_SIZE as usize];
836            let page_cache = cache_ref.cache.read();
837            assert_eq!(
838                page_cache.read_at(0, &mut buf, high_offset),
839                MIN_PAGE_SIZE as usize
840            );
841            assert!(buf.iter().all(|b| *b == 1));
842
843            // Verify the second page was cached correctly.
844            assert_eq!(
845                page_cache.read_at(0, &mut buf, high_offset + MIN_PAGE_SIZE),
846                MIN_PAGE_SIZE as usize
847            );
848            assert!(buf.iter().all(|b| *b == 1));
849        });
850    }
851
852    #[test_traced]
853    fn test_page_fetches_entry_removed_when_first_fetcher_cancelled() {
854        let executor = deterministic::Runner::default();
855        executor.start(|context| async move {
856            // Set up a small cache and a blob whose read never completes once started.
857            let blob_id = 0;
858            let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(10));
859            let (started_tx, started_rx) = oneshot::channel();
860            let blob = BlockingBlob {
861                started: Arc::new(Mutex::new(Some(started_tx))),
862            };
863            let mut read_buf = vec![0u8; PAGE_SIZE.get() as usize];
864
865            // Spawn the first fetcher. It will insert into `page_fetches` and then block forever.
866            let cache_ref_for_task = cache_ref.clone();
867            let blob_for_task = blob.clone();
868            let handle = context.spawn(move |_| async move {
869                let _ = cache_ref_for_task
870                    .read(&blob_for_task, blob_id, &mut read_buf, 0)
871                    .await;
872            });
873
874            // Wait until the underlying read has started, ensuring the in-flight marker exists.
875            started_rx.await.expect("blocking read never started");
876            {
877                let page_cache = cache_ref.cache.read();
878                assert!(page_cache.page_fetches.contains_key(&(blob_id, 0)));
879            }
880
881            // Cancel the first fetcher before it reaches explicit cleanup.
882            handle.abort();
883            assert!(matches!(handle.await, Err(Error::Closed)));
884
885            // The guard drop path should have removed the stale in-flight entry.
886            let page_cache = cache_ref.cache.read();
887            assert!(
888                !page_cache.page_fetches.contains_key(&(blob_id, 0)),
889                "cancelled first fetcher should not leave stale page_fetches entry"
890            );
891        });
892    }
893
    #[test_traced]
    fn test_followers_keep_single_flight_after_first_fetcher_cancellation() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let blob_id = 0;
            let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(10));

            // Return one valid full page, but hold the underlying read until the test releases it.
            // The physical page is the logical bytes followed by the checksum record, matching the
            // on-blob format the fetch path validates and strips.
            let logical_page = vec![7u8; PAGE_SIZE.get() as usize];
            let crc = Crc32::checksum(&logical_page);
            let mut physical_page = logical_page.clone();
            physical_page.extend_from_slice(&Checksum::new(PAGE_SIZE.get(), crc).to_bytes());
            let (started_tx, started_rx) = oneshot::channel();
            let (release_tx, release_rx) = oneshot::channel();
            let reads = Arc::new(AtomicUsize::new(0));
            let blob = ControlledBlob {
                started: Arc::new(Mutex::new(Some(started_tx))),
                release: Arc::new(Mutex::new(Some(release_rx))),
                reads: reads.clone(),
                result: ControlledBlobResult::Success(Arc::new(physical_page)),
            };

            // Start the fetch that installs the shared in-flight entry.
            let mut first_buf = vec![0u8; PAGE_SIZE.get() as usize];
            let cache_ref_for_first = cache_ref.clone();
            let blob_for_first = blob.clone();
            let first = context.clone().spawn(move |_| async move {
                let _ = cache_ref_for_first
                    .read(&blob_for_first, blob_id, &mut first_buf, 0)
                    .await;
            });
            started_rx.await.expect("first read never started");

            // Join as a follower while the first fetch is still blocked in the blob.
            let mut second_buf = vec![0u8; PAGE_SIZE.get() as usize];
            let cache_ref_for_second = cache_ref.clone();
            let blob_for_second = blob.clone();
            let second = context.clone().spawn(move |_| async move {
                cache_ref_for_second
                    .read(&blob_for_second, blob_id, &mut second_buf, 0)
                    .await
                    .expect("second read failed");
                second_buf
            });

            // Wait until both tasks are registered against the same in-flight fetch.
            // Sleeping yields control so the spawned readers can make progress before we re-poll.
            loop {
                let joined = {
                    let page_cache = cache_ref.cache.read();
                    page_cache
                        .page_fetches
                        .get(&(blob_id, 0))
                        .map(|fetch| fetch.waiters == 2)
                        .unwrap_or(false)
                };
                if joined {
                    break;
                }
                context.sleep(Duration::from_millis(1)).await;
            }

            // Cancel the original fetcher; the follower should keep the generation alive.
            first.abort();
            assert!(matches!(first.await, Err(Error::Closed)));

            // A later reader should still join the existing in-flight fetch instead of starting a
            // second blob read.
            let mut third_buf = vec![0u8; PAGE_SIZE.get() as usize];
            let cache_ref_for_third = cache_ref.clone();
            let blob_for_third = blob.clone();
            let third = context.clone().spawn(move |_| async move {
                cache_ref_for_third
                    .read(&blob_for_third, blob_id, &mut third_buf, 0)
                    .await
                    .expect("third read failed");
                third_buf
            });

            // Either the third reader bumps the waiter count back to 2, or a bug starts a second
            // blob read. The `reads > 1` arm is the failure-detection path: it breaks the loop so
            // the final read-count assertion below can fail with a clear message instead of the
            // test hanging here forever.
            loop {
                let third_entered = {
                    let page_cache = cache_ref.cache.read();
                    reads.load(Ordering::Relaxed) > 1
                        || page_cache
                            .page_fetches
                            .get(&(blob_id, 0))
                            .map(|fetch| fetch.waiters == 2)
                            .unwrap_or(false)
                };
                if third_entered {
                    break;
                }
                context.sleep(Duration::from_millis(1)).await;
            }

            // Let the single underlying fetch complete and satisfy both surviving waiters.
            let _ = release_tx.send(());
            let second_buf = second.await.expect("second task failed");
            let third_buf = third.await.expect("third task failed");
            assert_eq!(second_buf, logical_page);
            assert_eq!(third_buf, logical_page);

            // All waiters should have shared the same blob read.
            assert_eq!(reads.load(Ordering::Relaxed), 1);

            // The successful fetch should populate the cache for later readers.
            let mut cached = vec![0u8; PAGE_SIZE.get() as usize];
            assert_eq!(
                cache_ref.read_cached(blob_id, &mut cached, 0),
                PAGE_SIZE.get() as usize
            );
            assert_eq!(cached, logical_page);

            // A later read should hit the cached page and avoid touching the blob again.
            let mut fourth_buf = vec![0u8; PAGE_SIZE.get() as usize];
            cache_ref
                .read(&blob, blob_id, &mut fourth_buf, 0)
                .await
                .unwrap();
            assert_eq!(fourth_buf, logical_page);
            assert_eq!(reads.load(Ordering::Relaxed), 1);

            // A completed generation must leave no in-flight marker behind.
            let page_cache = cache_ref.cache.read();
            assert!(
                !page_cache.page_fetches.contains_key(&(blob_id, 0)),
                "completed fetch should leave no stale page_fetches entry"
            );
        });
    }
1024
    #[test_traced]
    fn test_page_fetch_error_removes_entry_for_all_waiters() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let blob_id = 0;
            let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(10));

            // Hold one shared fetch in flight, then make the underlying read fail.
            let (started_tx, started_rx) = oneshot::channel();
            let (release_tx, release_rx) = oneshot::channel();
            let reads = Arc::new(AtomicUsize::new(0));
            let blob = ControlledBlob {
                started: Arc::new(Mutex::new(Some(started_tx))),
                release: Arc::new(Mutex::new(Some(release_rx))),
                reads: reads.clone(),
                result: ControlledBlobResult::Error,
            };

            // Start the fetch that creates the in-flight entry.
            let mut first_buf = vec![0u8; PAGE_SIZE.get() as usize];
            let cache_ref_for_first = cache_ref.clone();
            let blob_for_first = blob.clone();
            let first = context.clone().spawn(move |_| async move {
                cache_ref_for_first
                    .read(&blob_for_first, blob_id, &mut first_buf, 0)
                    .await
            });
            started_rx.await.expect("first erroring read never started");

            // Join with a second waiter that should observe the same failure.
            let mut second_buf = vec![0u8; PAGE_SIZE.get() as usize];
            let cache_ref_for_second = cache_ref.clone();
            let blob_for_second = blob.clone();
            let second = context.clone().spawn(move |_| async move {
                cache_ref_for_second
                    .read(&blob_for_second, blob_id, &mut second_buf, 0)
                    .await
            });

            // Wait until both tasks share the same in-flight fetch entry. Sleeping yields
            // control so the spawned readers can register before we re-poll the waiter count.
            loop {
                let joined = {
                    let page_cache = cache_ref.cache.read();
                    page_cache
                        .page_fetches
                        .get(&(blob_id, 0))
                        .map(|fetch| fetch.waiters == 2)
                        .unwrap_or(false)
                };
                if joined {
                    break;
                }
                context.sleep(Duration::from_millis(1)).await;
            }

            // Release the blocked read so the shared fetch resolves with an error.
            let _ = release_tx.send(());

            // `Ok(Err(..))`: the tasks themselves completed; the reads inside them failed.
            assert!(matches!(first.await, Ok(Err(Error::ReadFailed))));
            assert!(matches!(second.await, Ok(Err(Error::ReadFailed))));
            // Both waiters should still have shared a single blob read.
            assert_eq!(reads.load(Ordering::Relaxed), 1);

            // The failed generation must remove its in-flight entry and avoid caching data.
            {
                let page_cache = cache_ref.cache.read();
                assert!(
                    !page_cache.page_fetches.contains_key(&(blob_id, 0)),
                    "erroring fetch should leave no stale page_fetches entry"
                );
            }
            // Nothing was cached for the failed page, so a cache-only read returns 0 bytes.
            let mut cached = vec![0u8; PAGE_SIZE.get() as usize];
            assert_eq!(cache_ref.read_cached(blob_id, &mut cached, 0), 0);

            // A later read should start a fresh fetch rather than reusing stale error state.
            // The read count climbing to 2 proves a second blob read actually happened.
            let mut third_buf = vec![0u8; PAGE_SIZE.get() as usize];
            assert!(matches!(
                cache_ref.read(&blob, blob_id, &mut third_buf, 0).await,
                Err(Error::ReadFailed)
            ));
            assert_eq!(reads.load(Ordering::Relaxed), 2);
        });
    }
1108}