commonware_runtime/utils/buffer/pool.rs

use crate::{Blob, Error, RwLock};
use commonware_utils::StableBuf;
use futures::{future::Shared, FutureExt};
use std::{
    collections::{hash_map::Entry, HashMap},
    future::Future,
    num::NonZeroUsize,
    pin::Pin,
    sync::{
        atomic::{AtomicBool, AtomicU64, Ordering},
        Arc,
    },
};
use tracing::{debug, trace};

// Type alias for the future we'll be storing for each in-flight page fetch.
//
// We wrap [Error] in an Arc so it will be cloneable, which is required for the future to be
// [Shared].
type PageFetchFut = Shared<Pin<Box<dyn Future<Output = Result<StableBuf, Arc<Error>>> + Send>>>;

/// A [Pool] caches pages of [Blob] data in memory.
///
/// A single buffer pool can be used to cache data from multiple blobs by assigning a unique id to
/// each.
///
/// Implements the [Clock](https://en.wikipedia.org/wiki/Page_replacement_algorithm#Clock)
/// replacement policy, which is a lightweight approximation of LRU. The page `cache` is a circular
/// list of recently accessed pages, and `clock` is the index of the next page within it to examine
/// for replacement. When a page needs to be evicted, we start the search at `clock` within `cache`,
/// searching for the first page with a false reference bit, and setting any skipped page's
/// reference bit to false along the way.
pub struct Pool {
    /// The page cache index, keyed by (blob id, page number), that maps each cached page to the
    /// index of its `cache` entry.
    ///
    /// # Invariants
    ///
    /// Each `index` entry maps to exactly one `cache` entry, and that cache entry always has a
    /// matching key.
    index: HashMap<(u64, u64), usize>,

    /// The page cache.
    ///
    /// Each `cache` entry has exactly one corresponding `index` entry.
    cache: Vec<CacheEntry>,

    /// The Clock replacement policy's clock hand index into `cache`.
    clock: usize,

    /// The maximum number of pages that will be cached.
    capacity: usize,

    /// A map of currently executing page fetches to ensure only one task at a time is trying to
    /// fetch a specific page.
    page_fetches: HashMap<(u64, u64), PageFetchFut>,
}

struct CacheEntry {
    /// The cache key which is composed of the blob id and page number of the page.
    key: (u64, u64),

    /// A bit indicating whether this page was recently referenced.
    referenced: AtomicBool,

    /// The cached page itself.
    data: Vec<u8>,
}

/// A reference to a [Pool] that can be shared across threads via cloning, along with the page size
/// that will be used with it. Provides the API for interacting with the buffer pool in a
/// thread-safe manner.
#[derive(Clone)]
pub struct PoolRef {
    /// The size of each page in the buffer pool.
    pub(super) page_size: usize,

    /// The next id to assign to a blob that will be managed by this pool.
    next_id: Arc<AtomicU64>,

    /// Shareable reference to the buffer pool.
    pool: Arc<RwLock<Pool>>,
}

impl PoolRef {
    /// Returns a new [PoolRef] with the given `page_size` and `capacity`.
    pub fn new(page_size: NonZeroUsize, capacity: NonZeroUsize) -> Self {
        Self {
            page_size: page_size.get(),
            next_id: Arc::new(AtomicU64::new(0)),
            pool: Arc::new(RwLock::new(Pool::new(capacity.get()))),
        }
    }

    /// Returns a unique id for the next blob that will use this buffer pool.
    pub async fn next_id(&self) -> u64 {
        self.next_id.fetch_add(1, Ordering::Relaxed)
    }

    /// Convert an offset into the number of the page it belongs to and the offset within that page.
    pub const fn offset_to_page(&self, offset: u64) -> (u64, usize) {
        Pool::offset_to_page(self.page_size, offset)
    }

    /// Read the specified bytes, preferentially from the buffer pool cache. Bytes not found in the
    /// buffer pool will be read from the provided `blob` and cached for future reads.
    ///
    /// # Warning
    ///
    /// Attempts to read any of the last (blob_size % page_size) "trailing bytes" of the blob will
    /// result in a ReadFailed error, since the buffer pool only deals with page-sized chunks.
    /// Trailing bytes need to be dealt with outside of the buffer pool. For example,
    /// [crate::buffer::Append] uses a [crate::buffer::tip::Buffer] to buffer them.
    pub(super) async fn read<B: Blob>(
        &self,
        blob: &B,
        blob_id: u64,
        mut buf: &mut [u8],
        mut offset: u64,
    ) -> Result<(), Error> {
        // Read up to a page worth of data at a time from either the buffer pool or the `blob`,
        // until the requested data is fully read.
        while !buf.is_empty() {
            // Read lock the buffer pool and see if we can get (some of) the data from it.
            {
                let buffer_pool = self.pool.read().await;
                let count = buffer_pool.read_at(self.page_size, blob_id, buf, offset);
                if count != 0 {
                    offset += count as u64;
                    buf = &mut buf[count..];
                    continue;
                }
            }

            // Handle page fault.
            let count = self
                .read_after_page_fault(blob, blob_id, buf, offset)
                .await?;
            offset += count as u64;
            buf = &mut buf[count..];
        }

        Ok(())
    }

    /// Fetch the specified page after encountering a page fault, which may involve retrieving it
    /// from `blob` & caching the result in `pool`. Returns the number of bytes read, which should
    /// always be non-zero.
    async fn read_after_page_fault<B: Blob>(
        &self,
        blob: &B,
        blob_id: u64,
        buf: &mut [u8],
        offset: u64,
    ) -> Result<usize, Error> {
        assert!(!buf.is_empty());

        let (page_num, offset_in_page) = Pool::offset_to_page(self.page_size, offset);
        let page_size = self.page_size;
        trace!(page_num, blob_id, "page fault");

        // Create or clone a future that retrieves the desired page from the underlying blob. This
        // requires a write lock on the buffer pool since we may need to modify `page_fetches` if
        // this is the first fetcher.
        let (fetch_future, is_first_fetcher) = {
            let mut pool = self.pool.write().await;

            // There's a (small) chance the page was fetched & buffered by another task before we
            // were able to acquire the write lock, so check the cache before doing anything else.
            let count = pool.read_at(page_size, blob_id, buf, offset);
            if count != 0 {
                return Ok(count);
            }

            let entry = pool.page_fetches.entry((blob_id, page_num));
            match entry {
                Entry::Occupied(o) => {
                    // Another thread is already fetching this page, so clone its existing future.
                    (o.get().clone(), false)
                }
                Entry::Vacant(v) => {
                    // Nobody is currently fetching this page, so create a future that will do the work.
                    let blob = blob.clone();
                    let future = async move {
                        blob.read_at(vec![0; page_size], page_num * page_size as u64)
                            .await
                            .map_err(Arc::new)
                    };

                    // Make the future shareable and insert it into the map.
                    let shareable = future.boxed().shared();
                    v.insert(shareable.clone());

                    (shareable, true)
                }
            }
        };

        // Await the future and get the page buffer. If this isn't the task that initiated the
        // fetch, we can return immediately with the result. Note that we cannot return immediately
        // on error, since we'd bypass the cleanup required of the first fetcher.
        let fetch_result = fetch_future.await;
        if !is_first_fetcher {
            // Copy the requested portion of the page into the buffer and return immediately.
            let page_buf: Vec<u8> = fetch_result.map_err(|_| Error::ReadFailed)?.into();
            let bytes_to_copy = std::cmp::min(buf.len(), page_size - offset_in_page);
            buf[..bytes_to_copy]
                .copy_from_slice(&page_buf[offset_in_page..offset_in_page + bytes_to_copy]);
            return Ok(bytes_to_copy);
        }

        // This is the task that initiated the fetch, so it is responsible for cleaning up the
        // inserted entry, and caching the page in the buffer pool if the fetch didn't error out.
        // This requires a write lock on the buffer pool to modify `page_fetches` and cache the
        // page.
        let mut pool = self.pool.write().await;

        // Remove the entry from `page_fetches`.
        let _ = pool.page_fetches.remove(&(blob_id, page_num));

        // Cache the result in the buffer pool.
        let Ok(page_buf) = fetch_result else {
            return Err(Error::ReadFailed);
        };
        pool.cache(page_size, blob_id, page_buf.as_ref(), page_num);

        // Copy the requested portion of the page into the buffer.
        let page_buf: Vec<u8> = page_buf.into();
        let bytes_to_copy = std::cmp::min(buf.len(), page_size - offset_in_page);
        buf[..bytes_to_copy]
            .copy_from_slice(&page_buf[offset_in_page..offset_in_page + bytes_to_copy]);

        Ok(bytes_to_copy)
    }

    /// Cache the provided slice of data in the buffer pool, returning the number of remaining
    /// bytes that didn't fill a whole page. `offset` must be page aligned.
    ///
    /// If the next page index would overflow `u64`, caching stops and the count of uncached bytes
    /// is returned. This can only happen with 1-byte pages, since only then can the starting page
    /// index get close enough to `u64::MAX`: with `page_size >= 2`, the maximum starting page
    /// (`u64::MAX / 2`) plus the maximum number of cacheable pages (at most `usize::MAX / 2`)
    /// never exceeds `u64::MAX - 1`.
    ///
    /// # Panics
    ///
    /// Panics if `offset` is not page aligned.
    pub async fn cache(&self, blob_id: u64, mut buf: &[u8], offset: u64) -> usize {
        let (mut page_num, offset_in_page) = self.offset_to_page(offset);
        assert_eq!(offset_in_page, 0);
        {
            // Write lock the buffer pool.
            let mut buffer_pool = self.pool.write().await;
            while buf.len() >= self.page_size {
                buffer_pool.cache(self.page_size, blob_id, &buf[..self.page_size], page_num);
                buf = &buf[self.page_size..];
                page_num = match page_num.checked_add(1) {
                    Some(next) => next,
                    None => break,
                };
            }
        }

        buf.len()
    }
}

impl Pool {
    /// Return a new empty buffer pool with a max cache capacity of `capacity` pages.
    ///
    /// # Panics
    ///
    /// Panics if `capacity` is 0.
    pub fn new(capacity: usize) -> Self {
        assert!(capacity > 0);
        Self {
            index: HashMap::new(),
            cache: Vec::new(),
            clock: 0,
            capacity,
            page_fetches: HashMap::new(),
        }
    }

    /// Convert an offset into the number of the page it belongs to and the offset within that page.
    const fn offset_to_page(page_size: usize, offset: u64) -> (u64, usize) {
        (
            offset / page_size as u64,
            (offset % page_size as u64) as usize,
        )
    }

    /// Attempt to fetch blob data starting at `offset` from the buffer pool. Returns the number of
    /// bytes read, which could be 0 if the first page in the requested range isn't buffered, and is
    /// never more than `page_size` or the length of `buf`. The returned bytes won't cross a page
    /// boundary, so multiple reads may be required even if all data in the desired range is
    /// buffered.
    fn read_at(&self, page_size: usize, blob_id: u64, buf: &mut [u8], offset: u64) -> usize {
        let (page_num, offset_in_page) = Self::offset_to_page(page_size, offset);
        let page_index = self.index.get(&(blob_id, page_num));
        let Some(&page_index) = page_index else {
            return 0;
        };
        let page = &self.cache[page_index];
        assert_eq!(page.key, (blob_id, page_num));
        page.referenced.store(true, Ordering::Relaxed);
        let page = &page.data;

        let bytes_to_copy = std::cmp::min(buf.len(), page_size - offset_in_page);
        buf[..bytes_to_copy].copy_from_slice(&page[offset_in_page..offset_in_page + bytes_to_copy]);

        bytes_to_copy
    }

    /// Put the given `page` into the buffer pool.
    ///
    /// # Panics
    ///
    /// Panics if the provided page is not exactly `page_size` bytes long.
    fn cache(&mut self, page_size: usize, blob_id: u64, page: &[u8], page_num: u64) {
        assert_eq!(page.len(), page_size);

        let key = (blob_id, page_num);
        let index_entry = self.index.entry(key);
        if let Entry::Occupied(index_entry) = index_entry {
            // This case can result when a blob is truncated across a page boundary, and later grows
            // back to (beyond) its original size. It will also become expected behavior once we
            // allow cached pages to be writable.
            debug!(blob_id, page_num, "updating duplicate page");

            // Update the stale data with the new page.
            let entry = &mut self.cache[*index_entry.get()];
            assert_eq!(entry.key, key);
            entry.referenced.store(true, Ordering::Relaxed);
            entry.data.copy_from_slice(page);
            return;
        }

        if self.cache.len() < self.capacity {
            self.index.insert(key, self.cache.len());
            self.cache.push(CacheEntry {
                key,
                referenced: AtomicBool::new(true),
                data: page.into(),
            });
            return;
        }

        // Cache is full, find a page to evict.
        while self.cache[self.clock].referenced.load(Ordering::Relaxed) {
            self.cache[self.clock]
                .referenced
                .store(false, Ordering::Relaxed);
            self.clock = (self.clock + 1) % self.cache.len();
        }

        // Evict the page by replacing it with the new page.
        let entry = &mut self.cache[self.clock];
        entry.referenced.store(true, Ordering::Relaxed);
        assert!(self.index.remove(&entry.key).is_some());
        self.index.insert(key, self.clock);
        entry.key = key;
        entry.data.copy_from_slice(page);

        // Move the clock forward.
        self.clock = (self.clock + 1) % self.cache.len();
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{deterministic, Runner as _, Storage as _};
    use commonware_macros::test_traced;
    use commonware_utils::NZUsize;

    const PAGE_SIZE: usize = 1024;

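    // A small illustrative test of the offset -> (page number, offset within page) mapping that
    // `Pool::offset_to_page` performs; the values below assume the PAGE_SIZE constant above.
    #[test_traced]
    fn test_offset_to_page_mapping() {
        assert_eq!(Pool::offset_to_page(PAGE_SIZE, 0), (0, 0));
        assert_eq!(Pool::offset_to_page(PAGE_SIZE, PAGE_SIZE as u64), (1, 0));
        assert_eq!(Pool::offset_to_page(PAGE_SIZE, PAGE_SIZE as u64 + 2), (1, 2));
        assert_eq!(
            Pool::offset_to_page(PAGE_SIZE, 3 * PAGE_SIZE as u64 - 1),
            (2, PAGE_SIZE - 1)
        );
    }
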
    #[test_traced]
    fn test_pool_basic() {
        let mut pool: Pool = Pool::new(10);

        let mut buf = vec![0; PAGE_SIZE];
        let bytes_read = pool.read_at(PAGE_SIZE, 0, &mut buf, 0);
        assert_eq!(bytes_read, 0);

        pool.cache(PAGE_SIZE, 0, &[1; PAGE_SIZE], 0);
        let bytes_read = pool.read_at(PAGE_SIZE, 0, &mut buf, 0);
        assert_eq!(bytes_read, PAGE_SIZE);
        assert_eq!(buf, [1; PAGE_SIZE]);

        // Test replacement -- should log a duplicate page debug message but still work.
        pool.cache(PAGE_SIZE, 0, &[2; PAGE_SIZE], 0);
        let bytes_read = pool.read_at(PAGE_SIZE, 0, &mut buf, 0);
        assert_eq!(bytes_read, PAGE_SIZE);
        assert_eq!(buf, [2; PAGE_SIZE]);

        // Test exceeding the cache capacity.
        for i in 0u64..11 {
            pool.cache(PAGE_SIZE, 0, &[i as u8; PAGE_SIZE], i);
        }
        // Page 0 should have been evicted.
        let bytes_read = pool.read_at(PAGE_SIZE, 0, &mut buf, 0);
        assert_eq!(bytes_read, 0);
        // Pages 1-10 should be in the cache.
        for i in 1u64..11 {
            let bytes_read = pool.read_at(PAGE_SIZE, 0, &mut buf, i * PAGE_SIZE as u64);
            assert_eq!(bytes_read, PAGE_SIZE);
            assert_eq!(buf, [i as u8; PAGE_SIZE]);
        }

        // Test reading from an unaligned offset by adding 2 to an aligned offset. The read
        // should be 2 bytes short of a full page.
        let mut buf = vec![0; PAGE_SIZE];
        let bytes_read = pool.read_at(PAGE_SIZE, 0, &mut buf, PAGE_SIZE as u64 + 2);
        assert_eq!(bytes_read, PAGE_SIZE - 2);
        assert_eq!(&buf[..PAGE_SIZE - 2], [1; PAGE_SIZE - 2]);
    }
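
    // An illustrative sketch of the Clock replacement behavior documented on [Pool]: a page whose
    // reference bit is set survives an eviction pass, while an unreferenced page is the victim.
    #[test_traced]
    fn test_pool_clock_eviction_order() {
        let mut pool = Pool::new(2);
        let mut buf = vec![0; PAGE_SIZE];

        // Fill the cache with pages 0 and 1 (both inserted with their reference bits set).
        pool.cache(PAGE_SIZE, 0, &[0; PAGE_SIZE], 0);
        pool.cache(PAGE_SIZE, 0, &[1; PAGE_SIZE], 1);

        // Inserting page 2 sweeps both reference bits to false and evicts page 0.
        pool.cache(PAGE_SIZE, 0, &[2; PAGE_SIZE], 2);
        assert_eq!(pool.read_at(PAGE_SIZE, 0, &mut buf, 0), 0);

        // Page 2's reference bit is set (on insertion, and again by this read), while page 1's
        // bit was cleared by the sweep and page 1 hasn't been touched since...
        assert_eq!(pool.read_at(PAGE_SIZE, 0, &mut buf, 2 * PAGE_SIZE as u64), PAGE_SIZE);

        // ...so inserting page 3 evicts page 1 and keeps page 2.
        pool.cache(PAGE_SIZE, 0, &[3; PAGE_SIZE], 3);
        assert_eq!(pool.read_at(PAGE_SIZE, 0, &mut buf, PAGE_SIZE as u64), 0);
        assert_eq!(pool.read_at(PAGE_SIZE, 0, &mut buf, 2 * PAGE_SIZE as u64), PAGE_SIZE);
        assert_eq!(buf, [2; PAGE_SIZE]);
    }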

    #[test_traced]
    fn test_pool_read_with_blob() {
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();
        // Start the test within the executor
        executor.start(|context| async move {
            // Populate a blob with 11 consecutive pages of data.
            let (blob, size) = context
                .open("test", "blob".as_bytes())
                .await
                .expect("Failed to open blob");
            assert_eq!(size, 0);
            for i in 0..11 {
                let buf = vec![i as u8; PAGE_SIZE];
                blob.write_at(buf, i * PAGE_SIZE as u64).await.unwrap();
            }

            // Fill the buffer pool with the blob's data.
            let pool_ref = PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(10));
            assert_eq!(pool_ref.next_id().await, 0);
            assert_eq!(pool_ref.next_id().await, 1);
            for i in 0..11 {
                let mut buf = vec![0; PAGE_SIZE];
                pool_ref
                    .read(&blob, 0, &mut buf, i * PAGE_SIZE as u64)
                    .await
                    .unwrap();
                assert_eq!(buf, [i as u8; PAGE_SIZE]);
            }

            // Repeat the read to exercise reading from the buffer pool. Must start at 1 because
            // page 0 should be evicted.
            for i in 1..11 {
                let mut buf = vec![0; PAGE_SIZE];
                pool_ref
                    .read(&blob, 0, &mut buf, i * PAGE_SIZE as u64)
                    .await
                    .unwrap();
                assert_eq!(buf, [i as u8; PAGE_SIZE]);
            }

            // Cleanup.
            blob.sync().await.unwrap();
        });
    }

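    // A brief illustrative check of the panic documented on `Pool::cache` when the provided page
    // is not exactly `page_size` bytes long.
    #[test]
    #[should_panic]
    fn test_pool_cache_wrong_page_size() {
        let mut pool = Pool::new(1);
        pool.cache(PAGE_SIZE, 0, &[0; PAGE_SIZE - 1], 0);
    }
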
    #[test_traced]
    fn test_pool_cache_max_page() {
        let executor = deterministic::Runner::default();
        executor.start(|_context| async move {
            let pool_ref = PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(2));

            // Use the largest page-aligned offset representable for the configured PAGE_SIZE.
            let aligned_max_offset = u64::MAX - (u64::MAX % PAGE_SIZE as u64);

            // Caching exactly one page at the maximum offset should succeed.
            let remaining = pool_ref
                .cache(0, vec![42; PAGE_SIZE].as_slice(), aligned_max_offset)
                .await;
            assert_eq!(remaining, 0);

            let mut buf = vec![0u8; PAGE_SIZE];
            let pool = pool_ref.pool.read().await;
            let bytes_read = pool.read_at(PAGE_SIZE, 0, &mut buf, aligned_max_offset);
            assert_eq!(bytes_read, PAGE_SIZE);
            assert!(buf.iter().all(|b| *b == 42));
        });
    }
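
    // A supplementary sketch of `PoolRef::cache` returning the count of trailing bytes that don't
    // fill a whole page, as described in its documentation.
    #[test_traced]
    fn test_pool_ref_cache_partial_page() {
        let executor = deterministic::Runner::default();
        executor.start(|_context| async move {
            let pool_ref = PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(2));

            // One full page plus 10 trailing bytes: only the full page is cached.
            let data = vec![7u8; PAGE_SIZE + 10];
            let remaining = pool_ref.cache(0, &data, 0).await;
            assert_eq!(remaining, 10);

            // The full page is readable from the pool; the trailing bytes are not.
            let mut buf = vec![0u8; PAGE_SIZE];
            let pool = pool_ref.pool.read().await;
            assert_eq!(pool.read_at(PAGE_SIZE, 0, &mut buf, 0), PAGE_SIZE);
            assert_eq!(buf, [7u8; PAGE_SIZE]);
            assert_eq!(pool.read_at(PAGE_SIZE, 0, &mut buf, PAGE_SIZE as u64), 0);
        });
    }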

    #[test_traced]
    fn test_pool_cache_page_overflow_partial() {
        let executor = deterministic::Runner::default();
        executor.start(|_context| async move {
            // Use the minimum page size to force the page index to reach u64::MAX and trigger the
            // overflow guard.
            let pool_ref = PoolRef::new(NZUsize!(1), NZUsize!(2));

            // Caching across the maximum page should stop before overflow and report the remainder.
            let remaining = pool_ref.cache(0, &[1, 2], u64::MAX).await;
            assert_eq!(remaining, 1);

            let mut buf = [0u8; 1];
            let pool = pool_ref.pool.read().await;
            assert_eq!(pool.read_at(1, 0, &mut buf, u64::MAX), 1);
            assert_eq!(buf, [1]);
        });
    }
509}