commonware_runtime/utils/buffer/paged/cache.rs

//! A page cache for caching _logical_ pages of [Blob] data in memory. The cache is unaware of the
//! physical page format used by the blob, which is left to the blob implementation.

use super::get_page_from_blob;
use crate::{Blob, Error, RwLock};
use futures::{future::Shared, FutureExt};
use std::{
    collections::{hash_map::Entry, HashMap},
    future::Future,
    num::{NonZeroU16, NonZeroUsize},
    pin::Pin,
    sync::{
        atomic::{AtomicBool, AtomicU64, Ordering},
        Arc,
    },
};
use tracing::{debug, error, trace};

// Type alias for the future we'll be storing for each in-flight page fetch.
//
// We wrap [Error] in an Arc so it will be cloneable, which is required for the future to be
// [Shared]. The Vec<u8> contains only the logical (validated) bytes of the page.
type PageFetchFut = Shared<Pin<Box<dyn Future<Output = Result<Vec<u8>, Arc<Error>>> + Send>>>;

/// A [Cache] caches pages of [Blob] data in memory after verifying the integrity of each.
///
/// A single page cache can be used to cache data from multiple blobs by assigning a unique id to
/// each.
///
/// Implements the [Clock](https://en.wikipedia.org/wiki/Page_replacement_algorithm#Clock)
/// replacement policy, which is a lightweight approximation of LRU. The `entries` list is treated
/// as a circular list of recently accessed pages, and `clock` is the index of the next slot to
/// examine for replacement. When a page needs to be evicted, we start the search at `clock`,
/// looking for the first entry with a false reference bit and clearing the reference bit of any
/// entry skipped along the way.
struct Cache {
    /// The page cache index, keyed by (blob id, page number), mapping each cached page to the
    /// index of its slot in `entries` and `arena`.
    ///
    /// # Invariants
    ///
    /// Each `index` entry maps to exactly one `entries` slot, and that entry always has a
    /// matching key.
    index: HashMap<(u64, u64), usize>,

    /// Metadata for each cache slot.
    ///
    /// Each `entries` slot has exactly one corresponding `index` entry.
    entries: Vec<CacheEntry>,

    /// Pre-allocated arena containing all page data contiguously.
    /// Slot i's data is at `arena[i * page_size .. (i+1) * page_size]`.
    arena: Vec<u8>,

    /// Size of each page in bytes.
    page_size: usize,

    /// The Clock replacement policy's clock hand index into `entries`.
    clock: usize,

    /// The maximum number of pages that will be cached.
    capacity: usize,

    /// A map of currently executing page fetches to ensure only one task at a time is trying to
    /// fetch a specific page.
    page_fetches: HashMap<(u64, u64), PageFetchFut>,
}
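
// Illustrative sketch of the Clock policy described on [Cache]: with a capacity of 3, pages
// A, B, and C cached, and every reference bit set, an eviction pass proceeds as follows:
//
//   clock=0: A(ref) -> clear bit, advance      clock=1: B(ref) -> clear bit, advance
//   clock=2: C(ref) -> clear bit, advance      clock=0: A(unref) -> evict A, insert new page
//
// Any page that is read between passes gets its reference bit set again and survives the next
// sweep (a "second chance").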

/// Metadata for a single cache entry (page data stored in arena).
struct CacheEntry {
    /// The cache key which is composed of the blob id and page number of the page.
    key: (u64, u64),

    /// A bit indicating whether this page was recently referenced.
    referenced: AtomicBool,
}

/// A reference to a page cache that can be shared across threads via cloning, along with the page
/// size that will be used with it. Provides the API for interacting with the page cache in a
/// thread-safe manner.
#[derive(Clone)]
pub struct CacheRef {
    /// The size of each page in the underlying blobs managed by this page cache.
    ///
    /// # Warning
    ///
    /// You cannot change the page size once data has been written without invalidating it. (Reads
    /// on blobs that were written with a different page size will fail their integrity check.)
    page_size: u64,

    /// The next id to assign to a blob that will be managed by this cache.
    next_id: Arc<AtomicU64>,

    /// Shareable reference to the page cache.
    cache: Arc<RwLock<Cache>>,
}

impl CacheRef {
    /// Returns a new [CacheRef] that will buffer up to `capacity` pages with the
    /// given `page_size`.
    pub fn new(page_size: NonZeroU16, capacity: NonZeroUsize) -> Self {
        let page_size_u64 = page_size.get() as u64;

        Self {
            page_size: page_size_u64,
            next_id: Arc::new(AtomicU64::new(0)),
            cache: Arc::new(RwLock::new(Cache::new(page_size, capacity))),
        }
    }

    /// The page size used by this page cache.
    #[inline]
    pub const fn page_size(&self) -> u64 {
        self.page_size
    }

    /// Returns a unique id for the next blob that will use this page cache.
    pub async fn next_id(&self) -> u64 {
        self.next_id.fetch_add(1, Ordering::Relaxed)
    }

    /// Convert a logical offset into the number of the page it belongs to and the offset within
    /// that page.
    pub const fn offset_to_page(&self, offset: u64) -> (u64, u64) {
        Cache::offset_to_page(self.page_size, offset)
    }
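
    // Worked example (illustrative): with a 1024-byte page size, `offset_to_page(2500)` yields
    // page 2 with an in-page offset of 452, since 2 * 1024 = 2048 and 2500 - 2048 = 452;
    // `offset_to_page(1024)` yields (1, 0).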

    /// Try to read the specified bytes from the page cache only. Returns the number of bytes
    /// successfully read from cache and copied to `buf` before a page fault, if any.
    pub(super) async fn read_cached(
        &self,
        blob_id: u64,
        mut buf: &mut [u8],
        mut logical_offset: u64,
    ) -> usize {
        let original_len = buf.len();
        let page_cache = self.cache.read().await;
        while !buf.is_empty() {
            let count = page_cache.read_at(blob_id, buf, logical_offset);
            if count == 0 {
                // Cache miss - return how many bytes we successfully read
                break;
            }
            logical_offset += count as u64;
            buf = &mut buf[count..];
        }
        original_len - buf.len()
    }

    /// Read the specified bytes, preferentially from the page cache. Bytes not found in the cache
    /// will be read from the provided `blob` and cached for future reads.
    pub(super) async fn read<B: Blob>(
        &self,
        blob: &B,
        blob_id: u64,
        mut buf: &mut [u8],
        mut offset: u64,
    ) -> Result<(), Error> {
        // Read up to a page worth of data at a time from either the page cache or the `blob`,
        // until the requested data is fully read.
        while !buf.is_empty() {
            // Read lock the page cache and see if we can get (some of) the data from it.
            {
                let page_cache = self.cache.read().await;
                let count = page_cache.read_at(blob_id, buf, offset);
                if count != 0 {
                    offset += count as u64;
                    buf = &mut buf[count..];
                    continue;
                }
            }

            // Handle page fault.
            let count = self
                .read_after_page_fault(blob, blob_id, buf, offset)
                .await?;
            offset += count as u64;
            buf = &mut buf[count..];
        }

        Ok(())
    }
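
    // Illustrative usage sketch (assumes `blob` is a [Blob] written in this module's physical
    // page format):
    //
    //     let cache_ref = CacheRef::new(NZU16!(1024), NZUsize!(64));
    //     let id = cache_ref.next_id().await;
    //     let mut buf = vec![0u8; 4096];
    //     cache_ref.read(&blob, id, &mut buf, 0).await?; // cold: pages fetched from blob, then cached
    //     cache_ref.read(&blob, id, &mut buf, 0).await?; // warm: pages served from the cache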

    /// Fetch the requested page after encountering a page fault, which may involve retrieving it
    /// from `blob` & caching the result in the page cache. Returns the number of bytes read, which
    /// should always be non-zero.
    pub(super) async fn read_after_page_fault<B: Blob>(
        &self,
        blob: &B,
        blob_id: u64,
        buf: &mut [u8],
        offset: u64,
    ) -> Result<usize, Error> {
        assert!(!buf.is_empty());

        let (page_num, offset_in_page) = Cache::offset_to_page(self.page_size, offset);
        let offset_in_page = offset_in_page as usize;
        trace!(page_num, blob_id, "page fault");

        // Create or clone a future that retrieves the desired page from the underlying blob. This
        // requires a write lock on the page cache since we may need to modify `page_fetches` if
        // this is the first fetcher.
        let (fetch_future, is_first_fetcher) = {
            let mut cache = self.cache.write().await;

            // There's a (small) chance the page was fetched & buffered by another task before we
            // were able to acquire the write lock, so check the cache before doing anything else.
            let count = cache.read_at(blob_id, buf, offset);
            if count != 0 {
                return Ok(count);
            }

            let entry = cache.page_fetches.entry((blob_id, page_num));
            match entry {
                Entry::Occupied(o) => {
                    // Another thread is already fetching this page, so clone its existing future.
                    (o.get().clone(), false)
                }
                Entry::Vacant(v) => {
                    // Nobody is currently fetching this page, so create a future that will do the
                    // work. get_page_from_blob handles CRC validation and returns only logical bytes.
                    let blob = blob.clone();
                    let page_size = self.page_size;
                    let future = async move {
                        let page = get_page_from_blob(&blob, page_num, page_size)
                            .await
                            .map_err(Arc::new)?;
                        // We should never be fetching partial pages through the page cache. This
                        // can happen if a non-last page is corrupted and falls back to a partial
                        // CRC.
                        let len = page.len();
                        if len != page_size as usize {
                            error!(
                                page_num,
                                expected = page_size,
                                actual = len,
                                "attempted to fetch partial page from blob"
                            );
                            return Err(Arc::new(Error::InvalidChecksum));
                        }
                        Ok(page)
                    };

                    // Make the future shareable and insert it into the map.
                    let shareable = future.boxed().shared();
                    v.insert(shareable.clone());

                    (shareable, true)
                }
            }
        };

        // Await the future and get the page buffer. If this isn't the task that initiated the
        // fetch, we can return immediately with the result. Note that we cannot return immediately
        // on error, since we'd bypass the cleanup required of the first fetcher.
        let fetch_result = fetch_future.await;
        if !is_first_fetcher {
            // Copy the requested portion of the page into the buffer and return immediately.
            let page_buf = fetch_result.map_err(|_| Error::ReadFailed)?;
            let bytes_to_copy = std::cmp::min(buf.len(), page_buf.len() - offset_in_page);
            buf[..bytes_to_copy]
                .copy_from_slice(&page_buf[offset_in_page..offset_in_page + bytes_to_copy]);
            return Ok(bytes_to_copy);
        }

        // This is the task that initiated the fetch, so it is responsible for cleaning up the
        // inserted entry, and caching the page in the page cache if the fetch didn't error out.
        // This requires a write lock on the page cache to modify `page_fetches` and cache the page.
        let mut cache = self.cache.write().await;

        // Remove the entry from `page_fetches`.
        let _ = cache.page_fetches.remove(&(blob_id, page_num));

        // Cache the result in the page cache. get_page_from_blob already validated the CRC.
        let page_buf = match fetch_result {
            Ok(page_buf) => page_buf,
            Err(err) => {
                error!(page_num, ?err, "Page fetch failed");
                return Err(Error::ReadFailed);
            }
        };

        cache.cache(blob_id, &page_buf, page_num);

        // Copy the requested portion of the page into the buffer.
        let bytes_to_copy = std::cmp::min(buf.len(), page_buf.len() - offset_in_page);
        buf[..bytes_to_copy]
            .copy_from_slice(&page_buf[offset_in_page..offset_in_page + bytes_to_copy]);

        Ok(bytes_to_copy)
    }

    /// Cache the provided pages of data in the page cache, returning the number of trailing bytes
    /// that didn't fill a whole page (and so were not cached). `offset` must be page aligned.
    ///
    /// # Panics
    ///
    /// Panics if `offset` is not page aligned.
    pub async fn cache(&self, blob_id: u64, mut buf: &[u8], offset: u64) -> usize {
        let (mut page_num, offset_in_page) = self.offset_to_page(offset);
        assert_eq!(offset_in_page, 0);
        {
            // Write lock the page cache.
            let page_size = self.page_size as usize;
            let mut page_cache = self.cache.write().await;
            while buf.len() >= page_size {
                page_cache.cache(blob_id, &buf[..page_size], page_num);
                buf = &buf[page_size..];
                page_num = match page_num.checked_add(1) {
                    Some(next) => next,
                    None => break,
                };
            }
        }

        buf.len()
    }
}

impl Cache {
    /// Return a new empty page cache with a maximum capacity of `capacity` pages, each of size
    /// `page_size` bytes.
    ///
    /// The arena is pre-allocated to hold all pages contiguously.
    pub fn new(page_size: NonZeroU16, capacity: NonZeroUsize) -> Self {
        let page_size = page_size.get() as usize;
        let capacity = capacity.get();
        Self {
            index: HashMap::new(),
            entries: Vec::with_capacity(capacity),
            arena: vec![0u8; capacity * page_size],
            page_size,
            clock: 0,
            capacity,
            page_fetches: HashMap::new(),
        }
    }

    /// Returns a slice of the page data for the given slot index.
    #[inline]
    fn page_slice(&self, slot: usize) -> &[u8] {
        assert!(slot < self.capacity);
        let start = slot * self.page_size;
        &self.arena[start..start + self.page_size]
    }

    /// Returns a mutable slice of the page data for the given slot index.
    #[inline]
    fn page_slice_mut(&mut self, slot: usize) -> &mut [u8] {
        assert!(slot < self.capacity);
        let start = slot * self.page_size;
        &mut self.arena[start..start + self.page_size]
    }

    /// Convert an offset into the number of the page it belongs to and the offset within that page.
    const fn offset_to_page(page_size: u64, offset: u64) -> (u64, u64) {
        (offset / page_size, offset % page_size)
    }

    /// Attempt to fetch blob data starting at `offset` from the page cache. Returns the number of
    /// bytes read, which could be 0 if the first page in the requested range isn't buffered, and is
    /// never more than `self.page_size` or the length of `buf`. The returned bytes won't cross a
    /// page boundary, so multiple reads may be required even if all data in the desired range is
    /// buffered.
    fn read_at(&self, blob_id: u64, buf: &mut [u8], logical_offset: u64) -> usize {
        let (page_num, offset_in_page) =
            Self::offset_to_page(self.page_size as u64, logical_offset);
        let Some(&slot) = self.index.get(&(blob_id, page_num)) else {
            return 0;
        };
        let entry = &self.entries[slot];
        assert_eq!(entry.key, (blob_id, page_num));
        entry.referenced.store(true, Ordering::Relaxed);

        let page = self.page_slice(slot);
        let bytes_to_copy = std::cmp::min(buf.len(), self.page_size - offset_in_page as usize);
        buf[..bytes_to_copy].copy_from_slice(
            &page[offset_in_page as usize..offset_in_page as usize + bytes_to_copy],
        );

        bytes_to_copy
    }

    /// Put the given `page` into the page cache.
    fn cache(&mut self, blob_id: u64, page: &[u8], page_num: u64) {
        assert_eq!(page.len(), self.page_size);
        let key = (blob_id, page_num);

        // Check for existing entry (update case)
        if let Some(&slot) = self.index.get(&key) {
            // This case can arise when a blob is truncated across a page boundary and later grows
            // back to (or beyond) its original size. It will also become expected behavior once we
            // allow cached pages to be writable.
            debug!(blob_id, page_num, "updating duplicate page");

            // Update the stale data with the new page.
            let entry = &self.entries[slot];
            assert_eq!(entry.key, key);
            entry.referenced.store(true, Ordering::Relaxed);
            self.page_slice_mut(slot).copy_from_slice(page);
            return;
        }

        // New entry - check if we need to evict
        if self.entries.len() < self.capacity {
            // Still growing: use next available slot
            let slot = self.entries.len();
            self.index.insert(key, slot);
            self.entries.push(CacheEntry {
                key,
                referenced: AtomicBool::new(true),
            });
            self.page_slice_mut(slot).copy_from_slice(page);
            return;
        }

        // Cache full: find slot to evict using Clock algorithm
        while self.entries[self.clock].referenced.load(Ordering::Relaxed) {
            self.entries[self.clock]
                .referenced
                .store(false, Ordering::Relaxed);
            self.clock = (self.clock + 1) % self.entries.len();
        }

        // Evict and replace
        let slot = self.clock;
        let entry = &mut self.entries[slot];
        assert!(self.index.remove(&entry.key).is_some());
        self.index.insert(key, slot);
        entry.key = key;
        entry.referenced.store(true, Ordering::Relaxed);
        self.page_slice_mut(slot).copy_from_slice(page);

        // Move the clock forward.
        self.clock = (self.clock + 1) % self.entries.len();
    }
}

#[cfg(test)]
mod tests {
    use super::{super::Checksum, *};
    use crate::{buffer::paged::CHECKSUM_SIZE, deterministic, Runner as _, Storage as _};
    use commonware_cryptography::Crc32;
    use commonware_macros::test_traced;
    use commonware_utils::{NZUsize, NZU16};
    use std::num::NonZeroU16;

    // Logical page size (what CacheRef uses and what gets cached).
    const PAGE_SIZE: NonZeroU16 = NZU16!(1024);
    const PAGE_SIZE_U64: u64 = PAGE_SIZE.get() as u64;

    #[test_traced]
    fn test_cache_basic() {
        let mut cache: Cache = Cache::new(PAGE_SIZE, NZUsize!(10));

        // Cache stores logical-sized pages.
        let mut buf = vec![0; PAGE_SIZE.get() as usize];
        let bytes_read = cache.read_at(0, &mut buf, 0);
        assert_eq!(bytes_read, 0);

        cache.cache(0, &[1; PAGE_SIZE.get() as usize], 0);
        let bytes_read = cache.read_at(0, &mut buf, 0);
        assert_eq!(bytes_read, PAGE_SIZE.get() as usize);
        assert_eq!(buf, [1; PAGE_SIZE.get() as usize]);

        // Test replacement -- should log a duplicate page message but still work.
        cache.cache(0, &[2; PAGE_SIZE.get() as usize], 0);
        let bytes_read = cache.read_at(0, &mut buf, 0);
        assert_eq!(bytes_read, PAGE_SIZE.get() as usize);
        assert_eq!(buf, [2; PAGE_SIZE.get() as usize]);

        // Test exceeding the cache capacity.
        for i in 0u64..11 {
            cache.cache(0, &[i as u8; PAGE_SIZE.get() as usize], i);
        }
        // Page 0 should have been evicted.
        let bytes_read = cache.read_at(0, &mut buf, 0);
        assert_eq!(bytes_read, 0);
        // Pages 1-10 should be in the cache.
        for i in 1u64..11 {
            let bytes_read = cache.read_at(0, &mut buf, i * PAGE_SIZE_U64);
            assert_eq!(bytes_read, PAGE_SIZE.get() as usize);
            assert_eq!(buf, [i as u8; PAGE_SIZE.get() as usize]);
        }

        // Test reading from an unaligned offset by adding 2 to an aligned offset. The read
        // should be 2 bytes short of a full logical page.
        let mut buf = vec![0; PAGE_SIZE.get() as usize];
        let bytes_read = cache.read_at(0, &mut buf, PAGE_SIZE_U64 + 2);
        assert_eq!(bytes_read, PAGE_SIZE.get() as usize - 2);
        assert_eq!(
            &buf[..PAGE_SIZE.get() as usize - 2],
            [1; PAGE_SIZE.get() as usize - 2]
        );
    }
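
    // Illustrative sketch of the Clock second-chance behavior documented on [Cache]: a page
    // whose reference bit is set between eviction passes survives the next sweep.
    #[test_traced]
    fn test_cache_clock_second_chance() {
        let mut cache: Cache = Cache::new(PAGE_SIZE, NZUsize!(3));
        let mut buf = vec![0; PAGE_SIZE.get() as usize];

        // Fill the cache with pages 0, 1, and 2 (all reference bits set).
        for i in 0u64..3 {
            cache.cache(0, &[i as u8; PAGE_SIZE.get() as usize], i);
        }

        // Caching page 3 clears every reference bit, wraps, and evicts page 0.
        cache.cache(0, &[3; PAGE_SIZE.get() as usize], 3);
        assert_eq!(cache.read_at(0, &mut buf, 0), 0);

        // Touch page 1 so its reference bit is set again.
        assert_eq!(
            cache.read_at(0, &mut buf, PAGE_SIZE_U64),
            PAGE_SIZE.get() as usize
        );

        // Caching page 4 skips page 1 (second chance) and evicts page 2 instead.
        cache.cache(0, &[4; PAGE_SIZE.get() as usize], 4);
        assert_eq!(cache.read_at(0, &mut buf, 2 * PAGE_SIZE_U64), 0);
        assert_eq!(
            cache.read_at(0, &mut buf, PAGE_SIZE_U64),
            PAGE_SIZE.get() as usize
        );
    }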

    #[test_traced]
    fn test_cache_read_with_blob() {
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();
        // Start the test within the executor
        executor.start(|context| async move {
            // Physical page size = logical + CRC record.
            let physical_page_size = PAGE_SIZE_U64 + CHECKSUM_SIZE;

            // Populate a blob with 11 consecutive pages of CRC-protected data.
            let (blob, size) = context
                .open("test", "blob".as_bytes())
                .await
                .expect("Failed to open blob");
            assert_eq!(size, 0);
            for i in 0..11 {
                // Write logical data followed by Checksum.
                let logical_data = vec![i as u8; PAGE_SIZE.get() as usize];
                let crc = Crc32::checksum(&logical_data);
                let record = Checksum::new(PAGE_SIZE.get(), crc);
                let mut page_data = logical_data;
                page_data.extend_from_slice(&record.to_bytes());
                blob.write_at(i * physical_page_size, page_data)
                    .await
                    .unwrap();
            }

            // Fill the page cache with the blob's data via CacheRef::read.
            let cache_ref = CacheRef::new(PAGE_SIZE, NZUsize!(10));
            assert_eq!(cache_ref.next_id().await, 0);
            assert_eq!(cache_ref.next_id().await, 1);
            for i in 0..11 {
                // Read expects logical bytes only (CRCs are stripped).
                let mut buf = vec![0; PAGE_SIZE.get() as usize];
                cache_ref
                    .read(&blob, 0, &mut buf, i * PAGE_SIZE_U64)
                    .await
                    .unwrap();
                assert_eq!(buf, [i as u8; PAGE_SIZE.get() as usize]);
            }

            // Repeat the read to exercise reading from the page cache. Must start at 1 because
            // page 0 should be evicted.
            for i in 1..11 {
                let mut buf = vec![0; PAGE_SIZE.get() as usize];
                cache_ref
                    .read(&blob, 0, &mut buf, i * PAGE_SIZE_U64)
                    .await
                    .unwrap();
                assert_eq!(buf, [i as u8; PAGE_SIZE.get() as usize]);
            }

            // Cleanup.
            blob.sync().await.unwrap();
        });
    }
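
    // Illustrative sketch of the `read_cached` contract: it returns the bytes copied to `buf`
    // before the first uncached page.
    #[test_traced]
    fn test_cache_read_cached_stops_at_miss() {
        let executor = deterministic::Runner::default();
        executor.start(|_context| async move {
            let cache_ref = CacheRef::new(PAGE_SIZE, NZUsize!(4));

            // Cache logical pages 0 and 1 only.
            let data = vec![7u8; PAGE_SIZE.get() as usize * 2];
            assert_eq!(cache_ref.cache(0, &data, 0).await, 0);

            // A read spanning pages 0..=2 stops at page 2, the first cache miss.
            let mut buf = vec![0u8; PAGE_SIZE.get() as usize * 3];
            let read = cache_ref.read_cached(0, &mut buf, 0).await;
            assert_eq!(read, PAGE_SIZE.get() as usize * 2);
            assert!(buf[..read].iter().all(|b| *b == 7));
        });
    }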

    #[test_traced]
    fn test_cache_max_page() {
        let executor = deterministic::Runner::default();
        executor.start(|_context| async move {
            let cache_ref = CacheRef::new(PAGE_SIZE, NZUsize!(2));

            // Use the largest page-aligned offset representable for the configured PAGE_SIZE.
            let aligned_max_offset = u64::MAX - (u64::MAX % PAGE_SIZE_U64);

            // CacheRef::cache expects only logical bytes (no CRC).
            let logical_data = vec![42u8; PAGE_SIZE.get() as usize];

            // Caching exactly one page at the maximum offset should succeed.
            let remaining = cache_ref
                .cache(0, logical_data.as_slice(), aligned_max_offset)
                .await;
            assert_eq!(remaining, 0);

            // Reading from the cache should return the logical bytes.
            let mut buf = vec![0u8; PAGE_SIZE.get() as usize];
            let page_cache = cache_ref.cache.read().await;
            let bytes_read = page_cache.read_at(0, &mut buf, aligned_max_offset);
            assert_eq!(bytes_read, PAGE_SIZE.get() as usize);
            assert!(buf.iter().all(|b| *b == 42));
        });
    }
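
    // Illustrative sketch of the `CacheRef::cache` return value: it reports the number of
    // trailing bytes that did not fill a whole page (and so were not cached).
    #[test_traced]
    fn test_cache_returns_partial_page_remainder() {
        let executor = deterministic::Runner::default();
        executor.start(|_context| async move {
            let cache_ref = CacheRef::new(PAGE_SIZE, NZUsize!(4));

            // Two full pages plus 100 trailing bytes: the remainder is reported back.
            let data = vec![3u8; PAGE_SIZE.get() as usize * 2 + 100];
            let remaining = cache_ref.cache(0, &data, 0).await;
            assert_eq!(remaining, 100);

            // The two full pages are readable from the cache; the tail was not cached.
            let mut buf = vec![0u8; PAGE_SIZE.get() as usize];
            let page_cache = cache_ref.cache.read().await;
            assert_eq!(
                page_cache.read_at(0, &mut buf, PAGE_SIZE_U64),
                PAGE_SIZE.get() as usize
            );
            assert_eq!(page_cache.read_at(0, &mut buf, 2 * PAGE_SIZE_U64), 0);
        });
    }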

    #[test_traced]
    fn test_cache_at_high_offset() {
        let executor = deterministic::Runner::default();
        executor.start(|_context| async move {
            // Use the minimum page size (CHECKSUM_SIZE + 1 = 13) with a high offset.
            const MIN_PAGE_SIZE: u64 = CHECKSUM_SIZE + 1;
            let cache_ref = CacheRef::new(NZU16!(MIN_PAGE_SIZE as u16), NZUsize!(2));

            // Create two pages' worth of logical data (no CRCs - CacheRef::cache expects logical
            // only).
            let data = vec![1u8; MIN_PAGE_SIZE as usize * 2];

            // Cache pages at a high (but not max) aligned offset so we can verify both pages.
            // Use an offset that's a few pages below max to avoid overflow when verifying.
            let aligned_max_offset = u64::MAX - (u64::MAX % MIN_PAGE_SIZE);
            let high_offset = aligned_max_offset - (MIN_PAGE_SIZE * 2);
            let remaining = cache_ref.cache(0, &data, high_offset).await;
            // Both pages should be cached.
            assert_eq!(remaining, 0);

            // Verify the first page was cached correctly.
            let mut buf = vec![0u8; MIN_PAGE_SIZE as usize];
            let page_cache = cache_ref.cache.read().await;
            assert_eq!(
                page_cache.read_at(0, &mut buf, high_offset),
                MIN_PAGE_SIZE as usize
            );
            assert!(buf.iter().all(|b| *b == 1));

            // Verify the second page was cached correctly.
            assert_eq!(
                page_cache.read_at(0, &mut buf, high_offset + MIN_PAGE_SIZE),
                MIN_PAGE_SIZE as usize
            );
            assert!(buf.iter().all(|b| *b == 1));
        });
    }
}
619}