Skip to main content

commonware_runtime/utils/buffer/paged/
cache.rs

1//! A page cache for caching _logical_ pages of [Blob] data in memory. The cache is unaware of the
2//! physical page format used by the blob, which is left to the blob implementation.
3
4use super::get_page_from_blob;
5use crate::{Blob, BufferPool, BufferPooler, Error, IoBuf, IoBufMut};
6use commonware_utils::sync::RwLock;
7use futures::{future::Shared, FutureExt};
8use std::{
9    collections::{hash_map::Entry, HashMap},
10    future::Future,
11    num::{NonZeroU16, NonZeroUsize},
12    pin::Pin,
13    sync::{
14        atomic::{AtomicBool, AtomicU64, Ordering},
15        Arc,
16    },
17};
18use tracing::{debug, error, trace};
19
20/// Shared future for one logical page fetch. The output uses `Arc<Error>` because `Shared`
21/// requires cloneable results. The `IoBuf` contains only the logical, validated page bytes.
22type PageFetchFuture = Shared<Pin<Box<dyn Future<Output = Result<IoBuf, Arc<Error>>> + Send>>>;
23
24/// Shared handle to one in-flight fetch generation. The cache keeps one copy in `page_fetches`,
25/// and each waiter clones the `Arc` while it is still interested in the result.
26type PageFetch = Arc<PageFetchFuture>;
27
28/// One in-flight fetch generation for a single `(blob_id, page_num)`.
29///
30/// `fetch` is shared by every waiter that joined this generation. `waiters` counts the still
31/// armed waiters whose drop path may need to remove this entry if they become the last
32/// unresolved waiter. If `page_fetches[key]` is later replaced by a newer generation, stale
33/// waiters from the old generation must ignore it and rely on `Arc::ptr_eq` against their saved
34/// `fetch`.
35struct PageFetchEntry {
36    /// Shared page fetch future that reads and validates the logical page exactly once.
37    fetch: PageFetch,
38    /// Count of waiters that still need cancellation cleanup for this fetch generation.
39    waiters: usize,
40}
41
42/// Removes a stale in-flight page fetch when the last unresolved waiter is dropped.
43struct PageFetchGuard {
44    cache: Arc<RwLock<Cache>>,
45    key: (u64, u64),
46    fetch: PageFetch,
47    armed: bool,
48}
49
50impl PageFetchGuard {
51    const fn new(cache: Arc<RwLock<Cache>>, key: (u64, u64), fetch: PageFetch) -> Self {
52        Self {
53            cache,
54            key,
55            fetch,
56            armed: true,
57        }
58    }
59
60    const fn disarm(&mut self) {
61        self.armed = false;
62    }
63}
64
65impl Drop for PageFetchGuard {
66    fn drop(&mut self) {
67        if !self.armed {
68            return;
69        }
70
71        // A resolved fetch removes `page_fetches[key]` before waiters resume and disarm their
72        // guards. If that fetch failed, the page remains uncached, so a new reader can install a
73        // new fetch for the same key before an old waiter is cancelled. Ignore drops from stale
74        // waiters so they cannot decrement or remove a newer generation. A surviving waiter keeps
75        // the current generation installed, which lets the shared future finish and cache the page
76        // on success.
77        let mut cache = self.cache.write();
78        let Entry::Occupied(mut current) = cache.page_fetches.entry(self.key) else {
79            return;
80        };
81        if !Arc::ptr_eq(&current.get().fetch, &self.fetch) {
82            return;
83        }
84        if current.get().waiters == 1 {
85            current.remove();
86        } else {
87            current.get_mut().waiters -= 1;
88        }
89    }
90}
91
92/// A [Cache] caches pages of [Blob] data in memory after verifying the integrity of each.
93///
94/// A single page cache can be used to cache data from multiple blobs by assigning a unique id to
95/// each.
96///
97/// Implements the [Clock](https://en.wikipedia.org/wiki/Page_replacement_algorithm#Clock)
98/// replacement policy, which is a lightweight approximation of LRU. The page `cache` is a circular
99/// list of recently accessed pages, and `clock` is the index of the next page within it to examine
100/// for replacement. When a page needs to be evicted, we start the search at `clock` within `cache`,
101/// searching for the first page with a false reference bit, and setting any skipped page's
102/// reference bit to false along the way.
103struct Cache {
104    /// The page cache index, with a key composed of (blob id, page number), that maps each cached
105    /// page to the index of its slot in `entries` and `slots`.
106    ///
107    /// # Invariants
108    ///
109    /// Each `index` entry maps to exactly one `entries` slot, and that entry always has a
110    /// matching key. (The converse is not true: after [Self::invalidate_from] a slot may retain
111    /// a stale key that is no longer present in `index`.)
112    index: HashMap<(u64, u64), usize>,
113
114    /// Metadata for each cache slot.
115    ///
116    /// Every entry reachable via `index` has a matching key here. Slots that were invalidated by
117    /// [Self::invalidate_from] retain their stale key but are unreachable from `index` and will
118    /// be reclaimed by the Clock evictor on the next sweep.
119    entries: Vec<CacheEntry>,
120
121    /// Per-slot page buffers allocated from the pool.
122    ///
123    /// `slots[i]` stores one logical page for `entries[i]`.
124    slots: Vec<IoBufMut>,
125
126    /// Size of each page in bytes.
127    page_size: usize,
128
129    /// The Clock replacement policy's clock hand index into `entries`.
130    clock: usize,
131
132    /// The maximum number of pages that will be cached.
133    capacity: usize,
134
135    /// A map of currently executing page fetches to ensure only one task at a time is trying to
136    /// fetch a specific page.
137    page_fetches: HashMap<(u64, u64), PageFetchEntry>,
138}
139
/// Bookkeeping for one cache slot. The page bytes themselves live in the matching slot buffer.
struct CacheEntry {
    /// `(blob id, page number)` of the page held in this slot.
    ///
    /// # Invariant
    ///
    /// Each live slot is reachable from `index` under this key. A slot invalidated by
    /// [Cache::invalidate_from] keeps its now-stale key here, but `index` no longer points at it,
    /// and the Clock evictor will reclaim it ahead of referenced slots.
    key: (u64, u64),

