1use super::get_page_from_blob;
5use crate::{Blob, BufferPool, BufferPooler, Error, IoBuf, IoBufMut};
6use commonware_utils::sync::RwLock;
7use futures::{future::Shared, FutureExt};
8use std::{
9 collections::{hash_map::Entry, HashMap},
10 future::Future,
11 num::{NonZeroU16, NonZeroUsize},
12 pin::Pin,
13 sync::{
14 atomic::{AtomicBool, AtomicU64, Ordering},
15 Arc,
16 },
17};
18use tracing::{debug, error, trace};
19
20type PageFetchFuture = Shared<Pin<Box<dyn Future<Output = Result<IoBuf, Arc<Error>>> + Send>>>;
23
24type PageFetch = Arc<PageFetchFuture>;
27
28struct PageFetchEntry {
36 fetch: PageFetch,
38 waiters: usize,
40}
41
42struct PageFetchGuard {
44 cache: Arc<RwLock<Cache>>,
45 key: (u64, u64),
46 fetch: PageFetch,
47 armed: bool,
48}
49
50impl PageFetchGuard {
51 const fn new(cache: Arc<RwLock<Cache>>, key: (u64, u64), fetch: PageFetch) -> Self {
52 Self {
53 cache,
54 key,
55 fetch,
56 armed: true,
57 }
58 }
59
60 const fn disarm(&mut self) {
61 self.armed = false;
62 }
63}
64
65impl Drop for PageFetchGuard {
66 fn drop(&mut self) {
67 if !self.armed {
68 return;
69 }
70
71 let mut cache = self.cache.write();
78 let Entry::Occupied(mut current) = cache.page_fetches.entry(self.key) else {
79 return;
80 };
81 if !Arc::ptr_eq(¤t.get().fetch, &self.fetch) {
82 return;
83 }
84 if current.get().waiters == 1 {
85 current.remove();
86 } else {
87 current.get_mut().waiters -= 1;
88 }
89 }
90}
91
92struct Cache {
104 index: HashMap<(u64, u64), usize>,
113
114 entries: Vec<CacheEntry>,
120
121 slots: Vec<IoBufMut>,
125
126 page_size: usize,
128
129 clock: usize,
131
132 capacity: usize,
134
135 page_fetches: HashMap<(u64, u64), PageFetchEntry>,
138}
139
/// Metadata for one cache slot.
struct CacheEntry {
    /// CLOCK reference bit. It is set on insert and on every cache hit, and
    /// cleared by the eviction sweep and by invalidation.
    ///
    /// It is atomic so that readers holding only a shared reference (under the
    /// read lock) can set it.
    referenced: AtomicBool,
    /// `(blob_id, page_num)` of the page last stored in this slot.
    key: (u64, u64),
}
154
155#[derive(Clone)]
159pub struct CacheRef {
160 page_size: u64,
167
168 next_id: Arc<AtomicU64>,
170
171 cache: Arc<RwLock<Cache>>,
173
174 pool: BufferPool,
176}
177
178impl CacheRef {
179 pub fn new(pool: BufferPool, page_size: NonZeroU16, capacity: NonZeroUsize) -> Self {
184 let page_size_u64 = page_size.get() as u64;
185
186 Self {
187 page_size: page_size_u64,
188 next_id: Arc::new(AtomicU64::new(0)),
189 cache: Arc::new(RwLock::new(Cache::new(pool.clone(), page_size, capacity))),
190 pool,
191 }
192 }
193
194 pub fn from_pooler(
197 pooler: &impl BufferPooler,
198 page_size: NonZeroU16,
199 capacity: NonZeroUsize,
200 ) -> Self {
201 Self::new(pooler.storage_buffer_pool().clone(), page_size, capacity)
202 }
203
204 #[inline]
206 pub const fn page_size(&self) -> u64 {
207 self.page_size
208 }
209
210 #[inline]
212 pub const fn pool(&self) -> &BufferPool {
213 &self.pool
214 }
215
216 pub fn next_id(&self) -> u64 {
218 self.next_id.fetch_add(1, Ordering::Relaxed)
219 }
220
221 pub const fn offset_to_page(&self, offset: u64) -> (u64, u64) {
224 Cache::offset_to_page(self.page_size, offset)
225 }
226
227 pub(super) fn read_cached(
230 &self,
231 blob_id: u64,
232 mut buf: &mut [u8],
233 mut logical_offset: u64,
234 ) -> usize {
235 let original_len = buf.len();
236 let page_cache = self.cache.read();
237 while !buf.is_empty() {
238 let count = page_cache.read_at(blob_id, buf, logical_offset);
239 if count == 0 {
240 break;
242 }
243 logical_offset += count as u64;
244 buf = &mut buf[count..];
245 }
246 original_len - buf.len()
247 }
248
249 pub(super) fn read_cached_many(&self, blob_id: u64, ranges: &mut Vec<(&mut [u8], u64)>) {
256 let page_cache = self.cache.read();
257 ranges.retain_mut(|(buf, logical_offset)| {
258 let mut remaining = buf.len();
259 let mut offset = *logical_offset;
260 let mut dst = 0;
261 while remaining > 0 {
262 let count = page_cache.read_at(blob_id, &mut buf[dst..], offset);
263 if count == 0 {
264 break;
265 }
266 offset += count as u64;
267 dst += count;
268 remaining -= count;
269 }
270
271 remaining > 0
273 });
274 }
275
276 pub(super) async fn read<B: Blob>(
279 &self,
280 blob: &B,
281 blob_id: u64,
282 mut buf: &mut [u8],
283 mut offset: u64,
284 ) -> Result<(), Error> {
285 while !buf.is_empty() {
288 {
290 let page_cache = self.cache.read();
291 let count = page_cache.read_at(blob_id, buf, offset);
292 if count != 0 {
293 offset += count as u64;
294 buf = &mut buf[count..];
295 continue;
296 }
297 }
298
299 let count = self
301 .read_after_page_fault(blob, blob_id, buf, offset)
302 .await?;
303 offset += count as u64;
304 buf = &mut buf[count..];
305 }
306
307 Ok(())
308 }
309
    /// Reads the page containing logical `offset` of `blob` after a cache miss.
    ///
    /// Copies as many bytes as fit into `buf` from that one page.
    ///
    /// Concurrent misses on the same `(blob_id, page_num)` share a single fetch
    /// through `Cache::page_fetches`. The fetch future itself inserts the page
    /// into the cache on success. It also removes its `page_fetches` entry when
    /// it finishes, whether or not it succeeded. A `PageFetchGuard` releases this
    /// reader's claim if the reader is dropped while waiting.
    ///
    /// Returns the number of bytes copied into `buf`, which is always non-zero.
    ///
    /// # Errors
    ///
    /// Returns `Error::ReadFailed` if the shared fetch fails. The underlying error
    /// is logged by the fetch, not propagated.
    ///
    /// # Panics
    ///
    /// Panics if `buf` is empty.
    pub(super) async fn read_after_page_fault<B: Blob>(
        &self,
        blob: &B,
        blob_id: u64,
        buf: &mut [u8],
        offset: u64,
    ) -> Result<usize, Error> {
        assert!(!buf.is_empty());

        let (page_num, offset_in_page) = Cache::offset_to_page(self.page_size, offset);
        let offset_in_page = offset_in_page as usize;
        trace!(page_num, blob_id, "page fault");

        let (fetch_future, mut fetch_guard) = {
            // Hold the write lock for the rest of this block. The re-check below
            // and the join-or-register step are then atomic with respect to the
            // fetch future's own insert-and-remove, which takes the same lock.
            let mut cache = self.cache.write();

            // Another reader may have cached this page since the caller's miss.
            let count = cache.read_at(blob_id, buf, offset);
            if count != 0 {
                return Ok(count);
            }

            let key = (blob_id, page_num);
            match cache.page_fetches.entry(key) {
                Entry::Occupied(o) => {
                    // A fetch for this page is already in flight: join it.
                    let entry = o.into_mut();
                    entry.waiters += 1;
                    let fetch_future = entry.fetch.as_ref().clone();
                    let fetch = Arc::clone(&entry.fetch);
                    (
                        fetch_future,
                        PageFetchGuard::new(Arc::clone(&self.cache), key, fetch),
                    )
                }
                Entry::Vacant(v) => {
                    // First miss for this page: start a new fetch. The future does
                    // nothing until a waiter polls it.
                    let blob = blob.clone();
                    // This shadows the write guard's name only. The guard stays
                    // alive, and `v` still borrows it. The `Arc` is moved into the
                    // fetch future.
                    let cache = Arc::clone(&self.cache);
                    let page_size = self.page_size;
                    let future = async move {
                        let result = fetch_cacheable_page(&blob, page_num, page_size).await;
                        if let Err(err) = &result {
                            error!(page_num, ?err, "Page fetch failed");
                        }

                        // Insert the page and clear the single-flight entry in one
                        // critical section. A later miss then either finds the page
                        // cached or starts a fresh fetch.
                        // NOTE(review): `invalidate_from` does not touch
                        // `page_fetches`. A fetch in flight when its page is
                        // invalidated will still insert what it read here.
                        let mut cache = cache.write();
                        if let Ok(page) = &result {
                            cache.cache(blob_id, page.as_ref(), page_num);
                        }
                        let _ = cache.page_fetches.remove(&key);
                        result
                    };

                    // While the fetch is in flight there is a reference cycle: the
                    // cache owns the entry, the entry owns the future, and the
                    // future owns an `Arc` to the cache. The cycle is broken when
                    // the future removes the entry or the last armed guard does.
                    let fetch_future = future.boxed().shared();
                    let fetch = Arc::new(fetch_future.clone());
                    v.insert(PageFetchEntry {
                        fetch: Arc::clone(&fetch),
                        waiters: 1,
                    });

                    (
                        fetch_future,
                        PageFetchGuard::new(Arc::clone(&self.cache), key, fetch),
                    )
                }
            }
        };

        let fetch_result = fetch_future.await;
        // The fetch resolved, and the fetch future has already removed its own
        // entry. The guard must not touch `page_fetches` again.
        fetch_guard.disarm();
        let page_buf = match fetch_result {
            Ok(page_buf) => page_buf,
            Err(_) => return Err(Error::ReadFailed),
        };

        // `fetch_cacheable_page` guarantees `page_buf.len() == page_size`, which is
        // greater than `offset_in_page`, so at least one byte is copied.
        let bytes_to_copy = std::cmp::min(buf.len(), page_buf.len() - offset_in_page);
        buf[..bytes_to_copy]
            .copy_from_slice(&page_buf.as_ref()[offset_in_page..offset_in_page + bytes_to_copy]);

        Ok(bytes_to_copy)
    }
410
411 pub fn cache(&self, blob_id: u64, mut buf: &[u8], offset: u64) -> usize {
419 let (mut page_num, offset_in_page) = self.offset_to_page(offset);
420 assert_eq!(offset_in_page, 0);
421 {
422 let page_size = self.page_size as usize;
424 let mut page_cache = self.cache.write();
425 while buf.len() >= page_size {
426 page_cache.cache(blob_id, &buf[..page_size], page_num);
427 buf = &buf[page_size..];
428 page_num = match page_num.checked_add(1) {
429 Some(next) => next,
430 None => break,
431 };
432 }
433 }
434
435 buf.len()
436 }
437
438 pub(super) fn invalidate_from(&self, blob_id: u64, start_page: u64) {
442 self.cache.write().invalidate_from(blob_id, start_page);
443 }
444}
445
446impl Cache {
447 pub fn new(pool: BufferPool, page_size: NonZeroU16, capacity: NonZeroUsize) -> Self {
450 let page_size = page_size.get() as usize;
451 let capacity = capacity.get();
452 let mut slots = Vec::with_capacity(capacity);
453 for _ in 0..capacity {
454 let slot = pool.alloc_zeroed(page_size);
455 slots.push(slot);
456 }
457 Self {
458 index: HashMap::new(),
459 entries: Vec::with_capacity(capacity),
460 slots,
461 page_size,
462 clock: 0,
463 capacity,
464 page_fetches: HashMap::new(),
465 }
466 }
467
468 #[inline]
470 fn page_slice(&self, slot: usize) -> &[u8] {
471 assert!(slot < self.capacity);
472 self.slots[slot].as_ref()
473 }
474
475 #[inline]
477 fn page_slice_mut(&mut self, slot: usize) -> &mut [u8] {
478 assert!(slot < self.capacity);
479 self.slots[slot].as_mut()
480 }
481
482 const fn offset_to_page(page_size: u64, offset: u64) -> (u64, u64) {
484 (offset / page_size, offset % page_size)
485 }
486
487 fn read_at(&self, blob_id: u64, buf: &mut [u8], logical_offset: u64) -> usize {
493 let (page_num, offset_in_page) =
494 Self::offset_to_page(self.page_size as u64, logical_offset);
495 let Some(&slot) = self.index.get(&(blob_id, page_num)) else {
496 return 0;
497 };
498 let entry = &self.entries[slot];
499 assert_eq!(entry.key, (blob_id, page_num));
500 entry.referenced.store(true, Ordering::Relaxed);
501
502 let page = self.page_slice(slot);
503 let bytes_to_copy = std::cmp::min(buf.len(), self.page_size - offset_in_page as usize);
504 buf[..bytes_to_copy].copy_from_slice(
505 &page[offset_in_page as usize..offset_in_page as usize + bytes_to_copy],
506 );
507
508 bytes_to_copy
509 }
510
    /// Stores `page` as page `page_num` of `blob_id` and marks it referenced.
    ///
    /// Placement:
    /// - An existing copy of the page is overwritten in place.
    /// - Otherwise the page goes into the next never-used slot, while any remain.
    /// - Once every slot has been used, a victim is chosen with the CLOCK
    ///   (second-chance) policy.
    ///
    /// # Panics
    ///
    /// Panics if `page.len()` differs from the cache's page size.
    fn cache(&mut self, blob_id: u64, page: &[u8], page_num: u64) {
        assert_eq!(page.len(), self.page_size);
        let key = (blob_id, page_num);

        // Page already cached: overwrite it in place.
        if let Some(&slot) = self.index.get(&key) {
            debug!(blob_id, page_num, "updating duplicate page");

            let entry = &self.entries[slot];
            assert_eq!(entry.key, key);
            entry.referenced.store(true, Ordering::Relaxed);
            self.page_slice_mut(slot).copy_from_slice(page);
            return;
        }

        // Cache not yet full: take the next never-used slot. The clock hand does
        // not move while slots are being filled.
        if self.entries.len() < self.capacity {
            let slot = self.entries.len();
            self.index.insert(key, slot);
            self.entries.push(CacheEntry {
                key,
                referenced: AtomicBool::new(true),
            });
            self.page_slice_mut(slot).copy_from_slice(page);
            return;
        }

        // Cache full: give each referenced slot a second chance by clearing its
        // bit and advancing the hand. Every visited bit is cleared, and `&mut self`
        // stops readers from setting bits meanwhile, so the loop ends within one
        // full sweep.
        while self.entries[self.clock].referenced.load(Ordering::Relaxed) {
            self.entries[self.clock]
                .referenced
                .store(false, Ordering::Relaxed);
            self.clock = (self.clock + 1) % self.entries.len();
        }

        let slot = self.clock;
        let entry = &mut self.entries[slot];
        // The victim's key may be stale. `invalidate_from` drops index mappings but
        // leaves the slot's key, and that page may since have been re-cached in a
        // different slot. Only unmap the key if it still points here; otherwise the
        // live copy would be orphaned.
        if self.index.get(&entry.key) == Some(&slot) {
            self.index.remove(&entry.key);
        }
        self.index.insert(key, slot);
        entry.key = key;
        entry.referenced.store(true, Ordering::Relaxed);
        self.page_slice_mut(slot).copy_from_slice(page);

        // Move the hand past the slot just filled.
        self.clock = (self.clock + 1) % self.entries.len();
    }
570
571 fn invalidate_from(&mut self, blob_id: u64, start_page: u64) {
575 self.index.retain(|&(bid, page_num), &mut slot| {
576 if bid != blob_id || page_num < start_page {
577 return true;
578 }
579 self.entries[slot]
580 .referenced
581 .store(false, Ordering::Relaxed);
582 false
583 });
584 }
585}
586
587async fn fetch_cacheable_page(
590 blob: &impl Blob,
591 page_num: u64,
592 page_size: u64,
593) -> Result<IoBuf, Arc<Error>> {
594 let page = get_page_from_blob(blob, page_num, page_size)
595 .await
596 .map_err(Arc::new)?;
597
598 let len = page.len();
601 if len != page_size as usize {
602 error!(
603 page_num,
604 expected = page_size,
605 actual = len,
606 "attempted to fetch partial page from blob"
607 );
608 return Err(Arc::new(Error::InvalidChecksum));
609 }
610
611 Ok(page)
612}
613
614#[cfg(test)]
615mod tests {
616 use super::{super::Checksum, *};
617 use crate::{
618 buffer::paged::CHECKSUM_SIZE, deterministic, telemetry::metrics::Registry, Buf, BufferPool,
619 BufferPoolConfig, Clock as _, IoBufsMut, Runner as _, Spawner as _, Storage as _,
620 Supervisor as _,
621 };
622 use commonware_cryptography::Crc32;
623 use commonware_macros::test_traced;
624 use commonware_utils::{channel::oneshot, sync::Mutex, NZUsize, NZU16};
625 use futures::future::pending;
626 use std::{
627 num::NonZeroU16,
628 sync::{
629 atomic::{AtomicUsize, Ordering},
630 Arc,
631 },
632 time::Duration,
633 };
634
635 fn test_pool() -> BufferPool {
636 let mut registry = Registry::default();
637 BufferPool::new(BufferPoolConfig::for_storage(), &mut registry)
638 }
639
640 const PAGE_SIZE: NonZeroU16 = NZU16!(1024);
642 const PAGE_SIZE_U64: u64 = PAGE_SIZE.get() as u64;
643
644 #[derive(Clone)]
646 struct BlockingBlob {
647 started: Arc<Mutex<Option<oneshot::Sender<()>>>>,
648 }
649
650 impl Blob for BlockingBlob {
651 async fn read_at(&self, offset: u64, len: usize) -> Result<IoBufsMut, Error> {
652 self.read_at_buf(offset, len, IoBufsMut::default()).await
653 }
654
655 async fn read_at_buf(
656 &self,
657 _offset: u64,
658 _len: usize,
659 _bufs: impl Into<IoBufsMut> + Send,
660 ) -> Result<IoBufsMut, Error> {
661 let sender = self
662 .started
663 .lock()
664 .take()
665 .expect("blocking blob read started more than once");
666 let _ = sender.send(());
667 pending::<()>().await;
668 unreachable!()
669 }
670
671 async fn write_at(
672 &self,
673 _offset: u64,
674 _bufs: impl Into<crate::IoBufs> + Send,
675 ) -> Result<(), Error> {
676 Ok(())
677 }
678
679 async fn write_at_sync(
680 &self,
681 offset: u64,
682 bufs: impl Into<crate::IoBufs> + Send,
683 ) -> Result<(), Error> {
684 let bufs = bufs.into();
685 if !bufs.has_remaining() {
686 return Ok(());
687 }
688
689 self.write_at(offset, bufs).await?;
690 self.sync().await
691 }
692
693 async fn resize(&self, _len: u64) -> Result<(), Error> {
694 Ok(())
695 }
696
697 async fn sync(&self) -> Result<(), Error> {
698 Ok(())
699 }
700 }
701
702 #[derive(Clone)]
703 enum ControlledBlobResult {
704 Success(Arc<Vec<u8>>),
705 Error,
706 }
707
708 #[derive(Clone)]
710 struct ControlledBlob {
711 started: Arc<Mutex<Option<oneshot::Sender<()>>>>,
712 release: Arc<Mutex<Option<oneshot::Receiver<()>>>>,
713 reads: Arc<AtomicUsize>,
714 result: ControlledBlobResult,
715 }
716
717 impl Blob for ControlledBlob {
718 async fn read_at(&self, offset: u64, len: usize) -> Result<IoBufsMut, Error> {
719 self.read_at_buf(offset, len, IoBufsMut::default()).await
720 }
721
722 async fn read_at_buf(
723 &self,
724 _offset: u64,
725 _len: usize,
726 _bufs: impl Into<IoBufsMut> + Send,
727 ) -> Result<IoBufsMut, Error> {
728 if self.reads.fetch_add(1, Ordering::Relaxed) == 0 {
729 let sender = self
730 .started
731 .lock()
732 .take()
733 .expect("controlled blob start signal consumed more than once");
734 let _ = sender.send(());
735
736 let release = self
737 .release
738 .lock()
739 .take()
740 .expect("controlled blob release receiver consumed more than once");
741 release.await.expect("release signal dropped");
742 }
743
744 match &self.result {
745 ControlledBlobResult::Success(page) => Ok(IoBufsMut::from(page.as_ref().clone())),
746 ControlledBlobResult::Error => Err(Error::ReadFailed),
747 }
748 }
749
750 async fn write_at(
751 &self,
752 _offset: u64,
753 _bufs: impl Into<crate::IoBufs> + Send,
754 ) -> Result<(), Error> {
755 Ok(())
756 }
757
758 async fn write_at_sync(
759 &self,
760 offset: u64,
761 bufs: impl Into<crate::IoBufs> + Send,
762 ) -> Result<(), Error> {
763 let bufs = bufs.into();
764 if !bufs.has_remaining() {
765 return Ok(());
766 }
767
768 self.write_at(offset, bufs).await?;
769 self.sync().await
770 }
771
772 async fn resize(&self, _len: u64) -> Result<(), Error> {
773 Ok(())
774 }
775
776 async fn sync(&self) -> Result<(), Error> {
777 Ok(())
778 }
779 }
780
781 #[test_traced]
782 fn test_cache_basic() {
783 let pool = test_pool();
784 let mut cache: Cache = Cache::new(pool, PAGE_SIZE, NZUsize!(10));
785
786 let mut buf = vec![0; PAGE_SIZE.get() as usize];
788 let bytes_read = cache.read_at(0, &mut buf, 0);
789 assert_eq!(bytes_read, 0);
790
791 cache.cache(0, &[1; PAGE_SIZE.get() as usize], 0);
792 let bytes_read = cache.read_at(0, &mut buf, 0);
793 assert_eq!(bytes_read, PAGE_SIZE.get() as usize);
794 assert_eq!(buf, [1; PAGE_SIZE.get() as usize]);
795
796 cache.cache(0, &[2; PAGE_SIZE.get() as usize], 0);
798 let bytes_read = cache.read_at(0, &mut buf, 0);
799 assert_eq!(bytes_read, PAGE_SIZE.get() as usize);
800 assert_eq!(buf, [2; PAGE_SIZE.get() as usize]);
801
802 for i in 0u64..11 {
804 cache.cache(0, &[i as u8; PAGE_SIZE.get() as usize], i);
805 }
806 let bytes_read = cache.read_at(0, &mut buf, 0);
808 assert_eq!(bytes_read, 0);
809 for i in 1u64..11 {
811 let bytes_read = cache.read_at(0, &mut buf, i * PAGE_SIZE_U64);
812 assert_eq!(bytes_read, PAGE_SIZE.get() as usize);
813 assert_eq!(buf, [i as u8; PAGE_SIZE.get() as usize]);
814 }
815
816 let mut buf = vec![0; PAGE_SIZE.get() as usize];
819 let bytes_read = cache.read_at(0, &mut buf, PAGE_SIZE_U64 + 2);
820 assert_eq!(bytes_read, PAGE_SIZE.get() as usize - 2);
821 assert_eq!(
822 &buf[..PAGE_SIZE.get() as usize - 2],
823 [1; PAGE_SIZE.get() as usize - 2]
824 );
825 }
826
    /// Regression test: evicting a slot whose key was invalidated must not unmap a
    /// live copy of the same page that was re-cached into another slot.
    #[test_traced]
    fn test_invalidate_from_does_not_orphan_re_cached_page() {
        let mut registry = Registry::default();
        let pool = BufferPool::new(BufferPoolConfig::for_storage(), &mut registry);
        let mut cache: Cache = Cache::new(pool, PAGE_SIZE, NZUsize!(2));
        let blob_id = 0u64;
        let page_size = PAGE_SIZE.get() as usize;

        // Fill both slots: slot 0 holds page 0 and slot 1 holds page 1.
        cache.cache(blob_id, &vec![0xAA; page_size], 0);
        cache.cache(blob_id, &vec![0xBB; page_size], 1);
        // Unmap both pages. The slots keep their stale keys, and their reference
        // bits are cleared.
        cache.invalidate_from(blob_id, 0);

        // Re-cache page 1. The clock hand is at slot 0, which is unreferenced, so
        // page 1 now lives in slot 0 while slot 1 still carries the stale key.
        cache.cache(blob_id, &vec![0xCC; page_size], 1);
        let mut buf = vec![0u8; page_size];
        assert_eq!(
            cache.read_at(blob_id, &mut buf, PAGE_SIZE_U64),
            page_size,
            "page 1 should be readable after re-cache"
        );
        assert_eq!(buf, vec![0xCC; page_size]);

        // Caching page 2 evicts slot 1, whose stale key is page 1. The index entry
        // for page 1 points at slot 0, so it must survive.
        cache.cache(blob_id, &vec![0xDD; page_size], 2);

        let mut buf = vec![0u8; page_size];
        assert_eq!(
            cache.read_at(blob_id, &mut buf, PAGE_SIZE_U64),
            page_size,
            "live page 1 was orphaned by stale-slot eviction"
        );
        assert_eq!(buf, vec![0xCC; page_size]);

        let mut buf = vec![0u8; page_size];
        assert_eq!(
            cache.read_at(blob_id, &mut buf, PAGE_SIZE_U64 * 2),
            page_size
        );
        assert_eq!(buf, vec![0xDD; page_size]);
    }
876
877 #[test_traced]
878 fn test_cache_read_with_blob() {
879 let executor = deterministic::Runner::default();
881 executor.start(|context| async move {
883 let physical_page_size = PAGE_SIZE_U64 + CHECKSUM_SIZE;
885
886 let (blob, size) = context
888 .open("test", "blob".as_bytes())
889 .await
890 .expect("Failed to open blob");
891 assert_eq!(size, 0);
892 for i in 0..11 {
893 let logical_data = vec![i as u8; PAGE_SIZE.get() as usize];
895 let crc = Crc32::checksum(&logical_data);
896 let record = Checksum::new(PAGE_SIZE.get(), crc);
897 let mut page_data = logical_data;
898 page_data.extend_from_slice(&record.to_bytes());
899 blob.write_at(i * physical_page_size, page_data)
900 .await
901 .unwrap();
902 }
903
904 let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(10));
906 assert_eq!(cache_ref.next_id(), 0);
907 assert_eq!(cache_ref.next_id(), 1);
908 for i in 0..11 {
909 let mut buf = vec![0; PAGE_SIZE.get() as usize];
911 cache_ref
912 .read(&blob, 0, &mut buf, i * PAGE_SIZE_U64)
913 .await
914 .unwrap();
915 assert_eq!(buf, [i as u8; PAGE_SIZE.get() as usize]);
916 }
917
918 for i in 1..11 {
921 let mut buf = vec![0; PAGE_SIZE.get() as usize];
922 cache_ref
923 .read(&blob, 0, &mut buf, i * PAGE_SIZE_U64)
924 .await
925 .unwrap();
926 assert_eq!(buf, [i as u8; PAGE_SIZE.get() as usize]);
927 }
928
929 blob.sync().await.unwrap();
931 });
932 }
933
934 #[test_traced]
935 fn test_cache_max_page() {
936 let executor = deterministic::Runner::default();
937 executor.start(|context| async move {
938 let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(2));
939
940 let aligned_max_offset = u64::MAX - (u64::MAX % PAGE_SIZE_U64);
942
943 let logical_data = vec![42u8; PAGE_SIZE.get() as usize];
945
946 let remaining = cache_ref.cache(0, logical_data.as_slice(), aligned_max_offset);
948 assert_eq!(remaining, 0);
949
950 let mut buf = vec![0u8; PAGE_SIZE.get() as usize];
952 let page_cache = cache_ref.cache.read();
953 let bytes_read = page_cache.read_at(0, &mut buf, aligned_max_offset);
954 assert_eq!(bytes_read, PAGE_SIZE.get() as usize);
955 assert!(buf.iter().all(|b| *b == 42));
956 });
957 }
958
959 #[test_traced]
960 fn test_cache_at_high_offset() {
961 let executor = deterministic::Runner::default();
962 executor.start(|context| async move {
963 const MIN_PAGE_SIZE: u64 = CHECKSUM_SIZE + 1;
965 let cache_ref =
966 CacheRef::from_pooler(&context, NZU16!(MIN_PAGE_SIZE as u16), NZUsize!(2));
967
968 let data = vec![1u8; MIN_PAGE_SIZE as usize * 2];
971
972 let aligned_max_offset = u64::MAX - (u64::MAX % MIN_PAGE_SIZE);
975 let high_offset = aligned_max_offset - (MIN_PAGE_SIZE * 2);
976 let remaining = cache_ref.cache(0, &data, high_offset);
977 assert_eq!(remaining, 0);
979
980 let mut buf = vec![0u8; MIN_PAGE_SIZE as usize];
982 let page_cache = cache_ref.cache.read();
983 assert_eq!(
984 page_cache.read_at(0, &mut buf, high_offset),
985 MIN_PAGE_SIZE as usize
986 );
987 assert!(buf.iter().all(|b| *b == 1));
988
989 assert_eq!(
991 page_cache.read_at(0, &mut buf, high_offset + MIN_PAGE_SIZE),
992 MIN_PAGE_SIZE as usize
993 );
994 assert!(buf.iter().all(|b| *b == 1));
995 });
996 }
997
    /// Cancelling the only reader waiting on a fetch must remove that fetch's
    /// `page_fetches` entry instead of leaving it behind.
    #[test_traced]
    fn test_page_fetches_entry_removed_when_first_fetcher_cancelled() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let blob_id = 0;
            let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(10));
            let (started_tx, started_rx) = oneshot::channel();
            // Reads of this blob never complete, so the fetch stays in flight.
            let blob = BlockingBlob {
                started: Arc::new(Mutex::new(Some(started_tx))),
            };
            let mut read_buf = vec![0u8; PAGE_SIZE.get() as usize];

            let cache_ref_for_task = cache_ref.clone();
            let blob_for_task = blob.clone();
            let handle = context.spawn(move |_| async move {
                let _ = cache_ref_for_task
                    .read(&blob_for_task, blob_id, &mut read_buf, 0)
                    .await;
            });

            // Wait until the spawned reader is blocked inside the blob read.
            started_rx.await.expect("blocking read never started");
            {
                // The in-flight fetch is registered for page 0.
                let page_cache = cache_ref.cache.read();
                assert!(page_cache.page_fetches.contains_key(&(blob_id, 0)));
            }

            // Cancel the reader. Dropping its still-armed guard should clean up.
            handle.abort();
            assert!(matches!(handle.await, Err(Error::Closed)));

            let page_cache = cache_ref.cache.read();
            assert!(
                !page_cache.page_fetches.contains_key(&(blob_id, 0)),
                "cancelled first fetcher should not leave stale page_fetches entry"
            );
        });
    }
