commonware_runtime/utils/buffer/pool.rs
use crate::{Blob, Error, RwLock};
use commonware_utils::StableBuf;
use futures::{future::Shared, FutureExt};
use std::{
    collections::{hash_map::Entry, HashMap},
    future::Future,
    num::NonZeroUsize,
    pin::Pin,
    sync::{
        atomic::{AtomicBool, Ordering},
        Arc,
    },
};
use tracing::{debug, trace};

// Type alias for the future we'll be storing for each in-flight page fetch.
//
// We wrap [Error] in an Arc so it will be cloneable, which is required for the future to be
// [Shared].
type PageFetchFut = Shared<Pin<Box<dyn Future<Output = Result<StableBuf, Arc<Error>>> + Send>>>;

/// A [Pool] caches pages of [Blob] data in memory.
///
/// A single buffer pool can be used to cache data from multiple blobs by assigning a unique id to
/// each.
///
/// Implements the [Clock](https://en.wikipedia.org/wiki/Page_replacement_algorithm#Clock)
/// replacement policy, which is a lightweight approximation of LRU. The page `cache` is a circular
/// list of recently accessed pages, and `clock` is the index of the next page within it to examine
/// for replacement. When a page needs to be evicted, we start the search at `clock` within `cache`,
/// searching for the first page with a false reference bit, and setting any skipped page's
/// reference bit to false along the way.
pub struct Pool {
    /// The page cache index, with a key composed of (blob id, page number), that maps each cached
    /// page to the index of its `cache` entry.
    ///
    /// # Invariants
    ///
    /// Each `index` entry maps to exactly one `cache` entry, and that cache entry always has a
    /// matching key.
    index: HashMap<(u64, u64), usize>,

    /// The page cache.
    ///
    /// Each `cache` entry has exactly one corresponding `index` entry.
    cache: Vec<CacheEntry>,

    /// The Clock replacement policy's clock hand index into `cache`.
    clock: usize,

    /// The next id to assign to a blob that will be managed by this pool.
    next_id: u64,

    /// The maximum number of pages that will be cached.
    capacity: usize,

    /// A map of currently executing page fetches to ensure only one task at a time is trying to
    /// fetch a specific page.
    page_fetches: HashMap<(u64, u64), PageFetchFut>,
}

struct CacheEntry {
    /// The cache key which is composed of the blob id and page number of the page.
    key: (u64, u64),

    /// A bit indicating whether this page was recently referenced.
    referenced: AtomicBool,

    /// The cached page itself.
    data: Vec<u8>,
}

/// A reference to a [Pool] that can be shared across threads via cloning, along with the page size
/// that will be used with it. Provides the API for interacting with the buffer pool in a
/// thread-safe manner.
#[derive(Clone)]
pub struct PoolRef {
    /// The size of each page in the buffer pool.
    pub(super) page_size: usize,

    /// Shareable reference to the buffer pool.
    pool: Arc<RwLock<Pool>>,
}

impl PoolRef {
    /// Returns a new [PoolRef] with the given `page_size` and `capacity`.
    pub fn new(page_size: NonZeroUsize, capacity: NonZeroUsize) -> Self {
        Self {
            page_size: page_size.get(),
            pool: Arc::new(RwLock::new(Pool::new(capacity.get()))),
        }
    }

    /// Returns a unique id for the next blob that will use this buffer pool.
    pub async fn next_id(&self) -> u64 {
        let mut pool = self.pool.write().await;
        pool.next_id()
    }

    /// Convert an offset into the number of the page it belongs to and the offset within that page.
    pub fn offset_to_page(&self, offset: u64) -> (u64, usize) {
        Pool::offset_to_page(self.page_size, offset)
    }

    /// Read the specified bytes, preferentially from the buffer pool cache. Bytes not found in the
    /// buffer pool will be read from the provided `blob` and cached for future reads.
    ///
    /// # Warning
    ///
    /// Attempts to read any of the last (blob_size % page_size) "trailing bytes" of the blob will
    /// result in a ReadFailed error since the buffer pool only deals with page sized chunks.
    /// Trailing bytes need to be dealt with outside of the buffer pool. For example,
    /// [crate::buffer::Append] uses a [crate::buffer::tip::Buffer] to buffer them.
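    ///
    /// A worked example with illustrative numbers:
    ///
    /// ```text
    /// page_size = 1024, blob size = 2600 bytes
    ///   offsets 0..2048    -> pages 0 and 1, readable through the buffer pool
    ///   offsets 2048..2600 -> trailing bytes, must be read outside the buffer pool
    /// ```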
    pub(super) async fn read<B: Blob>(
        &self,
        blob: &B,
        blob_id: u64,
        mut buf: &mut [u8],
        mut offset: u64,
    ) -> Result<(), Error> {
        // Read up to a page worth of data at a time from either the buffer pool or the `blob`,
        // until the requested data is fully read.
        while !buf.is_empty() {
            // Read lock the buffer pool and see if we can get (some of) the data from it.
            {
                let buffer_pool = self.pool.read().await;
                let count = buffer_pool.read_at(self.page_size, blob_id, buf, offset);
                if count != 0 {
                    offset += count as u64;
                    buf = &mut buf[count..];
                    continue;
                }
            }

            // Handle page fault.
            let count = self
                .read_after_page_fault(blob, blob_id, buf, offset)
                .await?;
            offset += count as u64;
            buf = &mut buf[count..];
        }

        Ok(())
    }

    /// Fetch the specified page after encountering a page fault, which may involve retrieving it
    /// from `blob` & caching the result in `pool`. Returns the number of bytes read, which should
    /// always be non-zero.
    async fn read_after_page_fault<B: Blob>(
        &self,
        blob: &B,
        blob_id: u64,
        buf: &mut [u8],
        offset: u64,
    ) -> Result<usize, Error> {
        assert!(!buf.is_empty());

        let (page_num, offset_in_page) = Pool::offset_to_page(self.page_size, offset);
        let page_size = self.page_size;
        trace!(page_num, blob_id, "page fault");

        // Create or clone a future that retrieves the desired page from the underlying blob. This
        // requires a write lock on the buffer pool since we may need to modify `page_fetches` if
        // this is the first fetcher.
        let (fetch_future, is_first_fetcher) = {
            let mut pool = self.pool.write().await;

            // There's a (small) chance the page was fetched & buffered by another task before we
            // were able to acquire the write lock, so check the cache before doing anything else.
            let count = pool.read_at(page_size, blob_id, buf, offset);
            if count != 0 {
                return Ok(count);
            }

            let entry = pool.page_fetches.entry((blob_id, page_num));
            match entry {
                Entry::Occupied(o) => {
                    // Another thread is already fetching this page, so clone its existing future.
                    (o.get().clone(), false)
                }
                Entry::Vacant(v) => {
                    // Nobody is currently fetching this page, so create a future that will do the work.
                    let blob = blob.clone();
                    let future = async move {
                        blob.read_at(vec![0; page_size], page_num * page_size as u64)
                            .await
                            .map_err(Arc::new)
                    };

                    // Make the future shareable and insert it into the map.
                    let shareable = future.boxed().shared();
                    v.insert(shareable.clone());

                    (shareable, true)
                }
            }
        };

        // Await the future and get the page buffer. If this isn't the task that initiated the
        // fetch, we can return immediately with the result. Note that we cannot return immediately
        // on error, since we'd bypass the cleanup required of the first fetcher.
        let fetch_result = fetch_future.await;
        if !is_first_fetcher {
            // Copy the requested portion of the page into the buffer and return immediately.
            let page_buf: Vec<u8> = fetch_result.map_err(|_| Error::ReadFailed)?.into();
            let bytes_to_copy = std::cmp::min(buf.len(), page_size - offset_in_page);
            buf[..bytes_to_copy]
                .copy_from_slice(&page_buf[offset_in_page..offset_in_page + bytes_to_copy]);
            return Ok(bytes_to_copy);
        }

        // This is the task that initiated the fetch, so it is responsible for cleaning up the
        // inserted entry, and caching the page in the buffer pool if the fetch didn't error out.
        // This requires a write lock on the buffer pool to modify `page_fetches` and cache the
        // page.
        let mut pool = self.pool.write().await;

        // Remove the entry from `page_fetches`.
        let _ = pool.page_fetches.remove(&(blob_id, page_num));

        // Cache the result in the buffer pool.
        let Ok(page_buf) = fetch_result else {
            return Err(Error::ReadFailed);
        };
        pool.cache(page_size, blob_id, page_buf.as_ref(), page_num);

        // Copy the requested portion of the page into the buffer.
        let page_buf: Vec<u8> = page_buf.into();
        let bytes_to_copy = std::cmp::min(buf.len(), page_size - offset_in_page);
        buf[..bytes_to_copy]
            .copy_from_slice(&page_buf[offset_in_page..offset_in_page + bytes_to_copy]);

        Ok(bytes_to_copy)
    }

    /// Cache the provided slice of data in the buffer pool, returning the number of remaining
    /// bytes that didn't fill a whole page. `offset` must be page aligned.
    ///
    /// # Panics
    ///
    /// Panics if `offset` is not page aligned.
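    ///
    /// A worked example with illustrative numbers:
    ///
    /// ```text
    /// page_size = 1024, buf.len() = 2600, offset = 1024
    ///   pages 1 and 2 (the first 2048 bytes of `buf`) are cached
    ///   552 is returned: the tail of `buf` that didn't fill a whole page
    /// ```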
    pub async fn cache(&self, blob_id: u64, mut buf: &[u8], offset: u64) -> usize {
        let (mut page_num, offset_in_page) = self.offset_to_page(offset);
        assert_eq!(offset_in_page, 0);
        {
            // Write lock the buffer pool.
            let mut buffer_pool = self.pool.write().await;
            while buf.len() >= self.page_size {
                buffer_pool.cache(self.page_size, blob_id, &buf[..self.page_size], page_num);
                buf = &buf[self.page_size..];
                page_num += 1;
            }
        }

        buf.len()
    }
}

impl Pool {
    /// Return a new empty buffer pool with an initial next-blob id of 0, and a max cache capacity
    /// of `capacity` pages.
    ///
    /// # Panics
    ///
    /// Panics if `capacity` is 0.
    pub fn new(capacity: usize) -> Self {
        assert!(capacity > 0);
        Self {
            index: HashMap::new(),
            cache: Vec::new(),
            clock: 0,
            next_id: 0,
            capacity,
            page_fetches: HashMap::new(),
        }
    }

    /// Assign and return the next unique blob id.
    pub(super) fn next_id(&mut self) -> u64 {
        let id = self.next_id;
        self.next_id += 1;
        id
    }

    /// Convert an offset into the number of the page it belongs to and the offset within that page.
    fn offset_to_page(page_size: usize, offset: u64) -> (u64, usize) {
        (
            offset / page_size as u64,
            (offset % page_size as u64) as usize,
        )
    }

    /// Attempt to fetch blob data starting at `offset` from the buffer pool. Returns the number of
    /// bytes read, which could be 0 if the first page in the requested range isn't buffered, and is
    /// never more than `page_size` or the length of `buf`. The returned bytes won't cross a
    /// page boundary, so multiple reads may be required even if all data in the desired range is
    /// buffered.
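    ///
    /// A worked example with illustrative numbers:
    ///
    /// ```text
    /// page_size = 1024, buf.len() = 100, offset = 1000
    ///   the read stops at the page boundary, so at most 24 bytes are returned
    ///   (exactly 24 if the page is buffered, 0 if it isn't)
    /// ```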
    fn read_at(&self, page_size: usize, blob_id: u64, buf: &mut [u8], offset: u64) -> usize {
        let (page_num, offset_in_page) = Self::offset_to_page(page_size, offset);
        let page_index = self.index.get(&(blob_id, page_num));
        let Some(&page_index) = page_index else {
            return 0;
        };
        let page = &self.cache[page_index];
        assert_eq!(page.key, (blob_id, page_num));
        page.referenced.store(true, Ordering::Relaxed);
        let page = &page.data;

        let bytes_to_copy = std::cmp::min(buf.len(), page_size - offset_in_page);
        buf[..bytes_to_copy].copy_from_slice(&page[offset_in_page..offset_in_page + bytes_to_copy]);

        bytes_to_copy
    }

    /// Put the given `page` into the buffer pool.
    ///
    /// # Panics
    ///
    /// Panics if the provided page is not exactly `page_size` bytes long.
    fn cache(&mut self, page_size: usize, blob_id: u64, page: &[u8], page_num: u64) {
        assert_eq!(page.len(), page_size);

        let key = (blob_id, page_num);
        let index_entry = self.index.entry(key);
        if let Entry::Occupied(index_entry) = index_entry {
            // This case can result when a blob is truncated across a page boundary, and later grows
            // back to (beyond) its original size. It will also become expected behavior once we
            // allow cached pages to be writable.
            debug!(blob_id, page_num, "updating duplicate page");

            // Update the stale data with the new page.
            let entry = &mut self.cache[*index_entry.get()];
            assert_eq!(entry.key, key);
            entry.referenced.store(true, Ordering::Relaxed);
            entry.data.copy_from_slice(page);
            return;
        }

        if self.cache.len() < self.capacity {
            self.index.insert(key, self.cache.len());
            self.cache.push(CacheEntry {
                key,
                referenced: AtomicBool::new(true),
                data: page.into(),
            });
            return;
        }

        // Cache is full, find a page to evict.
        while self.cache[self.clock].referenced.load(Ordering::Relaxed) {
            self.cache[self.clock]
                .referenced
                .store(false, Ordering::Relaxed);
            self.clock = (self.clock + 1) % self.cache.len();
        }

        // Evict the page by replacing it with the new page.
        let entry = &mut self.cache[self.clock];
        entry.referenced.store(true, Ordering::Relaxed);
        assert!(self.index.remove(&entry.key).is_some());
        self.index.insert(key, self.clock);
        entry.key = key;
        entry.data.copy_from_slice(page);

        // Move the clock forward.
        self.clock = (self.clock + 1) % self.cache.len();
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{deterministic, Runner as _, Storage as _};
    use commonware_macros::test_traced;
    use commonware_utils::NZUsize;

    const PAGE_SIZE: usize = 1024;

    #[test_traced]
    fn test_pool_basic() {
        let mut pool: Pool = Pool::new(10);
        assert_eq!(pool.next_id(), 0);
        assert_eq!(pool.next_id(), 1);

        let mut buf = vec![0; PAGE_SIZE];
        let bytes_read = pool.read_at(PAGE_SIZE, 0, &mut buf, 0);
        assert_eq!(bytes_read, 0);

        pool.cache(PAGE_SIZE, 0, &[1; PAGE_SIZE], 0);
        let bytes_read = pool.read_at(PAGE_SIZE, 0, &mut buf, 0);
        assert_eq!(bytes_read, PAGE_SIZE);
        assert_eq!(buf, [1; PAGE_SIZE]);

        // Test replacement -- should log a duplicate page warning but still work.
        pool.cache(PAGE_SIZE, 0, &[2; PAGE_SIZE], 0);
        let bytes_read = pool.read_at(PAGE_SIZE, 0, &mut buf, 0);
        assert_eq!(bytes_read, PAGE_SIZE);
        assert_eq!(buf, [2; PAGE_SIZE]);

        // Test exceeding the cache capacity.
        for i in 0u64..11 {
            pool.cache(PAGE_SIZE, 0, &[i as u8; PAGE_SIZE], i);
        }
        // Page 0 should have been evicted.
        let bytes_read = pool.read_at(PAGE_SIZE, 0, &mut buf, 0);
        assert_eq!(bytes_read, 0);
        // Page 1-10 should be in the cache.
        for i in 1u64..11 {
            let bytes_read = pool.read_at(PAGE_SIZE, 0, &mut buf, i * PAGE_SIZE as u64);
            assert_eq!(bytes_read, PAGE_SIZE);
            assert_eq!(buf, [i as u8; PAGE_SIZE]);
        }

        // Test reading from an unaligned offset by adding 2 to an aligned offset. The read
        // should be 2 bytes short of a full page.
        let mut buf = vec![0; PAGE_SIZE];
        let bytes_read = pool.read_at(PAGE_SIZE, 0, &mut buf, PAGE_SIZE as u64 + 2);
        assert_eq!(bytes_read, PAGE_SIZE - 2);
        assert_eq!(&buf[..PAGE_SIZE - 2], [1; PAGE_SIZE - 2]);
    }
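
    // An extra (illustrative) check of the page/offset arithmetic that `read_at` and `cache` rely
    // on: an offset maps to (offset / page_size, offset % page_size).
    #[test_traced]
    fn test_offset_to_page() {
        // Page-aligned offsets map to the start of their page.
        assert_eq!(Pool::offset_to_page(PAGE_SIZE, 0), (0, 0));
        assert_eq!(Pool::offset_to_page(PAGE_SIZE, PAGE_SIZE as u64), (1, 0));
        // Unaligned offsets keep the remainder as the offset within the page.
        assert_eq!(
            Pool::offset_to_page(PAGE_SIZE, 3 * PAGE_SIZE as u64 + 2),
            (3, 2)
        );
    }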

    #[test_traced]
    fn test_pool_read_with_blob() {
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();
        // Start the test within the executor
        executor.start(|context| async move {
            // Populate a blob with 11 consecutive pages of data.
            let (blob, size) = context
                .open("test", "blob".as_bytes())
                .await
                .expect("Failed to open blob");
            assert_eq!(size, 0);
            for i in 0..11 {
                let buf = vec![i as u8; PAGE_SIZE];
                blob.write_at(buf, i * PAGE_SIZE as u64).await.unwrap();
            }

            // Fill the buffer pool with the blob's data.
            let pool_ref = PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(10));
            for i in 0..11 {
                let mut buf = vec![0; PAGE_SIZE];
                pool_ref
                    .read(&blob, 0, &mut buf, i * PAGE_SIZE as u64)
                    .await
                    .unwrap();
                assert_eq!(buf, [i as u8; PAGE_SIZE]);
            }

            // Repeat the reads, this time hitting the buffer pool cache. Start at page 1, since
            // page 0 should have been evicted once the eleventh page was cached.
            for i in 1..11 {
                let mut buf = vec![0; PAGE_SIZE];
                pool_ref
                    .read(&blob, 0, &mut buf, i * PAGE_SIZE as u64)
                    .await
                    .unwrap();
                assert_eq!(buf, [i as u8; PAGE_SIZE]);
            }

            // Cleanup.
            blob.sync().await.unwrap();
        });
    }
}
465}