    /// Reference bit: set whenever the page is accessed and cleared by the Clock sweep.
    referenced: AtomicBool,
}
154
155/// A reference to a page cache that can be shared across threads via cloning, along with the page
156/// size that will be used with it. Provides the API for interacting with the page cache in a
157/// thread-safe manner.
158#[derive(Clone)]
159pub struct CacheRef {
160    /// The size of each page in the underlying blobs managed by this page cache.
161    ///
162    /// # Warning
163    ///
164    /// You cannot change the page size once data has been written without invalidating it. (Reads
165    /// on blobs that were written with a different page size will fail their integrity check.)
166    page_size: u64,
167
168    /// The next id to assign to a blob that will be managed by this cache.
169    next_id: Arc<AtomicU64>,
170
171    /// Shareable reference to the page cache.
172    cache: Arc<RwLock<Cache>>,
173
174    /// Pool used for page-cache and associated buffer allocations.
175    pool: BufferPool,
176}
177
178impl CacheRef {
179    /// Create a shared page-cache handle backed by `pool`.
180    ///
181    /// The cache stores at most `capacity` pages, each exactly `page_size` bytes.
182    /// Initialization eagerly allocates and zeroes all cache slots from `pool`.
183    pub fn new(pool: BufferPool, page_size: NonZeroU16, capacity: NonZeroUsize) -> Self {
184        let page_size_u64 = page_size.get() as u64;
185
186        Self {
187            page_size: page_size_u64,
188            next_id: Arc::new(AtomicU64::new(0)),
189            cache: Arc::new(RwLock::new(Cache::new(pool.clone(), page_size, capacity))),
190            pool,
191        }
192    }
193
194    /// Create a shared page-cache handle, extracting the storage [BufferPool] from a
195    /// [BufferPooler].
196    pub fn from_pooler(
197        pooler: &impl BufferPooler,
198        page_size: NonZeroU16,
199        capacity: NonZeroUsize,
200    ) -> Self {
201        Self::new(pooler.storage_buffer_pool().clone(), page_size, capacity)
202    }
203
204    /// The page size used by this page cache.
205    #[inline]
206    pub const fn page_size(&self) -> u64 {
207        self.page_size
208    }
209
210    /// Returns the storage buffer pool associated with this cache.
211    #[inline]
212    pub const fn pool(&self) -> &BufferPool {
213        &self.pool
214    }
215
216    /// Returns a unique id for the next blob that will use this page cache.
217    pub fn next_id(&self) -> u64 {
218        self.next_id.fetch_add(1, Ordering::Relaxed)
219    }
220
221    /// Convert a logical offset into the number of the page it belongs to and the offset within
222    /// that page.
223    pub const fn offset_to_page(&self, offset: u64) -> (u64, u64) {
224        Cache::offset_to_page(self.page_size, offset)
225    }
226
227    /// Try to read the specified bytes from the page cache only. Returns the number of bytes
228    /// successfully read from cache and copied to `buf` before a page fault, if any.
229    pub(super) fn read_cached(
230        &self,
231        blob_id: u64,
232        mut buf: &mut [u8],
233        mut logical_offset: u64,
234    ) -> usize {
235        let original_len = buf.len();
236        let page_cache = self.cache.read();
237        while !buf.is_empty() {
238            let count = page_cache.read_at(blob_id, buf, logical_offset);
239            if count == 0 {
240                // Cache miss - return how many bytes we successfully read
241                break;
242            }
243            logical_offset += count as u64;
244            buf = &mut buf[count..];
245        }
246        original_len - buf.len()
247    }
248
249    /// Read multiple disjoint byte ranges from the page cache in a single lock acquisition.
250    ///
251    /// Each element of `ranges` is `(dest_slice, logical_offset)`. Fully-cached ranges have
252    /// their data written to the destination slice and are removed from `ranges`. Entries left
253    /// in `ranges` correspond to cache misses that the caller must read from the underlying
254    /// blob.
255    pub(super) fn read_cached_many(&self, blob_id: u64, ranges: &mut Vec<(&mut [u8], u64)>) {
256        let page_cache = self.cache.read();
257        ranges.retain_mut(|(buf, logical_offset)| {
258            let mut remaining = buf.len();
259            let mut offset = *logical_offset;
260            let mut dst = 0;
261            while remaining > 0 {
262                let count = page_cache.read_at(blob_id, &mut buf[dst..], offset);
263                if count == 0 {
264                    break;
265                }
266                offset += count as u64;
267                dst += count;
268                remaining -= count;
269            }
270
271            // Keep cache misses in `ranges`; drop fully-cached entries.
272            remaining > 0
273        });
274    }
275
276    /// Read the specified bytes, preferentially from the page cache. Bytes not found in the cache
277    /// will be read from the provided `blob` and cached for future reads.
278    pub(super) async fn read<B: Blob>(
279        &self,
280        blob: &B,
281        blob_id: u64,
282        mut buf: &mut [u8],
283        mut offset: u64,
284    ) -> Result<(), Error> {
285        // Read up to a page worth of data at a time from either the page cache or the `blob`,
286        // until the requested data is fully read.
287        while !buf.is_empty() {
288            // Read lock the page cache and see if we can get (some of) the data from it.
289            {
290                let page_cache = self.cache.read();
291                let count = page_cache.read_at(blob_id, buf, offset);
292                if count != 0 {
293                    offset += count as u64;
294                    buf = &mut buf[count..];
295                    continue;
296                }
297            }
298
299            // Handle page fault.
300            let count = self
301                .read_after_page_fault(blob, blob_id, buf, offset)
302                .await?;
303            offset += count as u64;
304            buf = &mut buf[count..];
305        }
306
307        Ok(())
308    }
309
310    /// Fetch the requested page after encountering a page fault, which may involve retrieving it
311    /// from `blob` & caching the result in the page cache. Returns the number of bytes read, which
312    /// should always be non-zero.
313    pub(super) async fn read_after_page_fault<B: Blob>(
314        &self,
315        blob: &B,
316        blob_id: u64,
317        buf: &mut [u8],
318        offset: u64,
319    ) -> Result<usize, Error> {
320        assert!(!buf.is_empty());
321
322        let (page_num, offset_in_page) = Cache::offset_to_page(self.page_size, offset);
323        let offset_in_page = offset_in_page as usize;
324        trace!(page_num, blob_id, "page fault");
325
326        // Create or clone a future that retrieves the desired page from the underlying blob. This
327        // requires a write lock on the page cache since we may need to modify `page_fetches` if
328        // this task is the first fetcher.
329        let (fetch_future, mut fetch_guard) = {
330            let mut cache = self.cache.write();
331
332            // There's a (small) chance the page was fetched & buffered by another task before we
333            // were able to acquire the write lock, so check the cache before doing anything else.
334            let count = cache.read_at(blob_id, buf, offset);
335            if count != 0 {
336                return Ok(count);
337            }
338
339            let key = (blob_id, page_num);
340            match cache.page_fetches.entry(key) {
341                Entry::Occupied(o) => {
342                    // Another thread is already fetching this page, so clone its existing future.
343                    let entry = o.into_mut();
344                    entry.waiters += 1;
345                    let fetch_future = entry.fetch.as_ref().clone();
346                    let fetch = Arc::clone(&entry.fetch);
347                    (
348                        fetch_future,
349                        PageFetchGuard::new(Arc::clone(&self.cache), key, fetch),
350                    )
351                }
352                Entry::Vacant(v) => {
353                    // Nobody is currently fetching this page, so create a future that will do the
354                    // work. get_page_from_blob handles CRC validation and returns only logical bytes.
355                    let blob = blob.clone();
356                    let cache = Arc::clone(&self.cache);
357                    let page_size = self.page_size;
358                    let future = async move {
359                        let result = fetch_cacheable_page(&blob, page_num, page_size).await;
360                        if let Err(err) = &result {
361                            error!(page_num, ?err, "Page fetch failed");
362                        }
363
364                        // This shared future still owns `page_fetches[key]`. As long as at least
365                        // one waiter remains armed, that entry pins this generation in place, so a
366                        // replacement fetch for the same page cannot be inserted before we cache
367                        // the successful result below. Only when every waiter cancels can the last
368                        // guard remove the entry and let a later reader start a new generation.
369                        let mut cache = cache.write();
370                        if let Ok(page) = &result {
371                            cache.cache(blob_id, page.as_ref(), page_num);
372                        }
373                        let _ = cache.page_fetches.remove(&key);
374                        result
375                    };
376
377                    // Make the future shareable and insert it into the map.
378                    let fetch_future = future.boxed().shared();
379                    let fetch = Arc::new(fetch_future.clone());
380                    v.insert(PageFetchEntry {
381                        fetch: Arc::clone(&fetch),
382                        waiters: 1,
383                    });
384
385                    (
386                        fetch_future,
387                        PageFetchGuard::new(Arc::clone(&self.cache), key, fetch),
388                    )
389                }
390            }
391        };
392
393        // Await the shared fetch. The future itself logs failures, caches the resolved page, and
394        // removes the in-flight marker before it returns, so waiters only need cancellation
395        // cleanup while the fetch is still unresolved.
396        let fetch_result = fetch_future.await;
397        fetch_guard.disarm();
398        let page_buf = match fetch_result {
399            Ok(page_buf) => page_buf,
400            Err(_) => return Err(Error::ReadFailed),
401        };
402
403        // Copy the requested portion of the page into the buffer.
404        let bytes_to_copy = std::cmp::min(buf.len(), page_buf.len() - offset_in_page);
405        buf[..bytes_to_copy]
406            .copy_from_slice(&page_buf.as_ref()[offset_in_page..offset_in_page + bytes_to_copy]);
407
408        Ok(bytes_to_copy)
409    }
410
411    /// Cache the provided pages of data in the page cache, returning the remaining bytes that
412    /// didn't fill a whole page. `offset` must be page aligned.
413    ///
414    /// # Panics
415    ///
416    /// - Panics if `offset` is not page aligned.
417    /// - If the buffer is not the size of a page.
418    pub fn cache(&self, blob_id: u64, mut buf: &[u8], offset: u64) -> usize {
419        let (mut page_num, offset_in_page) = self.offset_to_page(offset);
420        assert_eq!(offset_in_page, 0);
421        {
422            // Write lock the page cache.
423            let page_size = self.page_size as usize;
424            let mut page_cache = self.cache.write();
425            while buf.len() >= page_size {
426                page_cache.cache(blob_id, &buf[..page_size], page_num);
427                buf = &buf[page_size..];
428                page_num = match page_num.checked_add(1) {
429                    Some(next) => next,
430                    None => break,
431                };
432            }
433        }
434
435        buf.len()
436    }
437
438    /// Drop any cached pages for `blob_id` at `page_num >= start_page`. Used after a blob is
439    /// truncated so subsequent reads can't observe pre-truncation bytes in a page that the tip
440    /// buffer (or future writes) now owns.
441    pub(super) fn invalidate_from(&self, blob_id: u64, start_page: u64) {
442        self.cache.write().invalidate_from(blob_id, start_page);
443    }
444}
445
446impl Cache {
447    /// Return a new empty page cache with an initial next-blob id of 0, and a max cache capacity
448    /// of `capacity` pages, each of size `page_size` bytes.
449    pub fn new(pool: BufferPool, page_size: NonZeroU16, capacity: NonZeroUsize) -> Self {
450        let page_size = page_size.get() as usize;
451        let capacity = capacity.get();
452        let mut slots = Vec::with_capacity(capacity);
453        for _ in 0..capacity {
454            let slot = pool.alloc_zeroed(page_size);
455            slots.push(slot);
456        }
457        Self {
458            index: HashMap::new(),
459            entries: Vec::with_capacity(capacity),
460            slots,
461            page_size,
462            clock: 0,
463            capacity,
464            page_fetches: HashMap::new(),
465        }
466    }
467
468    /// Returns a slice to the page data for the given slot index.
469    #[inline]
470    fn page_slice(&self, slot: usize) -> &[u8] {
471        assert!(slot < self.capacity);
472        self.slots[slot].as_ref()
473    }
474
475    /// Returns a mutable slice to the page data for the given slot index.
476    #[inline]
477    fn page_slice_mut(&mut self, slot: usize) -> &mut [u8] {
478        assert!(slot < self.capacity);
479        self.slots[slot].as_mut()
480    }
481
482    /// Convert an offset into the number of the page it belongs to and the offset within that page.
483    const fn offset_to_page(page_size: u64, offset: u64) -> (u64, u64) {
484        (offset / page_size, offset % page_size)
485    }
486
487    /// Attempt to fetch blob data starting at `offset` from the page cache. Returns the number of
488    /// bytes read, which could be 0 if the first page in the requested range isn't buffered, and is
489    /// never more than `self.page_size` or the length of `buf`. The returned bytes won't cross a
490    /// page boundary, so multiple reads may be required even if all data in the desired range is
491    /// buffered.
492    fn read_at(&self, blob_id: u64, buf: &mut [u8], logical_offset: u64) -> usize {
493        let (page_num, offset_in_page) =
494            Self::offset_to_page(self.page_size as u64, logical_offset);
495        let Some(&slot) = self.index.get(&(blob_id, page_num)) else {
496            return 0;
497        };
498        let entry = &self.entries[slot];
499        assert_eq!(entry.key, (blob_id, page_num));
500        entry.referenced.store(true, Ordering::Relaxed);
501
502        let page = self.page_slice(slot);
503        let bytes_to_copy = std::cmp::min(buf.len(), self.page_size - offset_in_page as usize);
504        buf[..bytes_to_copy].copy_from_slice(
505            &page[offset_in_page as usize..offset_in_page as usize + bytes_to_copy],
506        );
507
508        bytes_to_copy
509    }
510
511    /// Put the given `page` into the page cache.
512    fn cache(&mut self, blob_id: u64, page: &[u8], page_num: u64) {
513        assert_eq!(page.len(), self.page_size);
514        let key = (blob_id, page_num);
515
516        // Check for existing entry (update case)
517        if let Some(&slot) = self.index.get(&key) {
518            // This case can result when a blob is truncated across a page boundary, and later grows
519            // back to (beyond) its original size. It will also become expected behavior once we
520            // allow cached pages to be writable.
521            debug!(blob_id, page_num, "updating duplicate page");
522
523            // Update the stale data with the new page.
524            let entry = &self.entries[slot];
525            assert_eq!(entry.key, key);
526            entry.referenced.store(true, Ordering::Relaxed);
527            self.page_slice_mut(slot).copy_from_slice(page);
528            return;
529        }
530
531        // New entry - check if we need to evict
532        if self.entries.len() < self.capacity {
533            // Still growing: use next available slot
534            let slot = self.entries.len();
535            self.index.insert(key, slot);
536            self.entries.push(CacheEntry {
537                key,
538                referenced: AtomicBool::new(true),
539            });
540            self.page_slice_mut(slot).copy_from_slice(page);
541            return;
542        }
543
544        // Cache full: find slot to evict using Clock algorithm. Invalidated slots (`referenced =
545        // false`, stale `entry.key` no longer in `index`) are reclaimed on the first sweep.
546        while self.entries[self.clock].referenced.load(Ordering::Relaxed) {
547            self.entries[self.clock]
548                .referenced
549                .store(false, Ordering::Relaxed);
550            self.clock = (self.clock + 1) % self.entries.len();
551        }
552
553        // Evict and replace. Only drop the old `entry.key` from `index` when it still points
554        // to this slot: after `invalidate_from` a slot may hold a stale key that has since
555        // been re-cached at a different slot, and an unconditional `remove` would orphan
556        // that live entry.
557        let slot = self.clock;
558        let entry = &mut self.entries[slot];
559        if self.index.get(&entry.key) == Some(&slot) {
560            self.index.remove(&entry.key);
561        }
562        self.index.insert(key, slot);
563        entry.key = key;
564        entry.referenced.store(true, Ordering::Relaxed);
565        self.page_slice_mut(slot).copy_from_slice(page);
566
567        // Move the clock forward.
568        self.clock = (self.clock + 1) % self.entries.len();
569    }
570
571    /// Drop any cached pages for `blob_id` at `page_num >= start_page`. The slots keep their
572    /// (now stale) `entry.key` so the Clock evictor can reclaim them; `read_at` and the
573    /// duplicate-update path never reach them because `index` no longer maps to them.
574    fn invalidate_from(&mut self, blob_id: u64, start_page: u64) {
575        self.index.retain(|&(bid, page_num), &mut slot| {
576            if bid != blob_id || page_num < start_page {
577                return true;
578            }
579            self.entries[slot]
580                .referenced
581                .store(false, Ordering::Relaxed);
582            false
583        });
584    }
585}
586
587/// Fetch one logical page for insertion into the page cache, rejecting partial pages because cache
588/// entries must always contain a full logical page.
589async fn fetch_cacheable_page(
590    blob: &impl Blob,
591    page_num: u64,
592    page_size: u64,
593) -> Result<IoBuf, Arc<Error>> {
594    let page = get_page_from_blob(blob, page_num, page_size)
595        .await
596        .map_err(Arc::new)?;
597
598    // We should never be fetching partial pages through the page cache. This can happen if a
599    // non-last page is corrupted and falls back to a partial CRC.
600    let len = page.len();
601    if len != page_size as usize {
602        error!(
603            page_num,
604            expected = page_size,
605            actual = len,
606            "attempted to fetch partial page from blob"
607        );
608        return Err(Arc::new(Error::InvalidChecksum));
609    }
610
611    Ok(page)
612}
613
614#[cfg(test)]
615mod tests {
616    use super::{super::Checksum, *};
617    use crate::{
618        buffer::paged::CHECKSUM_SIZE, deterministic, telemetry::metrics::Registry, Buf, BufferPool,
619        BufferPoolConfig, Clock as _, IoBufsMut, Runner as _, Spawner as _, Storage as _,
620        Supervisor as _,
621    };
622    use commonware_cryptography::Crc32;
623    use commonware_macros::test_traced;
624    use commonware_utils::{channel::oneshot, sync::Mutex, NZUsize, NZU16};
625    use futures::future::pending;
626    use std::{
627        num::NonZeroU16,
628        sync::{
629            atomic::{AtomicUsize, Ordering},
630            Arc,
631        },
632        time::Duration,
633    };
634
635    fn test_pool() -> BufferPool {
636        let mut registry = Registry::default();
637        BufferPool::new(BufferPoolConfig::for_storage(), &mut registry)
638    }
639
640    // Logical page size (what CacheRef uses and what gets cached).
641    const PAGE_SIZE: NonZeroU16 = NZU16!(1024);
642    const PAGE_SIZE_U64: u64 = PAGE_SIZE.get() as u64;
643
644    /// A blob that signals once a read starts and then never returns.
645    #[derive(Clone)]
646    struct BlockingBlob {
647        started: Arc<Mutex<Option<oneshot::Sender<()>>>>,
648    }
649
650    impl Blob for BlockingBlob {
651        async fn read_at(&self, offset: u64, len: usize) -> Result<IoBufsMut, Error> {
652            self.read_at_buf(offset, len, IoBufsMut::default()).await
653        }
654
655        async fn read_at_buf(
656            &self,
657            _offset: u64,
658            _len: usize,
659            _bufs: impl Into<IoBufsMut> + Send,
660        ) -> Result<IoBufsMut, Error> {
661            let sender = self
662                .started
663                .lock()
664                .take()
665                .expect("blocking blob read started more than once");
666            let _ = sender.send(());
667            pending::<()>().await;
668            unreachable!()
669        }
670
671        async fn write_at(
672            &self,
673            _offset: u64,
674            _bufs: impl Into<crate::IoBufs> + Send,
675        ) -> Result<(), Error> {
676            Ok(())
677        }
678
679        async fn write_at_sync(
680            &self,
681            offset: u64,
682            bufs: impl Into<crate::IoBufs> + Send,
683        ) -> Result<(), Error> {
684            let bufs = bufs.into();
685            if !bufs.has_remaining() {
686                return Ok(());
687            }
688
689            self.write_at(offset, bufs).await?;
690            self.sync().await
691        }
692
693        async fn resize(&self, _len: u64) -> Result<(), Error> {
694            Ok(())
695        }
696
697        async fn sync(&self) -> Result<(), Error> {
698            Ok(())
699        }
700    }
701
702    #[derive(Clone)]
703    enum ControlledBlobResult {
704        Success(Arc<Vec<u8>>),
705        Error,
706    }
707
708    /// A blob that blocks its first physical page read until released and counts total reads.
709    #[derive(Clone)]
710    struct ControlledBlob {
711        started: Arc<Mutex<Option<oneshot::Sender<()>>>>,
712        release: Arc<Mutex<Option<oneshot::Receiver<()>>>>,
713        reads: Arc<AtomicUsize>,
714        result: ControlledBlobResult,
715    }
716
717    impl Blob for ControlledBlob {
718        async fn read_at(&self, offset: u64, len: usize) -> Result<IoBufsMut, Error> {
719            self.read_at_buf(offset, len, IoBufsMut::default()).await
720        }
721
722        async fn read_at_buf(
723            &self,
724            _offset: u64,
725            _len: usize,
726            _bufs: impl Into<IoBufsMut> + Send,
727        ) -> Result<IoBufsMut, Error> {
728            if self.reads.fetch_add(1, Ordering::Relaxed) == 0 {
729                let sender = self
730                    .started
731                    .lock()
732                    .take()
733                    .expect("controlled blob start signal consumed more than once");
734                let _ = sender.send(());
735
736                let release = self
737                    .release
738                    .lock()
739                    .take()
740                    .expect("controlled blob release receiver consumed more than once");
741                release.await.expect("release signal dropped");
742            }
743
744            match &self.result {
745                ControlledBlobResult::Success(page) => Ok(IoBufsMut::from(page.as_ref().clone())),
746                ControlledBlobResult::Error => Err(Error::ReadFailed),
747            }
748        }
749
750        async fn write_at(
751            &self,
752            _offset: u64,
753            _bufs: impl Into<crate::IoBufs> + Send,
754        ) -> Result<(), Error> {
755            Ok(())
756        }
757
758        async fn write_at_sync(
759            &self,
760            offset: u64,
761            bufs: impl Into<crate::IoBufs> + Send,
762        ) -> Result<(), Error> {
763            let bufs = bufs.into();
764            if !bufs.has_remaining() {
765                return Ok(());
766            }
767
768            self.write_at(offset, bufs).await?;
769            self.sync().await
770        }
771
772        async fn resize(&self, _len: u64) -> Result<(), Error> {
773            Ok(())
774        }
775
776        async fn sync(&self) -> Result<(), Error> {
777            Ok(())
778        }
779    }
780
781    #[test_traced]
782    fn test_cache_basic() {
783        let pool = test_pool();
784        let mut cache: Cache = Cache::new(pool, PAGE_SIZE, NZUsize!(10));
785
786        // Cache stores logical-sized pages.
787        let mut buf = vec![0; PAGE_SIZE.get() as usize];
788        let bytes_read = cache.read_at(0, &mut buf, 0);
789        assert_eq!(bytes_read, 0);
790
791        cache.cache(0, &[1; PAGE_SIZE.get() as usize], 0);
792        let bytes_read = cache.read_at(0, &mut buf, 0);
793        assert_eq!(bytes_read, PAGE_SIZE.get() as usize);
794        assert_eq!(buf, [1; PAGE_SIZE.get() as usize]);
795
796        // Test replacement -- should log a duplicate page warning but still work.
797        cache.cache(0, &[2; PAGE_SIZE.get() as usize], 0);
798        let bytes_read = cache.read_at(0, &mut buf, 0);
799        assert_eq!(bytes_read, PAGE_SIZE.get() as usize);
800        assert_eq!(buf, [2; PAGE_SIZE.get() as usize]);
801
802        // Test exceeding the cache capacity.
803        for i in 0u64..11 {
804            cache.cache(0, &[i as u8; PAGE_SIZE.get() as usize], i);
805        }
806        // Page 0 should have been evicted.
807        let bytes_read = cache.read_at(0, &mut buf, 0);
808        assert_eq!(bytes_read, 0);
809        // Page 1-10 should be in the cache.
810        for i in 1u64..11 {
811            let bytes_read = cache.read_at(0, &mut buf, i * PAGE_SIZE_U64);
812            assert_eq!(bytes_read, PAGE_SIZE.get() as usize);
813            assert_eq!(buf, [i as u8; PAGE_SIZE.get() as usize]);
814        }
815
816        // Test reading from an unaligned offset by adding 2 to an aligned offset. The read
817        // should be 2 bytes short of a full logical page.
818        let mut buf = vec![0; PAGE_SIZE.get() as usize];
819        let bytes_read = cache.read_at(0, &mut buf, PAGE_SIZE_U64 + 2);
820        assert_eq!(bytes_read, PAGE_SIZE.get() as usize - 2);
821        assert_eq!(
822            &buf[..PAGE_SIZE.get() as usize - 2],
823            [1; PAGE_SIZE.get() as usize - 2]
824        );
825    }
826
    /// Regression test for Clock eviction over invalidated slots. When the evictor lands on a
    /// slot whose stale key has since been re-cached in a different slot, it must not remove the
    /// live index entry for that key.
    #[test_traced]
    fn test_invalidate_from_does_not_orphan_re_cached_page() {
        // Regression: when the Clock evictor lands on an invalidated slot whose stale key has
        // since been re-cached at a different slot, the old index entry (pointing to the
        // live slot) must not be removed.
        let mut registry = Registry::default();
        let pool = BufferPool::new(BufferPoolConfig::for_storage(), &mut registry);
        // Capacity 2: the slot-by-slot reasoning in the comments below assumes exactly two slots.
        let mut cache: Cache = Cache::new(pool, PAGE_SIZE, NZUsize!(2));
        let blob_id = 0u64;
        let page_size = PAGE_SIZE.get() as usize;

        // Fill both slots, then invalidate them so both carry stale keys with referenced=false.
        cache.cache(blob_id, &vec![0xAA; page_size], 0);
        cache.cache(blob_id, &vec![0xBB; page_size], 1);
        cache.invalidate_from(blob_id, 0);

        // Re-cache page 1. Clock sits at slot 0, which is referenced=false, so the insert
        // lands at slot 0 (slot 1 still holds its stale (blob, 1) key).
        cache.cache(blob_id, &vec![0xCC; page_size], 1);
        let mut buf = vec![0u8; page_size];
        assert_eq!(
            cache.read_at(blob_id, &mut buf, PAGE_SIZE_U64),
            page_size,
            "page 1 should be readable after re-cache"
        );
        assert_eq!(buf, vec![0xCC; page_size]);

        // Cache a new page. Clock now advances to slot 1 (still referenced=false), evicts it.
        // With the buggy unconditional `index.remove(entry.key)` this would remove the live
        // (blob, 1) -> slot 0 mapping, orphaning slot 0.
        cache.cache(blob_id, &vec![0xDD; page_size], 2);

        // Slot 0 must still be reachable via its live index entry.
        let mut buf = vec![0u8; page_size];
        assert_eq!(
            cache.read_at(blob_id, &mut buf, PAGE_SIZE_U64),
            page_size,
            "live page 1 was orphaned by stale-slot eviction"
        );
        assert_eq!(buf, vec![0xCC; page_size]);

        // And the newly cached page 2 is also reachable.
        let mut buf = vec![0u8; page_size];
        assert_eq!(
            cache.read_at(blob_id, &mut buf, PAGE_SIZE_U64 * 2),
            page_size
        );
        assert_eq!(buf, vec![0xDD; page_size]);
    }
876
877    #[test_traced]
878    fn test_cache_read_with_blob() {
879        // Initialize the deterministic context
880        let executor = deterministic::Runner::default();
881        // Start the test within the executor
882        executor.start(|context| async move {
883            // Physical page size = logical + CRC record.
884            let physical_page_size = PAGE_SIZE_U64 + CHECKSUM_SIZE;
885
886            // Populate a blob with 11 consecutive pages of CRC-protected data.
887            let (blob, size) = context
888                .open("test", "blob".as_bytes())
889                .await
890                .expect("Failed to open blob");
891            assert_eq!(size, 0);
892            for i in 0..11 {
893                // Write logical data followed by Checksum.
894                let logical_data = vec![i as u8; PAGE_SIZE.get() as usize];
895                let crc = Crc32::checksum(&logical_data);
896                let record = Checksum::new(PAGE_SIZE.get(), crc);
897                let mut page_data = logical_data;
898                page_data.extend_from_slice(&record.to_bytes());
899                blob.write_at(i * physical_page_size, page_data)
900                    .await
901                    .unwrap();
902            }
903
904            // Fill the page cache with the blob's data via CacheRef::read.
905            let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(10));
906            assert_eq!(cache_ref.next_id(), 0);
907            assert_eq!(cache_ref.next_id(), 1);
908            for i in 0..11 {
909                // Read expects logical bytes only (CRCs are stripped).
910                let mut buf = vec![0; PAGE_SIZE.get() as usize];
911                cache_ref
912                    .read(&blob, 0, &mut buf, i * PAGE_SIZE_U64)
913                    .await
914                    .unwrap();
915                assert_eq!(buf, [i as u8; PAGE_SIZE.get() as usize]);
916            }
917
918            // Repeat the read to exercise reading from the page cache. Must start at 1 because
919            // page 0 should be evicted.
920            for i in 1..11 {
921                let mut buf = vec![0; PAGE_SIZE.get() as usize];
922                cache_ref
923                    .read(&blob, 0, &mut buf, i * PAGE_SIZE_U64)
924                    .await
925                    .unwrap();
926                assert_eq!(buf, [i as u8; PAGE_SIZE.get() as usize]);
927            }
928
929            // Cleanup.
930            blob.sync().await.unwrap();
931        });
932    }
933
934    #[test_traced]
935    fn test_cache_max_page() {
936        let executor = deterministic::Runner::default();
937        executor.start(|context| async move {
938            let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(2));
939
940            // Use the largest page-aligned offset representable for the configured PAGE_SIZE.
941            let aligned_max_offset = u64::MAX - (u64::MAX % PAGE_SIZE_U64);
942
943            // CacheRef::cache expects only logical bytes (no CRC).
944            let logical_data = vec![42u8; PAGE_SIZE.get() as usize];
945
946            // Caching exactly one page at the maximum offset should succeed.
947            let remaining = cache_ref.cache(0, logical_data.as_slice(), aligned_max_offset);
948            assert_eq!(remaining, 0);
949
950            // Reading from the cache should return the logical bytes.
951            let mut buf = vec![0u8; PAGE_SIZE.get() as usize];
952            let page_cache = cache_ref.cache.read();
953            let bytes_read = page_cache.read_at(0, &mut buf, aligned_max_offset);
954            assert_eq!(bytes_read, PAGE_SIZE.get() as usize);
955            assert!(buf.iter().all(|b| *b == 42));
956        });
957    }
958
959    #[test_traced]
960    fn test_cache_at_high_offset() {
961        let executor = deterministic::Runner::default();
962        executor.start(|context| async move {
963            // Use the minimum page size (CHECKSUM_SIZE + 1 = 13) with high offset.
964            const MIN_PAGE_SIZE: u64 = CHECKSUM_SIZE + 1;
965            let cache_ref =
966                CacheRef::from_pooler(&context, NZU16!(MIN_PAGE_SIZE as u16), NZUsize!(2));
967
968            // Create two pages worth of logical data (no CRCs - CacheRef::cache expects logical
969            // only).
970            let data = vec![1u8; MIN_PAGE_SIZE as usize * 2];
971
972            // Cache pages at a high (but not max) aligned offset so we can verify both pages.
973            // Use an offset that's a few pages below max to avoid overflow when verifying.
974            let aligned_max_offset = u64::MAX - (u64::MAX % MIN_PAGE_SIZE);
975            let high_offset = aligned_max_offset - (MIN_PAGE_SIZE * 2);
976            let remaining = cache_ref.cache(0, &data, high_offset);
977            // Both pages should be cached.
978            assert_eq!(remaining, 0);
979
980            // Verify the first page was cached correctly.
981            let mut buf = vec![0u8; MIN_PAGE_SIZE as usize];
982            let page_cache = cache_ref.cache.read();
983            assert_eq!(
984                page_cache.read_at(0, &mut buf, high_offset),
985                MIN_PAGE_SIZE as usize
986            );
987            assert!(buf.iter().all(|b| *b == 1));
988
989            // Verify the second page was cached correctly.
990            assert_eq!(
991                page_cache.read_at(0, &mut buf, high_offset + MIN_PAGE_SIZE),
992                MIN_PAGE_SIZE as usize
993            );
994            assert!(buf.iter().all(|b| *b == 1));
995        });
996    }
997
    /// Cancelling the only task fetching a page must not leave a stale `page_fetches` entry
    /// behind. The waiter's drop path is expected to remove it.
    #[test_traced]
    fn test_page_fetches_entry_removed_when_first_fetcher_cancelled() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Set up a small cache and a blob whose read never completes once started.
            let blob_id = 0;
            let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(10));
            let (started_tx, started_rx) = oneshot::channel();
            let blob = BlockingBlob {
                started: Arc::new(Mutex::new(Some(started_tx))),
            };
            let mut read_buf = vec![0u8; PAGE_SIZE.get() as usize];

            // Spawn the first fetcher. It will insert into `page_fetches` and then block forever.
            let cache_ref_for_task = cache_ref.clone();
            let blob_for_task = blob.clone();
            let handle = context.spawn(move |_| async move {
                let _ = cache_ref_for_task
                    .read(&blob_for_task, blob_id, &mut read_buf, 0)
                    .await;
            });

            // Wait until the underlying read has started, ensuring the in-flight marker exists.
            started_rx.await.expect("blocking read never started");
            {
                // Scoped so the read guard is released before the task is aborted.
                let page_cache = cache_ref.cache.read();
                assert!(page_cache.page_fetches.contains_key(&(blob_id, 0)));
            }

            // Cancel the first fetcher before it reaches explicit cleanup.
            handle.abort();
            // The aborted task's handle resolves to `Error::Closed`.
            assert!(matches!(handle.await, Err(Error::Closed)));

            // The guard drop path should have removed the stale in-flight entry.
            let page_cache = cache_ref.cache.read();
            assert!(
                !page_cache.page_fetches.contains_key(&(blob_id, 0)),
                "cancelled first fetcher should not leave stale page_fetches entry"
            );
        });
    }