1039
    /// After the reader that started a fetch is cancelled, the remaining and newly
    /// arriving readers must keep sharing that same fetch, so the blob is read once.
    #[test_traced]
    fn test_followers_keep_single_flight_after_first_fetcher_cancellation() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let blob_id = 0;
            let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(10));

            // Physical page = logical bytes followed by their checksum record.
            let logical_page = vec![7u8; PAGE_SIZE.get() as usize];
            let crc = Crc32::checksum(&logical_page);
            let mut physical_page = logical_page.clone();
            physical_page.extend_from_slice(&Checksum::new(PAGE_SIZE.get(), crc).to_bytes());
            let (started_tx, started_rx) = oneshot::channel();
            let (release_tx, release_rx) = oneshot::channel();
            let reads = Arc::new(AtomicUsize::new(0));
            // The first read blocks until `release_tx` fires. All reads are counted.
            let blob = ControlledBlob {
                started: Arc::new(Mutex::new(Some(started_tx))),
                release: Arc::new(Mutex::new(Some(release_rx))),
                reads: reads.clone(),
                result: ControlledBlobResult::Success(Arc::new(physical_page)),
            };

            // First reader starts the fetch and blocks inside the blob read.
            let mut first_buf = vec![0u8; PAGE_SIZE.get() as usize];
            let cache_ref_for_first = cache_ref.clone();
            let blob_for_first = blob.clone();
            let first = context.child("first").spawn(move |_| async move {
                let _ = cache_ref_for_first
                    .read(&blob_for_first, blob_id, &mut first_buf, 0)
                    .await;
            });
            started_rx.await.expect("first read never started");

            // Second reader should join the existing fetch.
            let mut second_buf = vec![0u8; PAGE_SIZE.get() as usize];
            let cache_ref_for_second = cache_ref.clone();
            let blob_for_second = blob.clone();
            let second = context.child("second").spawn(move |_| async move {
                cache_ref_for_second
                    .read(&blob_for_second, blob_id, &mut second_buf, 0)
                    .await
                    .expect("second read failed");
                second_buf
            });

            // Wait until the second reader has registered (two waiters).
            loop {
                let joined = {
                    let page_cache = cache_ref.cache.read();
                    page_cache
                        .page_fetches
                        .get(&(blob_id, 0))
                        .map(|fetch| fetch.waiters == 2)
                        .unwrap_or(false)
                };
                if joined {
                    break;
                }
                context.sleep(Duration::from_millis(1)).await;
            }

            // Cancel the reader that started the fetch. Its guard drops the waiter
            // count to 1, and the entry stays because the second reader still waits.
            first.abort();
            assert!(matches!(first.await, Err(Error::Closed)));

            // A third reader arriving now should join the same fetch.
            let mut third_buf = vec![0u8; PAGE_SIZE.get() as usize];
            let cache_ref_for_third = cache_ref.clone();
            let blob_for_third = blob.clone();
            let third = context.child("third").spawn(move |_| async move {
                cache_ref_for_third
                    .read(&blob_for_third, blob_id, &mut third_buf, 0)
                    .await
                    .expect("third read failed");
                third_buf
            });

            // Wait until the third reader has either joined (two waiters again) or
            // issued its own blob read. A second blob read means single-flight
            // broke; it ends the wait so the `reads == 1` assertion below can fail
            // instead of hanging.
            loop {
                let third_entered = {
                    let page_cache = cache_ref.cache.read();
                    reads.load(Ordering::Relaxed) > 1
                        || page_cache
                            .page_fetches
                            .get(&(blob_id, 0))
                            .map(|fetch| fetch.waiters == 2)
                            .unwrap_or(false)
                };
                if third_entered {
                    break;
                }
                context.sleep(Duration::from_millis(1)).await;
            }

            // Let the blocked read finish. Both surviving readers get the page.
            let _ = release_tx.send(());
            let second_buf = second.await.expect("second task failed");
            let third_buf = third.await.expect("third task failed");
            assert_eq!(second_buf, logical_page);
            assert_eq!(third_buf, logical_page);

            // Exactly one blob read served all three readers.
            assert_eq!(reads.load(Ordering::Relaxed), 1);

            // The fetch populated the cache.
            let mut cached = vec![0u8; PAGE_SIZE.get() as usize];
            assert_eq!(
                cache_ref.read_cached(blob_id, &mut cached, 0),
                PAGE_SIZE.get() as usize
            );
            assert_eq!(cached, logical_page);

            // A later read is a cache hit and issues no further blob read.
            let mut fourth_buf = vec![0u8; PAGE_SIZE.get() as usize];
            cache_ref
                .read(&blob, blob_id, &mut fourth_buf, 0)
                .await
                .unwrap();
            assert_eq!(fourth_buf, logical_page);
            assert_eq!(reads.load(Ordering::Relaxed), 1);

            let page_cache = cache_ref.cache.read();
            assert!(
                !page_cache.page_fetches.contains_key(&(blob_id, 0)),
                "completed fetch should leave no stale page_fetches entry"
            );
        });
    }
