commonware_runtime/utils/buffer/pool/page_cache.rs

//! A buffer pool for caching _logical_ pages of [Blob] data in memory. The buffer pool is unaware
//! of the physical page format used by the blob, which is left to the blob implementation.

use super::get_page_from_blob;
use crate::{Blob, Error, RwLock};
use commonware_utils::StableBuf;
use futures::{future::Shared, FutureExt};
use std::{
    collections::{hash_map::Entry, HashMap},
    future::Future,
    num::{NonZeroU16, NonZeroUsize},
    pin::Pin,
    sync::{
        atomic::{AtomicBool, AtomicU64, Ordering},
        Arc,
    },
};
use tracing::{debug, error, trace};

// Type alias for the future we'll be storing for each in-flight page fetch.
//
// We wrap [Error] in an Arc so it will be cloneable, which is required for the future to be
// [Shared]. The StableBuf contains only the logical (validated) bytes of the page.
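//
// Only one fetch per page is kept in flight at a time: the first task to fault on a page creates
// the future and stores its [Shared] handle in [Pool::page_fetches]; any other task faulting on
// the same page clones that handle and awaits the same underlying fetch (see
// [PoolRef::read_after_page_fault]).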
type PageFetchFut = Shared<Pin<Box<dyn Future<Output = Result<StableBuf, Arc<Error>>> + Send>>>;

/// A [Pool] caches pages of [Blob] data in memory after verifying the integrity of each.
///
/// A single buffer pool can be used to cache data from multiple blobs by assigning a unique id to
/// each.
///
/// Implements the [Clock](https://en.wikipedia.org/wiki/Page_replacement_algorithm#Clock)
/// replacement policy, which is a lightweight approximation of LRU. The page `cache` is a circular
/// list of recently accessed pages, and `clock` is the index of the next page within it to examine
/// for replacement. When a page needs to be evicted, we start the search at `clock` within `cache`,
/// searching for the first page with a false reference bit, and setting any skipped page's
/// reference bit to false along the way.
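///
/// For example, a page whose reference bit is still set when the hand reaches it is skipped (its
/// bit is cleared), while a page whose bit was already cleared by an earlier pass is evicted on
/// the spot; recently read pages therefore get a "second chance" to remain cached.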
pub struct Pool {
    /// The page cache index, with a key composed of (blob id, page number), that maps each cached
    /// page to the index of its `cache` entry.
    ///
    /// # Invariants
    ///
    /// Each `index` entry maps to exactly one `cache` entry, and that cache entry always has a
    /// matching key.
    index: HashMap<(u64, u64), usize>,

    /// The page cache.
    ///
    /// Each `cache` entry has exactly one corresponding `index` entry.
    cache: Vec<CacheEntry>,

    /// The Clock replacement policy's clock hand index into `cache`.
    clock: usize,

    /// The maximum number of pages that will be cached.
    capacity: usize,

    /// A map of currently executing page fetches to ensure only one task at a time is trying to
    /// fetch a specific page.
    page_fetches: HashMap<(u64, u64), PageFetchFut>,
}

struct CacheEntry {
    /// The cache key which is composed of the blob id and page number of the page.
    key: (u64, u64),

    /// A bit indicating whether this page was recently referenced.
    referenced: AtomicBool,

    /// The cached page itself. Only logical bytes are cached, so the vector will be 12 bytes shorter
    /// than the physical page size.
    data: Vec<u8>,
}

/// A reference to a page cache that can be shared across threads via cloning, along with the page
/// size that will be used with it. Provides the API for interacting with the buffer pool in a
/// thread-safe manner.
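///
/// A minimal usage sketch (assumptions: `page_bytes` holds exactly one page of logical data, and
/// the `NZU16!`/`NZUsize!` helpers from `commonware_utils` are in scope):
///
/// ```ignore
/// let pool_ref = PoolRef::new(NZU16!(1024), NZUsize!(10));
/// let blob_id = pool_ref.next_id().await;
/// // Cache one page of logical bytes for this blob at page-aligned offset 0.
/// let leftover = pool_ref.cache(blob_id, &page_bytes, 0).await;
/// assert_eq!(leftover, 0);
/// ```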
#[derive(Clone)]
pub struct PoolRef {
    /// The size of each page in the underlying blobs managed by this buffer pool.
    ///
    /// # Warning
    ///
    /// You cannot change the page size once data has been written without invalidating it. (Reads
    /// on blobs that were written with a different page size will fail their integrity check.)
    page_size: u64,

    /// The next id to assign to a blob that will be managed by this pool.
    next_id: Arc<AtomicU64>,

    /// Shareable reference to the buffer pool.
    pool: Arc<RwLock<Pool>>,
}

impl PoolRef {
    /// Returns a new [PoolRef] that will buffer up to `capacity` pages with the
    /// given `page_size`.
    pub fn new(page_size: NonZeroU16, capacity: NonZeroUsize) -> Self {
        let page_size = page_size.get() as u64;

        Self {
            page_size,
            next_id: Arc::new(AtomicU64::new(0)),
            pool: Arc::new(RwLock::new(Pool::new(capacity.get()))),
        }
    }

    /// The page size used by this buffer pool.
    #[inline]
    pub const fn page_size(&self) -> u64 {
        self.page_size
    }

    /// Returns a unique id for the next blob that will use this buffer pool.
    pub async fn next_id(&self) -> u64 {
        self.next_id.fetch_add(1, Ordering::Relaxed)
    }

    /// Convert a logical offset into the number of the page it belongs to and the offset within
    /// that page.
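    ///
    /// For example, with a page size of 1024, logical offset 2500 maps to page 2 at offset 452
    /// within that page.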
    pub const fn offset_to_page(&self, offset: u64) -> (u64, u64) {
        Pool::offset_to_page(self.page_size, offset)
    }

    /// Try to read the specified bytes from the buffer pool cache only. Returns the number of
    /// bytes successfully read from cache and copied to `buf` before a page fault, if any.
    pub(super) async fn read_cached(
        &self,
        blob_id: u64,
        mut buf: &mut [u8],
        mut logical_offset: u64,
    ) -> usize {
        let original_len = buf.len();
        let buffer_pool = self.pool.read().await;
        while !buf.is_empty() {
            let count = buffer_pool.read_at(self.page_size, blob_id, buf, logical_offset);
            if count == 0 {
                // Cache miss - return how many bytes we successfully read
                break;
            }
            logical_offset += count as u64;
            buf = &mut buf[count..];
        }
        original_len - buf.len()
    }

    /// Read the specified bytes, preferentially from the buffer pool cache. Bytes not found in the
    /// buffer pool will be read from the provided `blob` and cached for future reads.
    pub(super) async fn read<B: Blob>(
        &self,
        blob: &B,
        blob_id: u64,
        mut buf: &mut [u8],
        mut offset: u64,
    ) -> Result<(), Error> {
        // Read up to a page worth of data at a time from either the buffer pool or the `blob`,
        // until the requested data is fully read.
        while !buf.is_empty() {
            // Read lock the buffer pool and see if we can get (some of) the data from it.
            {
                let buffer_pool = self.pool.read().await;
                let count = buffer_pool.read_at(self.page_size, blob_id, buf, offset);
                if count != 0 {
                    offset += count as u64;
                    buf = &mut buf[count..];
                    continue;
                }
            }

            // Handle page fault.
            let count = self
                .read_after_page_fault(blob, blob_id, buf, offset)
                .await?;
            offset += count as u64;
            buf = &mut buf[count..];
        }

        Ok(())
    }

    /// Fetch the requested page after encountering a page fault, which may involve retrieving it
    /// from `blob` & caching the result in `pool`. Returns the number of bytes read, which should
    /// always be non-zero.
    pub(super) async fn read_after_page_fault<B: Blob>(
        &self,
        blob: &B,
        blob_id: u64,
        buf: &mut [u8],
        offset: u64,
    ) -> Result<usize, Error> {
        assert!(!buf.is_empty());

        let (page_num, offset_in_page) = Pool::offset_to_page(self.page_size, offset);
        let offset_in_page = offset_in_page as usize;
        trace!(page_num, blob_id, "page fault");

        // Create or clone a future that retrieves the desired page from the underlying blob. This
        // requires a write lock on the buffer pool since we may need to modify `page_fetches` if
        // this is the first fetcher.
        let (fetch_future, is_first_fetcher) = {
            let mut pool = self.pool.write().await;

            // There's a (small) chance the page was fetched & buffered by another task before we
            // were able to acquire the write lock, so check the cache before doing anything else.
            let count = pool.read_at(self.page_size, blob_id, buf, offset);
            if count != 0 {
                return Ok(count);
            }

            let entry = pool.page_fetches.entry((blob_id, page_num));
            match entry {
                Entry::Occupied(o) => {
                    // Another thread is already fetching this page, so clone its existing future.
                    (o.get().clone(), false)
                }
                Entry::Vacant(v) => {
                    // Nobody is currently fetching this page, so create a future that will do the
                    // work. get_page_from_blob handles CRC validation and returns only logical bytes.
                    let blob = blob.clone();
                    let page_size = self.page_size;
                    let future = async move {
                        let page = get_page_from_blob(&blob, page_num, page_size)
                            .await
                            .map_err(Arc::new)?;
                        // We should never be fetching partial pages through the buffer pool. This can happen
                        // if a non-last page is corrupted and falls back to a partial CRC.
                        let len = page.as_ref().len();
                        if len != page_size as usize {
                            error!(
                                page_num,
                                expected = page_size,
                                actual = len,
                                "attempted to fetch partial page from blob"
                            );
                            return Err(Arc::new(Error::InvalidChecksum));
                        }
                        Ok(page)
                    };

                    // Make the future shareable and insert it into the map.
                    let shareable = future.boxed().shared();
                    v.insert(shareable.clone());

                    (shareable, true)
                }
            }
        };

        // Await the future and get the page buffer. If this isn't the task that initiated the
        // fetch, we can return immediately with the result. The first fetcher, however, must not
        // return early on error, since doing so would bypass the cleanup it is responsible for.
        let fetch_result = fetch_future.await;
        if !is_first_fetcher {
            // Copy the requested portion of the page into the buffer and return immediately.
            let page_buf = fetch_result.map_err(|_| Error::ReadFailed)?;
            let bytes_to_copy = std::cmp::min(buf.len(), page_buf.as_ref().len() - offset_in_page);
            buf[..bytes_to_copy].copy_from_slice(
                &page_buf.as_ref()[offset_in_page..offset_in_page + bytes_to_copy],
            );
            return Ok(bytes_to_copy);
        }

        // This is the task that initiated the fetch, so it is responsible for cleaning up the
        // inserted entry, and caching the page in the buffer pool if the fetch didn't error out.
        // This requires a write lock on the buffer pool to modify `page_fetches` and cache the
        // page.
        let mut pool = self.pool.write().await;

        // Remove the entry from `page_fetches`.
        let _ = pool.page_fetches.remove(&(blob_id, page_num));

        // Cache the result in the buffer pool. get_page_from_blob already validated the CRC.
        let page_buf = match fetch_result {
            Ok(page_buf) => page_buf,
            Err(err) => {
                error!(page_num, ?err, "Page fetch failed");
                return Err(Error::ReadFailed);
            }
        };

        pool.cache(self.page_size, blob_id, page_buf.as_ref(), page_num);

        // Copy the requested portion of the page into the buffer.
        let bytes_to_copy = std::cmp::min(buf.len(), page_buf.as_ref().len() - offset_in_page);
        buf[..bytes_to_copy]
            .copy_from_slice(&page_buf.as_ref()[offset_in_page..offset_in_page + bytes_to_copy]);

        Ok(bytes_to_copy)
    }

    /// Cache the provided pages of data in the buffer pool, returning the number of trailing
    /// bytes that did not fill a whole page. `offset` must be page aligned.
    ///
    /// # Panics
    ///
    /// Panics if `offset` is not page aligned.
    pub async fn cache(&self, blob_id: u64, mut buf: &[u8], offset: u64) -> usize {
        let (mut page_num, offset_in_page) = self.offset_to_page(offset);
        assert_eq!(offset_in_page, 0);
        {
            // Write lock the buffer pool.
            let page_size = self.page_size as usize;
            let mut buffer_pool = self.pool.write().await;
            while buf.len() >= page_size {
                buffer_pool.cache(self.page_size, blob_id, &buf[..page_size], page_num);
                buf = &buf[page_size..];
                page_num = match page_num.checked_add(1) {
                    Some(next) => next,
                    None => break,
                };
            }
        }

        buf.len()
    }
}

impl Pool {
    /// Return a new empty buffer pool with a max cache capacity of `capacity` pages.
    ///
    /// # Panics
    ///
    /// Panics if `capacity` is 0.
    pub fn new(capacity: usize) -> Self {
        assert!(capacity > 0);
        Self {
            index: HashMap::new(),
            cache: Vec::new(),
            clock: 0,
            capacity,
            page_fetches: HashMap::new(),
        }
    }

    /// Convert an offset into the number of the page it belongs to and the offset within that page.
    const fn offset_to_page(page_size: u64, offset: u64) -> (u64, u64) {
        (offset / page_size, offset % page_size)
    }

    /// Attempt to fetch blob data starting at `logical_offset` from the buffer pool. Returns the
    /// number of bytes read, which could be 0 if the first page in the requested range isn't
    /// buffered, and is never more than `page_size` or the length of `buf`. The returned bytes
    /// won't cross a page boundary, so multiple reads may be required even if all data in the
    /// desired range is buffered.
    fn read_at(&self, page_size: u64, blob_id: u64, buf: &mut [u8], logical_offset: u64) -> usize {
        let (page_num, offset_in_page) = Self::offset_to_page(page_size, logical_offset);
        let page_index = self.index.get(&(blob_id, page_num));
        let Some(&page_index) = page_index else {
            return 0;
        };
        let page = &self.cache[page_index];
        assert_eq!(page.key, (blob_id, page_num));
        page.referenced.store(true, Ordering::Relaxed);
        let page = &page.data;

        let logical_page_size = page_size as usize;
        let bytes_to_copy = std::cmp::min(buf.len(), logical_page_size - offset_in_page as usize);
        buf[..bytes_to_copy].copy_from_slice(
            &page[offset_in_page as usize..offset_in_page as usize + bytes_to_copy],
        );

        bytes_to_copy
    }

    /// Put the given `page` into the buffer pool.
    fn cache(&mut self, page_size: u64, blob_id: u64, page: &[u8], page_num: u64) {
        assert_eq!(page.len(), page_size as usize);
        let key = (blob_id, page_num);
        let index_entry = self.index.entry(key);
        if let Entry::Occupied(index_entry) = index_entry {
            // This case can result when a blob is truncated across a page boundary, and later
            // grows back to (or beyond) its original size. It will also become expected behavior
            // once we allow cached pages to be writable.
            debug!(blob_id, page_num, "updating duplicate page");

            // Update the stale data with the new page.
            let entry = &mut self.cache[*index_entry.get()];
            assert_eq!(entry.key, key);
            entry.referenced.store(true, Ordering::Relaxed);
            entry.data.copy_from_slice(page);
            return;
        }

        if self.cache.len() < self.capacity {
            self.index.insert(key, self.cache.len());
            self.cache.push(CacheEntry {
                key,
                referenced: AtomicBool::new(true),
                data: page.into(),
            });
            return;
        }

        // Cache is full, find a page to evict.
        while self.cache[self.clock].referenced.load(Ordering::Relaxed) {
            self.cache[self.clock]
                .referenced
                .store(false, Ordering::Relaxed);
            self.clock = (self.clock + 1) % self.cache.len();
        }

        // Evict the page by replacing it with the new page.
        let entry = &mut self.cache[self.clock];
        entry.referenced.store(true, Ordering::Relaxed);
        assert!(self.index.remove(&entry.key).is_some());
        self.index.insert(key, self.clock);
        entry.key = key;
        entry.data.copy_from_slice(page);

        // Move the clock forward.
        self.clock = (self.clock + 1) % self.cache.len();
    }
}

#[cfg(test)]
mod tests {
    use super::{super::Checksum, *};
    use crate::{buffer::pool::CHECKSUM_SIZE, deterministic, Runner as _, Storage as _};
    use commonware_cryptography::Crc32;
    use commonware_macros::test_traced;
    use commonware_utils::{NZUsize, NZU16};
    use std::num::NonZeroU16;

    // Logical page size (what PoolRef uses and what gets cached).
    const PAGE_SIZE: NonZeroU16 = NZU16!(1024);
    const PAGE_SIZE_U64: u64 = PAGE_SIZE.get() as u64;

    #[test_traced]
    fn test_pool_basic() {
        let mut pool: Pool = Pool::new(10);

        // Cache stores logical-sized pages.
        let mut buf = vec![0; PAGE_SIZE.get() as usize];
        let bytes_read = pool.read_at(PAGE_SIZE_U64, 0, &mut buf, 0);
        assert_eq!(bytes_read, 0);

        pool.cache(PAGE_SIZE_U64, 0, &[1; PAGE_SIZE.get() as usize], 0);
        let bytes_read = pool.read_at(PAGE_SIZE_U64, 0, &mut buf, 0);
        assert_eq!(bytes_read, PAGE_SIZE.get() as usize);
        assert_eq!(buf, [1; PAGE_SIZE.get() as usize]);

        // Test replacement -- should emit a duplicate-page debug message but still work.
        pool.cache(PAGE_SIZE_U64, 0, &[2; PAGE_SIZE.get() as usize], 0);
        let bytes_read = pool.read_at(PAGE_SIZE_U64, 0, &mut buf, 0);
        assert_eq!(bytes_read, PAGE_SIZE.get() as usize);
        assert_eq!(buf, [2; PAGE_SIZE.get() as usize]);

        // Test exceeding the cache capacity.
        for i in 0u64..11 {
            pool.cache(PAGE_SIZE_U64, 0, &[i as u8; PAGE_SIZE.get() as usize], i);
        }
        // Page 0 should have been evicted.
        let bytes_read = pool.read_at(PAGE_SIZE_U64, 0, &mut buf, 0);
        assert_eq!(bytes_read, 0);
        // Pages 1-10 should be in the cache.
        for i in 1u64..11 {
            let bytes_read = pool.read_at(PAGE_SIZE_U64, 0, &mut buf, i * PAGE_SIZE_U64);
            assert_eq!(bytes_read, PAGE_SIZE.get() as usize);
            assert_eq!(buf, [i as u8; PAGE_SIZE.get() as usize]);
        }

        // Test reading from an unaligned offset by adding 2 to an aligned offset. The read
        // should be 2 bytes short of a full logical page.
        let mut buf = vec![0; PAGE_SIZE.get() as usize];
        let bytes_read = pool.read_at(PAGE_SIZE_U64, 0, &mut buf, PAGE_SIZE_U64 + 2);
        assert_eq!(bytes_read, PAGE_SIZE.get() as usize - 2);
        assert_eq!(
            &buf[..PAGE_SIZE.get() as usize - 2],
            [1; PAGE_SIZE.get() as usize - 2]
        );
    }
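
    // A small sketch of the Clock sweep on a capacity-2 pool: each full sweep clears reference
    // bits as the hand passes, and the first entry found with a cleared bit is the one replaced.
    #[test_traced]
    fn test_pool_clock_sweep() {
        let mut pool: Pool = Pool::new(2);
        let mut buf = vec![0; PAGE_SIZE.get() as usize];

        // Fill the pool with pages 0 and 1 (both inserted with their reference bits set).
        pool.cache(PAGE_SIZE_U64, 0, &[0; PAGE_SIZE.get() as usize], 0);
        pool.cache(PAGE_SIZE_U64, 0, &[1; PAGE_SIZE.get() as usize], 1);

        // Caching page 2 sweeps the hand over both entries, clearing their bits, and replaces
        // the entry the hand settles on (page 0).
        pool.cache(PAGE_SIZE_U64, 0, &[2; PAGE_SIZE.get() as usize], 2);
        assert_eq!(pool.read_at(PAGE_SIZE_U64, 0, &mut buf, 0), 0);
        assert_eq!(
            pool.read_at(PAGE_SIZE_U64, 0, &mut buf, 2 * PAGE_SIZE_U64),
            PAGE_SIZE.get() as usize
        );

        // Page 1's bit was cleared by that sweep while page 2's is still set, so caching page 3
        // evicts page 1 and page 2 survives.
        pool.cache(PAGE_SIZE_U64, 0, &[3; PAGE_SIZE.get() as usize], 3);
        assert_eq!(pool.read_at(PAGE_SIZE_U64, 0, &mut buf, PAGE_SIZE_U64), 0);
        assert_eq!(
            pool.read_at(PAGE_SIZE_U64, 0, &mut buf, 2 * PAGE_SIZE_U64),
            PAGE_SIZE.get() as usize
        );
        assert_eq!(
            pool.read_at(PAGE_SIZE_U64, 0, &mut buf, 3 * PAGE_SIZE_U64),
            PAGE_SIZE.get() as usize
        );
    }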

    #[test_traced]
    fn test_pool_read_with_blob() {
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();
        // Start the test within the executor
        executor.start(|context| async move {
            // Physical page size = logical + CRC record.
            let physical_page_size = PAGE_SIZE_U64 + CHECKSUM_SIZE;

            // Populate a blob with 11 consecutive pages of CRC-protected data.
            let (blob, size) = context
                .open("test", "blob".as_bytes())
                .await
                .expect("Failed to open blob");
            assert_eq!(size, 0);
            for i in 0..11 {
                // Write logical data followed by Checksum.
                let logical_data = vec![i as u8; PAGE_SIZE.get() as usize];
                let crc = Crc32::checksum(&logical_data);
                let record = Checksum::new(PAGE_SIZE.get(), crc);
                let mut page_data = logical_data;
                page_data.extend_from_slice(&record.to_bytes());
                blob.write_at(page_data, i * physical_page_size)
                    .await
                    .unwrap();
            }

            // Fill the buffer pool with the blob's data via PoolRef::read.
            let pool_ref = PoolRef::new(PAGE_SIZE, NZUsize!(10));
            assert_eq!(pool_ref.next_id().await, 0);
            assert_eq!(pool_ref.next_id().await, 1);
            for i in 0..11 {
                // Read expects logical bytes only (CRCs are stripped).
                let mut buf = vec![0; PAGE_SIZE.get() as usize];
                pool_ref
                    .read(&blob, 0, &mut buf, i * PAGE_SIZE_U64)
                    .await
                    .unwrap();
                assert_eq!(buf, [i as u8; PAGE_SIZE.get() as usize]);
            }

            // Repeat the read to exercise reading from the buffer pool. Must start at 1 because
            // page 0 should be evicted.
            for i in 1..11 {
                let mut buf = vec![0; PAGE_SIZE.get() as usize];
                pool_ref
                    .read(&blob, 0, &mut buf, i * PAGE_SIZE_U64)
                    .await
                    .unwrap();
                assert_eq!(buf, [i as u8; PAGE_SIZE.get() as usize]);
            }

            // Cleanup.
            blob.sync().await.unwrap();
        });
    }

    #[test_traced]
    fn test_pool_cache_max_page() {
        let executor = deterministic::Runner::default();
        executor.start(|_context| async move {
            let pool_ref = PoolRef::new(PAGE_SIZE, NZUsize!(2));

            // Use the largest page-aligned offset representable for the configured PAGE_SIZE.
            let aligned_max_offset = u64::MAX - (u64::MAX % PAGE_SIZE_U64);

            // PoolRef::cache expects only logical bytes (no CRC).
            let logical_data = vec![42u8; PAGE_SIZE.get() as usize];

            // Caching exactly one page at the maximum offset should succeed.
            let remaining = pool_ref
                .cache(0, logical_data.as_slice(), aligned_max_offset)
                .await;
            assert_eq!(remaining, 0);

            // Reading from the pool should return the logical bytes.
            let mut buf = vec![0u8; PAGE_SIZE.get() as usize];
            let pool = pool_ref.pool.read().await;
            let bytes_read = pool.read_at(PAGE_SIZE_U64, 0, &mut buf, aligned_max_offset);
            assert_eq!(bytes_read, PAGE_SIZE.get() as usize);
            assert!(buf.iter().all(|b| *b == 42));
        });
    }

    #[test_traced]
    fn test_pool_cache_at_high_offset() {
        let executor = deterministic::Runner::default();
        executor.start(|_context| async move {
            // Use the minimum page size (CHECKSUM_SIZE + 1 = 13) with high offset.
            const MIN_PAGE_SIZE: u64 = CHECKSUM_SIZE + 1;
            let pool_ref = PoolRef::new(NZU16!(MIN_PAGE_SIZE as u16), NZUsize!(2));

            // Create two pages worth of logical data (no CRCs - PoolRef::cache expects logical only).
            let data = vec![1u8; MIN_PAGE_SIZE as usize * 2];

            // Cache pages at a high (but not max) aligned offset so we can verify both pages.
            // Use an offset that's a few pages below max to avoid overflow when verifying.
            let aligned_max_offset = u64::MAX - (u64::MAX % MIN_PAGE_SIZE);
            let high_offset = aligned_max_offset - (MIN_PAGE_SIZE * 2);
            let remaining = pool_ref.cache(0, &data, high_offset).await;
            // Both pages should be cached.
            assert_eq!(remaining, 0);

            // Verify the first page was cached correctly.
            let mut buf = vec![0u8; MIN_PAGE_SIZE as usize];
            let pool = pool_ref.pool.read().await;
            assert_eq!(
                pool.read_at(MIN_PAGE_SIZE, 0, &mut buf, high_offset),
                MIN_PAGE_SIZE as usize
            );
            assert!(buf.iter().all(|b| *b == 1));

            // Verify the second page was cached correctly.
            assert_eq!(
                pool.read_at(MIN_PAGE_SIZE, 0, &mut buf, high_offset + MIN_PAGE_SIZE),
                MIN_PAGE_SIZE as usize
            );
            assert!(buf.iter().all(|b| *b == 1));
        });
    }
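
    // A small additional example exercising the logical-offset arithmetic and the cache-only
    // read path (only APIs defined above are used).
    #[test_traced]
    fn test_pool_ref_offset_and_cached_read() {
        let executor = deterministic::Runner::default();
        executor.start(|_context| async move {
            let pool_ref = PoolRef::new(PAGE_SIZE, NZUsize!(4));

            // With a 1024-byte page, logical offset 2500 lands 452 bytes into page 2.
            assert_eq!(pool_ref.offset_to_page(2500), (2, 452));

            // A cache-only read of an uncached page copies nothing.
            let mut buf = vec![0u8; 100];
            assert_eq!(pool_ref.read_cached(0, &mut buf, 0).await, 0);

            // After caching page 0 (logical bytes only), the same read is served from the pool.
            let remaining = pool_ref
                .cache(0, &[7u8; PAGE_SIZE.get() as usize], 0)
                .await;
            assert_eq!(remaining, 0);
            assert_eq!(pool_ref.read_cached(0, &mut buf, 0).await, 100);
            assert!(buf.iter().all(|b| *b == 7));
        });
    }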
}