1039
    /// Cancelling the task that started a page fetch must not break single-flight:
    /// - followers that already joined keep the fetch generation alive;
    /// - a later reader joins that generation instead of issuing a second blob read;
    /// - the one blob read satisfies every surviving waiter and populates the cache.
    #[test_traced]
    fn test_followers_keep_single_flight_after_first_fetcher_cancellation() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let blob_id = 0;
            let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(10));

            // Return one valid full page, but hold the underlying read until the test releases it.
            let logical_page = vec![7u8; PAGE_SIZE.get() as usize];
            let crc = Crc32::checksum(&logical_page);
            let mut physical_page = logical_page.clone();
            physical_page.extend_from_slice(&Checksum::new(PAGE_SIZE.get(), crc).to_bytes());
            let (started_tx, started_rx) = oneshot::channel();
            let (release_tx, release_rx) = oneshot::channel();
            // Counts calls into `ControlledBlob::read_at`. Only the first call signals `started`
            // and then blocks on `release`.
            let reads = Arc::new(AtomicUsize::new(0));
            let blob = ControlledBlob {
                started: Arc::new(Mutex::new(Some(started_tx))),
                release: Arc::new(Mutex::new(Some(release_rx))),
                reads: reads.clone(),
                result: ControlledBlobResult::Success(Arc::new(physical_page)),
            };

            // Start the fetch that installs the shared in-flight entry.
            let mut first_buf = vec![0u8; PAGE_SIZE.get() as usize];
            let cache_ref_for_first = cache_ref.clone();
            let blob_for_first = blob.clone();
            let first = context.child("first").spawn(move |_| async move {
                let _ = cache_ref_for_first
                    .read(&blob_for_first, blob_id, &mut first_buf, 0)
                    .await;
            });
            started_rx.await.expect("first read never started");

            // Join as a follower while the first fetch is still blocked in the blob.
            let mut second_buf = vec![0u8; PAGE_SIZE.get() as usize];
            let cache_ref_for_second = cache_ref.clone();
            let blob_for_second = blob.clone();
            let second = context.child("second").spawn(move |_| async move {
                cache_ref_for_second
                    .read(&blob_for_second, blob_id, &mut second_buf, 0)
                    .await
                    .expect("second read failed");
                second_buf
            });

            // Wait until both tasks are registered against the same in-flight fetch.
            // NOTE(review): this poll loop has no upper bound. If the waiter count never reaches
            // 2, the test spins instead of failing; consider a bounded retry with a panic.
            loop {
                let joined = {
                    let page_cache = cache_ref.cache.read();
                    page_cache
                        .page_fetches
                        .get(&(blob_id, 0))
                        .map(|fetch| fetch.waiters == 2)
                        .unwrap_or(false)
                };
                if joined {
                    break;
                }
                context.sleep(Duration::from_millis(1)).await;
            }

            // Cancel the original fetcher; the follower should keep the generation alive.
            first.abort();
            assert!(matches!(first.await, Err(Error::Closed)));

            // A later reader should still join the existing in-flight fetch instead of starting a
            // second blob read.
            let mut third_buf = vec![0u8; PAGE_SIZE.get() as usize];
            let cache_ref_for_third = cache_ref.clone();
            let blob_for_third = blob.clone();
            let third = context.child("third").spawn(move |_| async move {
                cache_ref_for_third
                    .read(&blob_for_third, blob_id, &mut third_buf, 0)
                    .await
                    .expect("third read failed");
                third_buf
            });

            // Either the third reader bumps the waiter count back to 2, or a bug starts a second
            // blob read.
            // NOTE(review): also unbounded, like the loop above.
            loop {
                let third_entered = {
                    let page_cache = cache_ref.cache.read();
                    reads.load(Ordering::Relaxed) > 1
                        || page_cache
                            .page_fetches
                            .get(&(blob_id, 0))
                            .map(|fetch| fetch.waiters == 2)
                            .unwrap_or(false)
                };
                if third_entered {
                    break;
                }
                context.sleep(Duration::from_millis(1)).await;
            }

            // Let the single underlying fetch complete and satisfy both surviving waiters.
            let _ = release_tx.send(());
            let second_buf = second.await.expect("second task failed");
            let third_buf = third.await.expect("third task failed");
            assert_eq!(second_buf, logical_page);
            assert_eq!(third_buf, logical_page);

            // All waiters should have shared the same blob read.
            assert_eq!(reads.load(Ordering::Relaxed), 1);

            // The successful fetch should populate the cache for later readers.
            let mut cached = vec![0u8; PAGE_SIZE.get() as usize];
            assert_eq!(
                cache_ref.read_cached(blob_id, &mut cached, 0),
                PAGE_SIZE.get() as usize
            );
            assert_eq!(cached, logical_page);

            // A later read should hit the cached page and avoid touching the blob again.
            let mut fourth_buf = vec![0u8; PAGE_SIZE.get() as usize];
            cache_ref
                .read(&blob, blob_id, &mut fourth_buf, 0)
                .await
                .unwrap();
            assert_eq!(fourth_buf, logical_page);
            assert_eq!(reads.load(Ordering::Relaxed), 1);

            let page_cache = cache_ref.cache.read();
            assert!(
                !page_cache.page_fetches.contains_key(&(blob_id, 0)),
                "completed fetch should leave no stale page_fetches entry"
            );
        });
    }