1170
    /// A failed shared fetch must fail every waiter, cache nothing, and leave no
    /// `page_fetches` entry behind, so that a later read retries against the blob.
    #[test_traced]
    fn test_page_fetch_error_removes_entry_for_all_waiters() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let blob_id = 0;
            let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(10));

            let (started_tx, started_rx) = oneshot::channel();
            let (release_tx, release_rx) = oneshot::channel();
            let reads = Arc::new(AtomicUsize::new(0));
            // Every read fails. The first one blocks until released.
            let blob = ControlledBlob {
                started: Arc::new(Mutex::new(Some(started_tx))),
                release: Arc::new(Mutex::new(Some(release_rx))),
                reads: reads.clone(),
                result: ControlledBlobResult::Error,
            };

            // First reader starts the fetch and blocks inside the blob read.
            let mut first_buf = vec![0u8; PAGE_SIZE.get() as usize];
            let cache_ref_for_first = cache_ref.clone();
            let blob_for_first = blob.clone();
            let first = context.child("first").spawn(move |_| async move {
                cache_ref_for_first
                    .read(&blob_for_first, blob_id, &mut first_buf, 0)
                    .await
            });
            started_rx.await.expect("first erroring read never started");

            // Second reader should join the same fetch.
            let mut second_buf = vec![0u8; PAGE_SIZE.get() as usize];
            let cache_ref_for_second = cache_ref.clone();
            let blob_for_second = blob.clone();
            let second = context.child("second").spawn(move |_| async move {
                cache_ref_for_second
                    .read(&blob_for_second, blob_id, &mut second_buf, 0)
                    .await
            });

            // Wait until both readers are registered on the fetch.
            loop {
                let joined = {
                    let page_cache = cache_ref.cache.read();
                    page_cache
                        .page_fetches
                        .get(&(blob_id, 0))
                        .map(|fetch| fetch.waiters == 2)
                        .unwrap_or(false)
                };
                if joined {
                    break;
                }
                context.sleep(Duration::from_millis(1)).await;
            }

            // Let the blocked read fail.
            let _ = release_tx.send(());

            // Both waiters see the failure, produced by a single blob read.
            assert!(matches!(first.await, Ok(Err(Error::ReadFailed))));
            assert!(matches!(second.await, Ok(Err(Error::ReadFailed))));
            assert_eq!(reads.load(Ordering::Relaxed), 1);

            {
                let page_cache = cache_ref.cache.read();
                assert!(
                    !page_cache.page_fetches.contains_key(&(blob_id, 0)),
                    "erroring fetch should leave no stale page_fetches entry"
                );
            }
            // Nothing was cached for the failed page.
            let mut cached = vec![0u8; PAGE_SIZE.get() as usize];
            assert_eq!(cache_ref.read_cached(blob_id, &mut cached, 0), 0);

            // A later read starts a fresh fetch, which means a second blob read.
            let mut third_buf = vec![0u8; PAGE_SIZE.get() as usize];
            assert!(matches!(
                cache_ref.read(&blob, blob_id, &mut third_buf, 0).await,
                Err(Error::ReadFailed)
            ));
            assert_eq!(reads.load(Ordering::Relaxed), 2);
        });
    }
