commonware_runtime/utils/buffer/

use crate::{Blob, Error, RwLock};
use commonware_utils::StableBuf;
use futures::{future::Shared, FutureExt};
use std::{
    collections::{hash_map::Entry, HashMap},
    future::Future,
    pin::Pin,
    sync::{
        atomic::{AtomicBool, Ordering},
        Arc,
    },
};
use tracing::{debug, trace};

// Type alias for the future we'll be storing for each in-flight page fetch.
//
// We wrap [Error] in an Arc so it will be cloneable, which is required for the future to be
// [Shared].
type PageFetchFut = Shared<Pin<Box<dyn Future<Output = Result<StableBuf, Arc<Error>>> + Send>>>;

/// A [Pool] caches pages of [Blob] data in memory.
///
/// A single buffer pool can be used to cache data from multiple blobs by assigning a unique id to
/// each.
///
/// Implements the [Clock](https://en.wikipedia.org/wiki/Page_replacement_algorithm#Clock)
/// replacement policy, which is a lightweight approximation of LRU. The page `cache` is a circular
/// list of recently accessed pages, and `clock` is the index of the next page within it to examine
/// for replacement. When a page needs to be evicted, we start the search at `clock` within `cache`,
/// searching for the first page with a false reference bit, and setting any skipped page's
/// reference bit to false along the way.
pub struct Pool {
    /// The page cache index, with a key composed of (blob id, page number), that maps each cached
    /// page to the index of its `cache` entry.
    ///
    /// # Invariants
    ///
    /// Each `index` entry maps to exactly one `cache` entry, and that cache entry always has a
    /// matching key.
    index: HashMap<(u64, u64), usize>,

    /// The page cache.
    ///
    /// Each `cache` entry has exactly one corresponding `index` entry.
    cache: Vec<CacheEntry>,

    /// The Clock replacement policy's clock hand index into `cache`.
    clock: usize,

    /// The next id to assign to a blob that will be managed by this pool.
    next_id: u64,

    /// The maximum number of pages that will be cached.
    capacity: usize,

    /// A map of currently executing page fetches to ensure only one task at a time is trying to
    /// fetch a specific page.
    page_fetches: HashMap<(u64, u64), PageFetchFut>,
}

struct CacheEntry {
    /// The cache key which is composed of the blob id and page number of the page.
    key: (u64, u64),

    /// A bit indicating whether this page was recently referenced.
    referenced: AtomicBool,

    /// The cached page itself.
    data: Vec<u8>,
}

/// A reference to a [Pool] that can be shared across threads via cloning, along with the page size
/// that will be used with it. Provides the API for interacting with the buffer pool in a
/// thread-safe manner.
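///
/// # Example
///
/// A minimal usage sketch (illustrative only, not a compiled doctest; the 1 KiB page size, the
/// 128-page capacity, and the data being cached are assumptions):
///
/// ```ignore
/// let pool_ref = PoolRef::new(1024, 128);
/// let blob_id = pool_ref.next_id().await;
/// let data = vec![0u8; 2 * 1024 + 7];
/// // Whole pages are cached; the 7 trailing bytes are returned for the caller to handle.
/// let leftover = pool_ref.cache(blob_id, &data, 0).await;
/// assert_eq!(leftover, 7);
/// ```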
#[derive(Clone)]
pub struct PoolRef {
    /// The size of each page in the buffer pool.
    pub(super) page_size: usize,

    /// Shareable reference to the buffer pool.
    pool: Arc<RwLock<Pool>>,
}

impl PoolRef {
    /// Returns a new [PoolRef] with the given `page_size` and `capacity`.
    pub fn new(page_size: usize, capacity: usize) -> Self {
        Self {
            page_size,
            pool: Arc::new(RwLock::new(Pool::new(capacity))),
        }
    }

    /// Returns a unique id for the next blob that will use this buffer pool.
    pub async fn next_id(&self) -> u64 {
        let mut pool = self.pool.write().await;
        pool.next_id()
    }

    /// Convert an offset into the number of the page it belongs to and the offset within that page.
    pub fn offset_to_page(&self, offset: u64) -> (u64, usize) {
        Pool::offset_to_page(self.page_size, offset)
    }

    /// Read the specified bytes, preferentially from the buffer pool cache. Bytes not found in the
    /// buffer pool will be read from the provided `blob` and cached for future reads.
    ///
    /// # Warning
    ///
    /// Attempts to read any of the last (blob_size % page_size) "trailing bytes" of the blob will
    /// result in a ReadFailed error since the buffer pool only deals with page-sized chunks.
    /// Trailing bytes need to be dealt with outside of the buffer pool. For example,
    /// [crate::buffer::Append] uses a [crate::buffer::tip::Buffer] to buffer them.
    pub(super) async fn read<B: Blob>(
        &self,
        blob: &B,
        blob_id: u64,
        mut buf: &mut [u8],
        mut offset: u64,
    ) -> Result<(), Error> {
        // Read up to a page worth of data at a time from either the buffer pool or the `blob`,
        // until the requested data is fully read.
        while !buf.is_empty() {
            // Read lock the buffer pool and see if we can get (some of) the data from it.
            {
                let buffer_pool = self.pool.read().await;
                let count = buffer_pool.read_at(self.page_size, blob_id, buf, offset);
                if count != 0 {
                    offset += count as u64;
                    buf = &mut buf[count..];
                    continue;
                }
            }

            // Handle page fault.
            let count = self
                .read_after_page_fault(blob, blob_id, buf, offset)
                .await?;
            offset += count as u64;
            buf = &mut buf[count..];
        }

        Ok(())
    }

    /// Fetch the specified page after encountering a page fault, which may involve retrieving it
    /// from `blob` & caching the result in `pool`. Returns the number of bytes read, which should
    /// always be non-zero.
    async fn read_after_page_fault<B: Blob>(
        &self,
        blob: &B,
        blob_id: u64,
        buf: &mut [u8],
        offset: u64,
    ) -> Result<usize, Error> {
        assert!(!buf.is_empty());

        let (page_num, offset_in_page) = Pool::offset_to_page(self.page_size, offset);
        let page_size = self.page_size;
        trace!(page_num, blob_id, "page fault");

        // Create or clone a future that retrieves the desired page from the underlying blob. This
        // requires a write lock on the buffer pool since we may need to modify `page_fetches` if
        // this is the first fetcher.
        let (fetch_future, is_first_fetcher) = {
            let mut pool = self.pool.write().await;

            // There's a (small) chance the page was fetched & buffered by another task before we
            // were able to acquire the write lock, so check the cache before doing anything else.
            let count = pool.read_at(page_size, blob_id, buf, offset);
            if count != 0 {
                return Ok(count);
            }

            let entry = pool.page_fetches.entry((blob_id, page_num));
            match entry {
                Entry::Occupied(o) => {
                    // Another thread is already fetching this page, so clone its existing future.
                    (o.get().clone(), false)
                }
                Entry::Vacant(v) => {
                    // Nobody is currently fetching this page, so create a future that will do the work.
                    let blob = blob.clone();
                    let future = async move {
                        blob.read_at(vec![0; page_size], page_num * page_size as u64)
                            .await
                            .map_err(Arc::new)
                    };

                    // Make the future shareable and insert it into the map.
                    let shareable = future.boxed().shared();
                    v.insert(shareable.clone());

                    (shareable, true)
                }
            }
        };

        // Await the future and get the page buffer. If this isn't the task that initiated the
        // fetch, we can return immediately with the result. Note that we cannot return immediately
        // on error, since we'd bypass the cleanup required of the first fetcher.
        let fetch_result = fetch_future.await;
        if !is_first_fetcher {
            // Copy the requested portion of the page into the buffer and return immediately.
            let page_buf: Vec<u8> = fetch_result.map_err(|_| Error::ReadFailed)?.into();
            let bytes_to_copy = std::cmp::min(buf.len(), page_size - offset_in_page);
            buf[..bytes_to_copy]
                .copy_from_slice(&page_buf[offset_in_page..offset_in_page + bytes_to_copy]);
            return Ok(bytes_to_copy);
        }

        // This is the task that initiated the fetch, so it is responsible for cleaning up the
        // inserted entry, and caching the page in the buffer pool if the fetch didn't error out.
        // This requires a write lock on the buffer pool to modify `page_fetches` and cache the
        // page.
        let mut pool = self.pool.write().await;

        // Remove the entry from `page_fetches`.
        let _ = pool.page_fetches.remove(&(blob_id, page_num));

        // Cache the result in the buffer pool.
        let Ok(page_buf) = fetch_result else {
            return Err(Error::ReadFailed);
        };
        pool.cache(page_size, blob_id, page_buf.as_ref(), page_num);

        // Copy the requested portion of the page into the buffer.
        let page_buf: Vec<u8> = page_buf.into();
        let bytes_to_copy = std::cmp::min(buf.len(), page_size - offset_in_page);
        buf[..bytes_to_copy]
            .copy_from_slice(&page_buf[offset_in_page..offset_in_page + bytes_to_copy]);

        Ok(bytes_to_copy)
    }

    /// Cache the provided slice of data in the buffer pool, returning the number of remaining
    /// bytes that didn't fill a whole page. `offset` must be page-aligned.
    ///
    /// # Panics
    ///
    /// Panics if `offset` is not page-aligned.
    pub async fn cache(&self, blob_id: u64, mut buf: &[u8], offset: u64) -> usize {
        let (mut page_num, offset_in_page) = self.offset_to_page(offset);
        assert_eq!(offset_in_page, 0);
        {
            // Write lock the buffer pool.
            let mut buffer_pool = self.pool.write().await;
            while buf.len() >= self.page_size {
                buffer_pool.cache(self.page_size, blob_id, &buf[..self.page_size], page_num);
                buf = &buf[self.page_size..];
                page_num += 1;
            }
        }

        buf.len()
    }
}

impl Pool {
    /// Return a new empty buffer pool with an initial next-blob id of 0, and a max cache capacity
    /// of `capacity` pages.
    ///
    /// # Panics
    ///
    /// Panics if `capacity` is 0.
    pub fn new(capacity: usize) -> Self {
        assert!(capacity > 0);
        Self {
            index: HashMap::new(),
            cache: Vec::new(),
            clock: 0,
            next_id: 0,
            capacity,
            page_fetches: HashMap::new(),
        }
    }

    /// Assign and return the next unique blob id.
    pub(super) fn next_id(&mut self) -> u64 {
        let id = self.next_id;
        self.next_id += 1;
        id
    }

    /// Convert an offset into the number of the page it belongs to and the offset within that page.
    fn offset_to_page(page_size: usize, offset: u64) -> (u64, usize) {
        (
            offset / page_size as u64,
            (offset % page_size as u64) as usize,
        )
    }

    /// Attempt to fetch blob data starting at `offset` from the buffer pool. Returns the number of
    /// bytes read, which could be 0 if the first page in the requested range isn't buffered, and is
    /// never more than `page_size` or the length of `buf`. The returned bytes won't cross a
    /// page boundary, so multiple reads may be required even if all data in the desired range is
    /// buffered.
    fn read_at(&self, page_size: usize, blob_id: u64, buf: &mut [u8], offset: u64) -> usize {
        let (page_num, offset_in_page) = Self::offset_to_page(page_size, offset);
        let page_index = self.index.get(&(blob_id, page_num));
        let Some(&page_index) = page_index else {
            return 0;
        };
        let page = &self.cache[page_index];
        assert_eq!(page.key, (blob_id, page_num));
        page.referenced.store(true, Ordering::Relaxed);
        let page = &page.data;

        let bytes_to_copy = std::cmp::min(buf.len(), page_size - offset_in_page);
        buf[..bytes_to_copy].copy_from_slice(&page[offset_in_page..offset_in_page + bytes_to_copy]);

        bytes_to_copy
    }

    /// Put the given `page` into the buffer pool.
    ///
    /// # Panics
    ///
    /// Panics if the provided page is not exactly `page_size` bytes long.
    fn cache(&mut self, page_size: usize, blob_id: u64, page: &[u8], page_num: u64) {
        assert_eq!(page.len(), page_size);

        let key = (blob_id, page_num);
        let index_entry = self.index.entry(key);
        if let Entry::Occupied(index_entry) = index_entry {
            // This case can result when a blob is truncated across a page boundary, and later grows
            // back to (or beyond) its original size. It will also become expected behavior once we
            // allow cached pages to be writable.
            debug!(blob_id, page_num, "updating duplicate page");

            // Update the stale data with the new page.
            let entry = &mut self.cache[*index_entry.get()];
            assert_eq!(entry.key, key);
            entry.referenced.store(true, Ordering::Relaxed);
            entry.data.copy_from_slice(page);
            return;
        }

        if self.cache.len() < self.capacity {
            self.index.insert(key, self.cache.len());
            self.cache.push(CacheEntry {
                key,
                referenced: AtomicBool::new(true),
                data: page.into(),
            });
            return;
        }

        // Cache is full; find a page to evict.
        while self.cache[self.clock].referenced.load(Ordering::Relaxed) {
            self.cache[self.clock]
                .referenced
                .store(false, Ordering::Relaxed);
            self.clock = (self.clock + 1) % self.cache.len();
        }

        // Evict the page by replacing it with the new page.
        let entry = &mut self.cache[self.clock];
        entry.referenced.store(true, Ordering::Relaxed);
        assert!(self.index.remove(&entry.key).is_some());
        self.index.insert(key, self.clock);
        entry.key = key;
        entry.data.copy_from_slice(page);

        // Move the clock forward.
        self.clock = (self.clock + 1) % self.cache.len();
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{deterministic, Runner as _, Storage as _};
    use commonware_macros::test_traced;

    const PAGE_SIZE: usize = 1024;

    #[test_traced]
    fn test_pool_basic() {
        let mut pool: Pool = Pool::new(10);
        assert_eq!(pool.next_id(), 0);
        assert_eq!(pool.next_id(), 1);

        let mut buf = vec![0; PAGE_SIZE];
        let bytes_read = pool.read_at(PAGE_SIZE, 0, &mut buf, 0);
        assert_eq!(bytes_read, 0);

        pool.cache(PAGE_SIZE, 0, &[1; PAGE_SIZE], 0);
        let bytes_read = pool.read_at(PAGE_SIZE, 0, &mut buf, 0);
        assert_eq!(bytes_read, PAGE_SIZE);
        assert_eq!(buf, [1; PAGE_SIZE]);

        // Test replacement -- should log a duplicate-page debug message but still work.
        pool.cache(PAGE_SIZE, 0, &[2; PAGE_SIZE], 0);
        let bytes_read = pool.read_at(PAGE_SIZE, 0, &mut buf, 0);
        assert_eq!(bytes_read, PAGE_SIZE);
        assert_eq!(buf, [2; PAGE_SIZE]);

        // Test exceeding the cache capacity.
        for i in 0u64..11 {
            pool.cache(PAGE_SIZE, 0, &[i as u8; PAGE_SIZE], i);
        }
        // Page 0 should have been evicted.
        let bytes_read = pool.read_at(PAGE_SIZE, 0, &mut buf, 0);
        assert_eq!(bytes_read, 0);
        // Pages 1-10 should be in the cache.
        for i in 1u64..11 {
            let bytes_read = pool.read_at(PAGE_SIZE, 0, &mut buf, i * PAGE_SIZE as u64);
            assert_eq!(bytes_read, PAGE_SIZE);
            assert_eq!(buf, [i as u8; PAGE_SIZE]);
        }

        // Test reading from an unaligned offset by adding 2 to an aligned offset. The read
        // should be 2 bytes short of a full page.
        let mut buf = vec![0; PAGE_SIZE];
        let bytes_read = pool.read_at(PAGE_SIZE, 0, &mut buf, PAGE_SIZE as u64 + 2);
        assert_eq!(bytes_read, PAGE_SIZE - 2);
        assert_eq!(&buf[..PAGE_SIZE - 2], [1; PAGE_SIZE - 2]);
    }
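
    // A minimal sketch exercising the Clock replacement policy documented on [Pool]: with a
    // capacity of 3, a page whose reference bit was recently set by a read survives an eviction
    // pass, while an unreferenced page is evicted instead. The specific access order below is an
    // illustrative assumption, not the only sequence that demonstrates this.
    #[test_traced]
    fn test_pool_clock_eviction_prefers_unreferenced_pages() {
        let mut pool: Pool = Pool::new(3);

        // Fill the cache with pages 0, 1, and 2.
        for i in 0u64..3 {
            pool.cache(PAGE_SIZE, 0, &[i as u8; PAGE_SIZE], i);
        }

        // Caching page 3 forces a full clock sweep: every reference bit is cleared and page 0
        // (the first entry examined on the second pass) is evicted.
        pool.cache(PAGE_SIZE, 0, &[3; PAGE_SIZE], 3);
        let mut buf = vec![0; PAGE_SIZE];
        assert_eq!(pool.read_at(PAGE_SIZE, 0, &mut buf, 0), 0);

        // Touch page 1 so its reference bit is set again.
        assert_eq!(pool.read_at(PAGE_SIZE, 0, &mut buf, PAGE_SIZE as u64), PAGE_SIZE);

        // Caching page 4 skips the recently referenced page 1 and evicts page 2 instead.
        pool.cache(PAGE_SIZE, 0, &[4; PAGE_SIZE], 4);
        assert_eq!(pool.read_at(PAGE_SIZE, 0, &mut buf, PAGE_SIZE as u64), PAGE_SIZE);
        assert_eq!(buf, [1; PAGE_SIZE]);
        assert_eq!(pool.read_at(PAGE_SIZE, 0, &mut buf, 2 * PAGE_SIZE as u64), 0);
        assert_eq!(pool.read_at(PAGE_SIZE, 0, &mut buf, 4 * PAGE_SIZE as u64), PAGE_SIZE);
        assert_eq!(buf, [4; PAGE_SIZE]);
    }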

    #[test_traced]
    fn test_pool_read_with_blob() {
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();
        // Start the test within the executor
        executor.start(|context| async move {
            // Populate a blob with 11 consecutive pages of data.
            let (blob, size) = context
                .open("test", "blob".as_bytes())
                .await
                .expect("Failed to open blob");
            assert_eq!(size, 0);
            for i in 0..11 {
                let buf = vec![i as u8; PAGE_SIZE];
                blob.write_at(buf, i * PAGE_SIZE as u64).await.unwrap();
            }

            // Fill the buffer pool with the blob's data.
            let pool_ref = PoolRef::new(PAGE_SIZE, 10);
            for i in 0..11 {
                let mut buf = vec![0; PAGE_SIZE];
                pool_ref
                    .read(&blob, 0, &mut buf, i * PAGE_SIZE as u64)
                    .await
                    .unwrap();
                assert_eq!(buf, [i as u8; PAGE_SIZE]);
            }

            // Repeat the reads to exercise reading from the buffer pool. Must start at page 1
            // because page 0 should have been evicted.
            for i in 1..11 {
                let mut buf = vec![0; PAGE_SIZE];
                pool_ref
                    .read(&blob, 0, &mut buf, i * PAGE_SIZE as u64)
                    .await
                    .unwrap();
                assert_eq!(buf, [i as u8; PAGE_SIZE]);
            }

            // Cleanup.
            blob.close().await.expect("Failed to close blob");
        });
    }
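
    // A minimal sketch (under the same deterministic-runtime setup as above) showing how
    // [PoolRef::cache] absorbs only whole pages and reports the number of leftover trailing
    // bytes, which callers must buffer elsewhere (see the warning on [PoolRef::read]). The
    // "trailing" blob name and the 3-byte tail are illustrative assumptions.
    #[test_traced]
    fn test_pool_ref_cache_trailing_bytes() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Open a blob; it only backs the `read` call and is never actually read from,
            // since every page requested below is already cached.
            let (blob, size) = context
                .open("test", "trailing".as_bytes())
                .await
                .expect("Failed to open blob");
            assert_eq!(size, 0);

            let pool_ref = PoolRef::new(PAGE_SIZE, 10);
            let blob_id = pool_ref.next_id().await;

            // Two full pages plus 3 trailing bytes: only the full pages are cached.
            let data = vec![7u8; PAGE_SIZE * 2 + 3];
            let leftover = pool_ref.cache(blob_id, &data, 0).await;
            assert_eq!(leftover, 3);

            // Both cached pages are served straight from the buffer pool.
            let mut buf = vec![0; PAGE_SIZE * 2];
            pool_ref.read(&blob, blob_id, &mut buf, 0).await.unwrap();
            assert_eq!(buf, vec![7u8; PAGE_SIZE * 2]);

            blob.close().await.expect("Failed to close blob");
        });
    }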
}