1170
    /// A shared fetch that fails must:
    /// - report the error to every joined waiter;
    /// - remove its `page_fetches` entry and cache nothing;
    /// - let a later read start a fresh blob read instead of reusing the failure.
    #[test_traced]
    fn test_page_fetch_error_removes_entry_for_all_waiters() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let blob_id = 0;
            let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(10));

            // Hold one shared fetch in flight, then make the underlying read fail.
            let (started_tx, started_rx) = oneshot::channel();
            let (release_tx, release_rx) = oneshot::channel();
            // Counts blob reads. Only the first read blocks until `release_tx` fires.
            let reads = Arc::new(AtomicUsize::new(0));
            let blob = ControlledBlob {
                started: Arc::new(Mutex::new(Some(started_tx))),
                release: Arc::new(Mutex::new(Some(release_rx))),
                reads: reads.clone(),
                // Every read of this blob returns `Error::ReadFailed`.
                result: ControlledBlobResult::Error,
            };

            // Start the fetch that creates the in-flight entry.
            let mut first_buf = vec![0u8; PAGE_SIZE.get() as usize];
            let cache_ref_for_first = cache_ref.clone();
            let blob_for_first = blob.clone();
            let first = context.child("first").spawn(move |_| async move {
                cache_ref_for_first
                    .read(&blob_for_first, blob_id, &mut first_buf, 0)
                    .await
            });
            started_rx.await.expect("first erroring read never started");

            // Join with a second waiter that should observe the same failure.
            let mut second_buf = vec![0u8; PAGE_SIZE.get() as usize];
            let cache_ref_for_second = cache_ref.clone();
            let blob_for_second = blob.clone();
            let second = context.child("second").spawn(move |_| async move {
                cache_ref_for_second
                    .read(&blob_for_second, blob_id, &mut second_buf, 0)
                    .await
            });

            // Wait until both tasks share the same in-flight fetch entry.
            // NOTE(review): unbounded poll loop. A regression here hangs the test rather than
            // failing it.
            loop {
                let joined = {
                    let page_cache = cache_ref.cache.read();
                    page_cache
                        .page_fetches
                        .get(&(blob_id, 0))
                        .map(|fetch| fetch.waiters == 2)
                        .unwrap_or(false)
                };
                if joined {
                    break;
                }
                context.sleep(Duration::from_millis(1)).await;
            }

            // Release the blocked read so the shared fetch resolves with an error.
            let _ = release_tx.send(());

            // Outer `Ok`: the task itself finished. Inner `Err`: the read failed.
            assert!(matches!(first.await, Ok(Err(Error::ReadFailed))));
            assert!(matches!(second.await, Ok(Err(Error::ReadFailed))));
            // Both waiters should still have shared a single blob read.
            assert_eq!(reads.load(Ordering::Relaxed), 1);

            // The failed generation must remove its in-flight entry and avoid caching data.
            {
                let page_cache = cache_ref.cache.read();
                assert!(
                    !page_cache.page_fetches.contains_key(&(blob_id, 0)),
                    "erroring fetch should leave no stale page_fetches entry"
                );
            }
            let mut cached = vec![0u8; PAGE_SIZE.get() as usize];
            assert_eq!(cache_ref.read_cached(blob_id, &mut cached, 0), 0);

            // A later read should start a fresh fetch rather than reusing stale error state.
            // `reads` is already nonzero, so this read does not block on the release signal.
            let mut third_buf = vec![0u8; PAGE_SIZE.get() as usize];
            assert!(matches!(
                cache_ref.read(&blob, blob_id, &mut third_buf, 0).await,
                Err(Error::ReadFailed)
            ));
            assert_eq!(reads.load(Ordering::Relaxed), 2);
        });
    }