1254
1255 #[test_traced]
1256 fn test_read_cached_many_all_cached() {
1257 let pool = test_pool();
1258 let cache_ref = CacheRef::new(pool, PAGE_SIZE, NZUsize!(10));
1259 let blob_id = cache_ref.next_id();
1260 let page0 = vec![0xAA; PAGE_SIZE.get() as usize];
1261 let page1 = vec![0xBB; PAGE_SIZE.get() as usize];
1262
1263 {
1265 let mut cache = cache_ref.cache.write();
1266 cache.cache(blob_id, &page0, 0);
1267 cache.cache(blob_id, &page1, 1);
1268 }
1269
1270 let mut buf0 = vec![0u8; PAGE_SIZE_U64 as usize];
1271 let mut buf1 = vec![0u8; PAGE_SIZE_U64 as usize];
1272 let mut ranges: Vec<(&mut [u8], u64)> = vec![(&mut buf0, 0), (&mut buf1, PAGE_SIZE_U64)];
1273
1274 cache_ref.read_cached_many(blob_id, &mut ranges);
1275
1276 assert!(ranges.is_empty());
1278 drop(ranges);
1279
1280 assert!(buf0 == page0);
1282 assert!(buf1 == page1);
1283 }
1284
1285 #[test_traced]
1286 fn test_read_cached_many_none_cached() {
1287 let pool = test_pool();
1288 let cache_ref = CacheRef::new(pool, PAGE_SIZE, NZUsize!(10));
1289 let blob_id = cache_ref.next_id();
1290
1291 let mut buf0 = vec![0u8; PAGE_SIZE_U64 as usize];
1292 let mut buf1 = vec![0u8; PAGE_SIZE_U64 as usize];
1293 let mut ranges: Vec<(&mut [u8], u64)> = vec![(&mut buf0, 0), (&mut buf1, PAGE_SIZE_U64)];
1294
1295 cache_ref.read_cached_many(blob_id, &mut ranges);
1297 assert_eq!(ranges.len(), 2);
1298 assert_eq!(ranges[0].1, 0);
1299 assert_eq!(ranges[1].1, PAGE_SIZE_U64);
1300 }
1301
1302 #[test_traced]
1303 fn test_read_cached_many_scattered_misses() {
1304 let pool = test_pool();
1307 let cache_ref = CacheRef::new(pool, PAGE_SIZE, NZUsize!(10));
1308 let blob_id = cache_ref.next_id();
1309
1310 let page0 = vec![0x11; PAGE_SIZE.get() as usize];
1311 let page2 = vec![0x33; PAGE_SIZE.get() as usize];
1312 {
1313 let mut cache = cache_ref.cache.write();
1314 cache.cache(blob_id, &page0, 0);
1315 cache.cache(blob_id, &page2, 2);
1317 }
1318
1319 let mut buf0 = vec![0u8; PAGE_SIZE_U64 as usize];
1320 let mut buf1 = vec![0u8; PAGE_SIZE_U64 as usize];
1321 let mut buf2 = vec![0u8; PAGE_SIZE_U64 as usize];
1322 let mut ranges: Vec<(&mut [u8], u64)> = vec![
1323 (&mut buf0, 0),
1324 (&mut buf1, PAGE_SIZE_U64),
1325 (&mut buf2, PAGE_SIZE_U64 * 2),
1326 ];
1327
1328 cache_ref.read_cached_many(blob_id, &mut ranges);
1329
1330 assert_eq!(ranges.len(), 1);
1333 assert_eq!(ranges[0].1, PAGE_SIZE_U64);
1334 drop(ranges);
1335
1336 assert!(buf0 == page0);
1338 assert!(buf2 == page2);
1339 assert!(buf1.iter().all(|b| *b == 0));
1341 }
1342}