commonware_runtime/utils/buffer/pool.rs
use crate::{Blob, Error, RwLock};
use commonware_utils::StableBuf;
use futures::{future::Shared, FutureExt};
use std::{
    collections::{hash_map::Entry, HashMap},
    future::Future,
    pin::Pin,
    sync::{
        atomic::{AtomicBool, Ordering},
        Arc,
    },
};
use tracing::{debug, trace};

// Type alias for the future we'll be storing for each in-flight page fetch.
//
// We wrap [Error] in an Arc so it will be cloneable, which is required for the future to be
// [Shared].
type PageFetchFut = Shared<Pin<Box<dyn Future<Output = Result<StableBuf, Arc<Error>>> + Send>>>;

/// A [Pool] caches pages of [Blob] data in memory.
///
/// A single buffer pool can be used to cache data from multiple blobs by assigning a unique id to
/// each.
///
/// Implements the [Clock](https://en.wikipedia.org/wiki/Page_replacement_algorithm#Clock)
/// replacement policy, which is a lightweight approximation of LRU. The page `cache` is a circular
/// list of recently accessed pages, and `clock` is the index of the next page within it to examine
/// for replacement. When a page needs to be evicted, we start the search at `clock` within `cache`,
/// searching for the first page with a false reference bit, and setting any skipped page's
/// reference bit to false along the way.
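///
/// For example, if the cache is full and every page's reference bit is set, the clock hand clears
/// each bit as it sweeps and, after one full revolution, evicts the page it started on (whose bit
/// is now false).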
pub struct Pool {
    /// The page cache index, with a key composed of (blob id, page number), that maps each cached
    /// page to the index of its `cache` entry.
    ///
    /// # Invariants
    ///
    /// Each `index` entry maps to exactly one `cache` entry, and that cache entry always has a
    /// matching key.
    index: HashMap<(u64, u64), usize>,

    /// The page cache.
    ///
    /// Each `cache` entry has exactly one corresponding `index` entry.
    cache: Vec<CacheEntry>,

    /// The Clock replacement policy's clock hand index into `cache`.
    clock: usize,

    /// The next id to assign to a blob that will be managed by this pool.
    next_id: u64,

    /// The maximum number of pages that will be cached.
    capacity: usize,

    /// A map of currently executing page fetches to ensure only one task at a time is trying to
    /// fetch a specific page.
    page_fetches: HashMap<(u64, u64), PageFetchFut>,
}

struct CacheEntry {
    /// The cache key which is composed of the blob id and page number of the page.
    key: (u64, u64),

    /// A bit indicating whether this page was recently referenced.
    referenced: AtomicBool,

    /// The cached page itself.
    data: Vec<u8>,
}

/// A reference to a [Pool] that can be shared across threads via cloning, along with the page size
/// that will be used with it. Provides the API for interacting with the buffer pool in a
/// thread-safe manner.
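///
/// # Example
///
/// A minimal usage sketch (assumes an async context and illustrative sizes; not compiled as a
/// doctest):
///
/// ```ignore
/// // Pool caching up to 128 pages of 1024 bytes each.
/// let pool = PoolRef::new(1024, 128);
///
/// // Assign an id to the blob whose pages this pool will cache.
/// let blob_id = pool.next_id().await;
///
/// // Cache two full pages starting at a page-aligned offset; `cache` returns the number of
/// // trailing bytes that didn't fill a whole page (0 here).
/// let data = vec![7u8; 2 * 1024];
/// let leftover = pool.cache(blob_id, &data, 0).await;
/// assert_eq!(leftover, 0);
/// ```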
#[derive(Clone)]
pub struct PoolRef {
    /// The size of each page in the buffer pool.
    pub(super) page_size: usize,

    /// Shareable reference to the buffer pool.
    pool: Arc<RwLock<Pool>>,
}

impl PoolRef {
    /// Returns a new [PoolRef] with the given `page_size` and `capacity`.
    pub fn new(page_size: usize, capacity: usize) -> Self {
        Self {
            page_size,
            pool: Arc::new(RwLock::new(Pool::new(capacity))),
        }
    }

    /// Returns a unique id for the next blob that will use this buffer pool.
    pub async fn next_id(&self) -> u64 {
        let mut pool = self.pool.write().await;
        pool.next_id()
    }

    /// Convert an offset into the number of the page it belongs to and the offset within that page.
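    ///
    /// For example, with a `page_size` of 1024, offset 2500 falls on page 2 at offset 452 within
    /// that page.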
    pub fn offset_to_page(&self, offset: u64) -> (u64, usize) {
        Pool::offset_to_page(self.page_size, offset)
    }

    /// Read the specified bytes, preferentially from the buffer pool cache. Bytes not found in the
    /// buffer pool will be read from the provided `blob` and cached for future reads.
    ///
    /// # Warning
    ///
    /// Attempts to read any of the last (blob_size % page_size) "trailing bytes" of the blob will
    /// result in a ReadFailed error since the buffer pool only deals with page sized chunks.
    /// Trailing bytes need to be dealt with outside of the buffer pool. For example,
    /// [crate::buffer::Append] uses a [crate::buffer::tip::Buffer] to buffer them.
    pub(super) async fn read<B: Blob>(
        &self,
        blob: &B,
        blob_id: u64,
        mut buf: &mut [u8],
        mut offset: u64,
    ) -> Result<(), Error> {
        // Read up to a page worth of data at a time from either the buffer pool or the `blob`,
        // until the requested data is fully read.
        while !buf.is_empty() {
            // Read lock the buffer pool and see if we can get (some of) the data from it.
            {
                let buffer_pool = self.pool.read().await;
                let count = buffer_pool.read_at(self.page_size, blob_id, buf, offset);
                if count != 0 {
                    offset += count as u64;
                    buf = &mut buf[count..];
                    continue;
                }
            }

            // Handle page fault.
            let count = self
                .read_after_page_fault(blob, blob_id, buf, offset)
                .await?;
            offset += count as u64;
            buf = &mut buf[count..];
        }

        Ok(())
    }

    /// Fetch the specified page after encountering a page fault, which may involve retrieving it
    /// from `blob` & caching the result in `pool`. Returns the number of bytes read, which should
    /// always be non-zero.
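    ///
    /// Concurrent faults on the same page are de-duplicated: the first faulting task installs a
    /// [Shared] future in `page_fetches` and later tasks await a clone of it, while the first
    /// fetcher alone removes the entry and caches the fetched page.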
    async fn read_after_page_fault<B: Blob>(
        &self,
        blob: &B,
        blob_id: u64,
        buf: &mut [u8],
        offset: u64,
    ) -> Result<usize, Error> {
        assert!(!buf.is_empty());

        let (page_num, offset_in_page) = Pool::offset_to_page(self.page_size, offset);
        let page_size = self.page_size;
        trace!(page_num, blob_id, "page fault");

        // Create or clone a future that retrieves the desired page from the underlying blob. This
        // requires a write lock on the buffer pool since we may need to modify `page_fetches` if
        // this is the first fetcher.
        let (fetch_future, is_first_fetcher) = {
            let mut pool = self.pool.write().await;

            // There's a (small) chance the page was fetched & buffered by another task before we
            // were able to acquire the write lock, so check the cache before doing anything else.
            let count = pool.read_at(page_size, blob_id, buf, offset);
            if count != 0 {
                return Ok(count);
            }

            let entry = pool.page_fetches.entry((blob_id, page_num));
            match entry {
                Entry::Occupied(o) => {
                    // Another thread is already fetching this page, so clone its existing future.
                    (o.get().clone(), false)
                }
                Entry::Vacant(v) => {
                    // Nobody is currently fetching this page, so create a future that will do the work.
                    let blob = blob.clone();
                    let future = async move {
                        blob.read_at(vec![0; page_size], page_num * page_size as u64)
                            .await
                            .map_err(Arc::new)
                    };

                    // Make the future shareable and insert it into the map.
                    let shareable = future.boxed().shared();
                    v.insert(shareable.clone());

                    (shareable, true)
                }
            }
        };

        // Await the future and get the page buffer. If this isn't the task that initiated the
        // fetch, we can return immediately with the result. Note that we cannot return immediately
        // on error, since we'd bypass the cleanup required of the first fetcher.
        let fetch_result = fetch_future.await;
        if !is_first_fetcher {
            // Copy the requested portion of the page into the buffer and return immediately.
            let page_buf: Vec<u8> = fetch_result.map_err(|_| Error::ReadFailed)?.into();
            let bytes_to_copy = std::cmp::min(buf.len(), page_size - offset_in_page);
            buf[..bytes_to_copy]
                .copy_from_slice(&page_buf[offset_in_page..offset_in_page + bytes_to_copy]);
            return Ok(bytes_to_copy);
        }

        // This is the task that initiated the fetch, so it is responsible for cleaning up the
        // inserted entry, and caching the page in the buffer pool if the fetch didn't error out.
        // This requires a write lock on the buffer pool to modify `page_fetches` and cache the
        // page.
        let mut pool = self.pool.write().await;

        // Remove the entry from `page_fetches`.
        let _ = pool.page_fetches.remove(&(blob_id, page_num));

        // Cache the result in the buffer pool.
        let Ok(page_buf) = fetch_result else {
            return Err(Error::ReadFailed);
        };
        pool.cache(page_size, blob_id, page_buf.as_ref(), page_num);

        // Copy the requested portion of the page into the buffer.
        let page_buf: Vec<u8> = page_buf.into();
        let bytes_to_copy = std::cmp::min(buf.len(), page_size - offset_in_page);
        buf[..bytes_to_copy]
            .copy_from_slice(&page_buf[offset_in_page..offset_in_page + bytes_to_copy]);

        Ok(bytes_to_copy)
    }

    /// Cache the provided slice of data in the buffer pool, returning the remaining bytes that
    /// didn't fill a whole page. `offset` must be page aligned.
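    ///
    /// For example, with a `page_size` of 1024, caching a 2500-byte `buf` at offset 0 stores two
    /// full pages and returns 452, the count of trailing bytes that were not cached.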
    ///
    /// # Panics
    ///
    /// Panics if `offset` is not page aligned.
    pub async fn cache(&self, blob_id: u64, mut buf: &[u8], offset: u64) -> usize {
        let (mut page_num, offset_in_page) = self.offset_to_page(offset);
        assert_eq!(offset_in_page, 0);
        {
            // Write lock the buffer pool.
            let mut buffer_pool = self.pool.write().await;
            while buf.len() >= self.page_size {
                buffer_pool.cache(self.page_size, blob_id, &buf[..self.page_size], page_num);
                buf = &buf[self.page_size..];
                page_num += 1;
            }
        }

        buf.len()
    }
}

impl Pool {
    /// Return a new empty buffer pool with an initial next-blob id of 0, and a max cache capacity
    /// of `capacity` pages.
    ///
    /// # Panics
    ///
    /// Panics if `capacity` is 0.
    pub fn new(capacity: usize) -> Self {
        assert!(capacity > 0);
        Self {
            index: HashMap::new(),
            cache: Vec::new(),
            clock: 0,
            next_id: 0,
            capacity,
            page_fetches: HashMap::new(),
        }
    }

    /// Assign and return the next unique blob id.
    pub(super) fn next_id(&mut self) -> u64 {
        let id = self.next_id;
        self.next_id += 1;
        id
    }

    /// Convert an offset into the number of the page it belongs to and the offset within that page.
    fn offset_to_page(page_size: usize, offset: u64) -> (u64, usize) {
        (
            offset / page_size as u64,
            (offset % page_size as u64) as usize,
        )
    }

    /// Attempt to fetch blob data starting at `offset` from the buffer pool. Returns the number of
    /// bytes read, which could be 0 if the first page in the requested range isn't buffered, and is
    /// never more than `page_size` or the length of `buf`. The returned bytes won't cross a
    /// page boundary, so multiple reads may be required even if all data in the desired range is
    /// buffered.
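    ///
    /// For example, a read starting 10 bytes before the end of a buffered page returns at most 10
    /// bytes, even if `buf` is larger and the next page is also buffered.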
    fn read_at(&self, page_size: usize, blob_id: u64, buf: &mut [u8], offset: u64) -> usize {
        let (page_num, offset_in_page) = Self::offset_to_page(page_size, offset);
        let page_index = self.index.get(&(blob_id, page_num));
        let Some(&page_index) = page_index else {
            return 0;
        };
        let page = &self.cache[page_index];
        assert_eq!(page.key, (blob_id, page_num));
        page.referenced.store(true, Ordering::Relaxed);
        let page = &page.data;

        let bytes_to_copy = std::cmp::min(buf.len(), page_size - offset_in_page);
        buf[..bytes_to_copy].copy_from_slice(&page[offset_in_page..offset_in_page + bytes_to_copy]);

        bytes_to_copy
    }

    /// Put the given `page` into the buffer pool.
    ///
    /// # Panics
    ///
    /// Panics if the provided page is not exactly `page_size` bytes long.
    fn cache(&mut self, page_size: usize, blob_id: u64, page: &[u8], page_num: u64) {
        assert_eq!(page.len(), page_size);

        let key = (blob_id, page_num);
        let index_entry = self.index.entry(key);
        if let Entry::Occupied(index_entry) = index_entry {
            // This case can result when a blob is truncated across a page boundary, and later grows
            // back to (beyond) its original size. It will also become expected behavior once we
            // allow cached pages to be writable.
            debug!(blob_id, page_num, "updating duplicate page");

            // Update the stale data with the new page.
            let entry = &mut self.cache[*index_entry.get()];
            assert_eq!(entry.key, key);
            entry.referenced.store(true, Ordering::Relaxed);
            entry.data.copy_from_slice(page);
            return;
        }

        if self.cache.len() < self.capacity {
            self.index.insert(key, self.cache.len());
            self.cache.push(CacheEntry {
                key,
                referenced: AtomicBool::new(true),
                data: page.into(),
            });
            return;
        }

        // Cache is full, find a page to evict.
        while self.cache[self.clock].referenced.load(Ordering::Relaxed) {
            self.cache[self.clock]
                .referenced
                .store(false, Ordering::Relaxed);
            self.clock = (self.clock + 1) % self.cache.len();
        }

        // Evict the page by replacing it with the new page.
        let entry = &mut self.cache[self.clock];
        entry.referenced.store(true, Ordering::Relaxed);
        assert!(self.index.remove(&entry.key).is_some());
        self.index.insert(key, self.clock);
        entry.key = key;
        entry.data.copy_from_slice(page);

        // Move the clock forward.
        self.clock = (self.clock + 1) % self.cache.len();
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{deterministic, Runner as _, Storage as _};
    use commonware_macros::test_traced;

    const PAGE_SIZE: usize = 1024;

    #[test_traced]
    fn test_pool_basic() {
        let mut pool: Pool = Pool::new(10);
        assert_eq!(pool.next_id(), 0);
        assert_eq!(pool.next_id(), 1);

        let mut buf = vec![0; PAGE_SIZE];
        let bytes_read = pool.read_at(PAGE_SIZE, 0, &mut buf, 0);
        assert_eq!(bytes_read, 0);

        pool.cache(PAGE_SIZE, 0, &[1; PAGE_SIZE], 0);
        let bytes_read = pool.read_at(PAGE_SIZE, 0, &mut buf, 0);
        assert_eq!(bytes_read, PAGE_SIZE);
        assert_eq!(buf, [1; PAGE_SIZE]);

        // Test replacement -- should log a duplicate page debug message but still work.
        pool.cache(PAGE_SIZE, 0, &[2; PAGE_SIZE], 0);
        let bytes_read = pool.read_at(PAGE_SIZE, 0, &mut buf, 0);
        assert_eq!(bytes_read, PAGE_SIZE);
        assert_eq!(buf, [2; PAGE_SIZE]);

        // Test exceeding the cache capacity.
        for i in 0u64..11 {
            pool.cache(PAGE_SIZE, 0, &[i as u8; PAGE_SIZE], i);
        }
        // Page 0 should have been evicted.
        let bytes_read = pool.read_at(PAGE_SIZE, 0, &mut buf, 0);
        assert_eq!(bytes_read, 0);
        // Page 1-10 should be in the cache.
        for i in 1u64..11 {
            let bytes_read = pool.read_at(PAGE_SIZE, 0, &mut buf, i * PAGE_SIZE as u64);
            assert_eq!(bytes_read, PAGE_SIZE);
            assert_eq!(buf, [i as u8; PAGE_SIZE]);
        }

        // Test reading from an unaligned offset by adding 2 to an aligned offset. The read
        // should be 2 bytes short of a full page.
        let mut buf = vec![0; PAGE_SIZE];
        let bytes_read = pool.read_at(PAGE_SIZE, 0, &mut buf, PAGE_SIZE as u64 + 2);
        assert_eq!(bytes_read, PAGE_SIZE - 2);
        assert_eq!(&buf[..PAGE_SIZE - 2], [1; PAGE_SIZE - 2]);
    }

    #[test_traced]
    fn test_pool_read_with_blob() {
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();
        // Start the test within the executor
        executor.start(|context| async move {
            // Populate a blob with 11 consecutive pages of data.
            let (blob, size) = context
                .open("test", "blob".as_bytes())
                .await
                .expect("Failed to open blob");
            assert_eq!(size, 0);
            for i in 0..11 {
                let buf = vec![i as u8; PAGE_SIZE];
                blob.write_at(buf, i * PAGE_SIZE as u64).await.unwrap();
            }

            // Fill the buffer pool with the blob's data.
            let pool_ref = PoolRef::new(PAGE_SIZE, 10);
            for i in 0..11 {
                let mut buf = vec![0; PAGE_SIZE];
                pool_ref
                    .read(&blob, 0, &mut buf, i * PAGE_SIZE as u64)
                    .await
                    .unwrap();
                assert_eq!(buf, [i as u8; PAGE_SIZE]);
            }

            // Repeat the read to exercise reading from the buffer pool. Must start at 1 because
            // page 0 should be evicted.
            for i in 1..11 {
                let mut buf = vec![0; PAGE_SIZE];
                pool_ref
                    .read(&blob, 0, &mut buf, i * PAGE_SIZE as u64)
                    .await
                    .unwrap();
                assert_eq!(buf, [i as u8; PAGE_SIZE]);
            }

            // Cleanup.
            blob.close().await.expect("Failed to close blob");
        });
    }
}