1254
1255    #[test_traced]
1256    fn test_read_cached_many_all_cached() {
1257        let pool = test_pool();
1258        let cache_ref = CacheRef::new(pool, PAGE_SIZE, NZUsize!(10));
1259        let blob_id = cache_ref.next_id();
1260        let page0 = vec![0xAA; PAGE_SIZE.get() as usize];
1261        let page1 = vec![0xBB; PAGE_SIZE.get() as usize];
1262
1263        // Populate two pages with distinct data.
1264        {
1265            let mut cache = cache_ref.cache.write();
1266            cache.cache(blob_id, &page0, 0);
1267            cache.cache(blob_id, &page1, 1);
1268        }
1269
1270        let mut buf0 = vec![0u8; PAGE_SIZE_U64 as usize];
1271        let mut buf1 = vec![0u8; PAGE_SIZE_U64 as usize];
1272        let mut ranges: Vec<(&mut [u8], u64)> = vec![(&mut buf0, 0), (&mut buf1, PAGE_SIZE_U64)];
1273
1274        cache_ref.read_cached_many(blob_id, &mut ranges);
1275
1276        // All ranges served from cache, so the vec is now empty.
1277        assert!(ranges.is_empty());
1278        drop(ranges);
1279
1280        // Buffers should contain the cached page data.
1281        assert!(buf0 == page0);
1282        assert!(buf1 == page1);
1283    }
1284
1285    #[test_traced]
1286    fn test_read_cached_many_none_cached() {
1287        let pool = test_pool();
1288        let cache_ref = CacheRef::new(pool, PAGE_SIZE, NZUsize!(10));
1289        let blob_id = cache_ref.next_id();
1290
1291        let mut buf0 = vec![0u8; PAGE_SIZE_U64 as usize];
1292        let mut buf1 = vec![0u8; PAGE_SIZE_U64 as usize];
1293        let mut ranges: Vec<(&mut [u8], u64)> = vec![(&mut buf0, 0), (&mut buf1, PAGE_SIZE_U64)];
1294
1295        // Empty cache: both ranges should miss and remain in the vec unchanged.
1296        cache_ref.read_cached_many(blob_id, &mut ranges);
1297        assert_eq!(ranges.len(), 2);
1298        assert_eq!(ranges[0].1, 0);
1299        assert_eq!(ranges[1].1, PAGE_SIZE_U64);
1300    }
1301
1302    #[test_traced]
1303    fn test_read_cached_many_scattered_misses() {
1304        // Verify that read_cached_many checks ALL ranges, not just up to the
1305        // first miss. Pages 0 and 2 are cached, page 1 is not.
1306        let pool = test_pool();
1307        let cache_ref = CacheRef::new(pool, PAGE_SIZE, NZUsize!(10));
1308        let blob_id = cache_ref.next_id();
1309
1310        let page0 = vec![0x11; PAGE_SIZE.get() as usize];
1311        let page2 = vec![0x33; PAGE_SIZE.get() as usize];
1312        {
1313            let mut cache = cache_ref.cache.write();
1314            cache.cache(blob_id, &page0, 0);
1315            // page 1 deliberately not cached
1316            cache.cache(blob_id, &page2, 2);
1317        }
1318
1319        let mut buf0 = vec![0u8; PAGE_SIZE_U64 as usize];
1320        let mut buf1 = vec![0u8; PAGE_SIZE_U64 as usize];
1321        let mut buf2 = vec![0u8; PAGE_SIZE_U64 as usize];
1322        let mut ranges: Vec<(&mut [u8], u64)> = vec![
1323            (&mut buf0, 0),
1324            (&mut buf1, PAGE_SIZE_U64),
1325            (&mut buf2, PAGE_SIZE_U64 * 2),
1326        ];
1327
1328        cache_ref.read_cached_many(blob_id, &mut ranges);
1329
1330        // Only the page 1 miss should remain (page 2 is still processed despite
1331        // the earlier miss).
1332        assert_eq!(ranges.len(), 1);
1333        assert_eq!(ranges[0].1, PAGE_SIZE_U64);
1334        drop(ranges);
1335
1336        // Cached pages should have their data written to the buffers.
1337        assert!(buf0 == page0);
1338        assert!(buf2 == page2);
1339        // Missed page's buffer should be untouched (still zeroed).
1340        assert!(buf1.iter().all(|b| *b == 0));
1341    }
1342}