use super::get_page_from_blob;
use crate::{Blob, BufferPool, BufferPooler, Error, IoBuf, IoBufMut};
use commonware_utils::sync::RwLock;
use futures::{future::Shared, FutureExt};
use std::{
    collections::{hash_map::Entry, HashMap},
    future::Future,
    num::{NonZeroU16, NonZeroUsize},
    pin::Pin,
    sync::{
        atomic::{AtomicBool, AtomicU64, Ordering},
        Arc,
    },
};
use tracing::{debug, error, trace};
19
/// A cloneable, shared future resolving to the fetched page bytes (or a shared
/// error), so multiple concurrent faulting readers can await a single fetch.
type PageFetchFuture = Shared<Pin<Box<dyn Future<Output = Result<IoBuf, Arc<Error>>> + Send>>>;

/// Reference-counted handle to a [`PageFetchFuture`]; the `Arc` identity is
/// used to tell whether a map entry still refers to a particular fetch.
type PageFetch = Arc<PageFetchFuture>;
27
/// Tracks one in-flight fetch of a single page, shared by all waiting readers.
struct PageFetchEntry {
    /// The shared future performing the fetch.
    fetch: PageFetch,
    /// Number of readers currently awaiting `fetch`; the entry is removed when
    /// the last waiter departs (or when the fetch completes).
    waiters: usize,
}
41
/// Drop guard held by each waiter of an in-flight page fetch. If the waiter is
/// cancelled before observing the fetch result, `Drop` releases its claim on
/// the corresponding `page_fetches` entry.
struct PageFetchGuard {
    /// Shared cache holding the `page_fetches` map.
    cache: Arc<RwLock<Cache>>,
    /// The `(blob_id, page_num)` key of the guarded fetch.
    key: (u64, u64),
    /// The specific fetch this guard was created for (compared by pointer
    /// identity on drop).
    fetch: PageFetch,
    /// Cleared via `disarm` once the fetch outcome has been observed.
    armed: bool,
}
49
50impl PageFetchGuard {
51 const fn new(cache: Arc<RwLock<Cache>>, key: (u64, u64), fetch: PageFetch) -> Self {
52 Self {
53 cache,
54 key,
55 fetch,
56 armed: true,
57 }
58 }
59
60 const fn disarm(&mut self) {
61 self.armed = false;
62 }
63}
64
65impl Drop for PageFetchGuard {
66 fn drop(&mut self) {
67 if !self.armed {
68 return;
69 }
70
71 let mut cache = self.cache.write();
78 let Entry::Occupied(mut current) = cache.page_fetches.entry(self.key) else {
79 return;
80 };
81 if !Arc::ptr_eq(¤t.get().fetch, &self.fetch) {
82 return;
83 }
84 if current.get().waiters == 1 {
85 current.remove();
86 } else {
87 current.get_mut().waiters -= 1;
88 }
89 }
90}
91
/// Fixed-capacity page cache shared across blobs, evicting with the CLOCK
/// (second-chance) algorithm.
struct Cache {
    /// Maps `(blob_id, page_num)` to the index of the slot holding that page.
    index: HashMap<(u64, u64), usize>,

    /// Per-slot metadata (key + referenced bit), parallel to `slots`.
    entries: Vec<CacheEntry>,

    /// Pre-allocated page buffers, one per slot.
    slots: Vec<IoBufMut>,

    /// Size of each cached page in bytes.
    page_size: usize,

    /// Current position of the CLOCK hand into `entries`.
    clock: usize,

    /// Maximum number of resident pages (== `slots.len()`).
    capacity: usize,

    /// In-flight page fetches keyed by `(blob_id, page_num)`, used to
    /// single-flight concurrent faults on the same page.
    page_fetches: HashMap<(u64, u64), PageFetchEntry>,
}
136
/// Metadata for one occupied cache slot.
struct CacheEntry {
    /// The `(blob_id, page_num)` of the page stored in this slot.
    key: (u64, u64),

    /// Second-chance bit: set on access, cleared as the CLOCK hand passes.
    /// Atomic so it can be set while holding only a shared (read) lock.
    referenced: AtomicBool,
}
145
/// Cloneable handle to a shared page [`Cache`].
#[derive(Clone)]
pub struct CacheRef {
    /// Page size in bytes, kept as `u64` for offset arithmetic.
    page_size: u64,

    /// Source of unique blob ids handed out by [`CacheRef::next_id`]; shared
    /// across all clones of this handle.
    next_id: Arc<AtomicU64>,

    /// The shared cache state.
    cache: Arc<RwLock<Cache>>,

    /// Pool used to allocate page buffers.
    pool: BufferPool,
}
168
impl CacheRef {
    /// Creates a cache handle with `capacity` pages of `page_size` bytes,
    /// allocating the page buffers from `pool`.
    pub fn new(pool: BufferPool, page_size: NonZeroU16, capacity: NonZeroUsize) -> Self {
        let page_size_u64 = page_size.get() as u64;

        Self {
            page_size: page_size_u64,
            next_id: Arc::new(AtomicU64::new(0)),
            cache: Arc::new(RwLock::new(Cache::new(pool.clone(), page_size, capacity))),
            pool,
        }
    }

    /// Convenience constructor that sources the buffer pool from a
    /// [`BufferPooler`].
    pub fn from_pooler(
        pooler: &impl BufferPooler,
        page_size: NonZeroU16,
        capacity: NonZeroUsize,
    ) -> Self {
        Self::new(pooler.storage_buffer_pool().clone(), page_size, capacity)
    }

    /// Returns the page size in bytes.
    #[inline]
    pub const fn page_size(&self) -> u64 {
        self.page_size
    }

    /// Returns the buffer pool used for page allocations.
    #[inline]
    pub const fn pool(&self) -> &BufferPool {
        &self.pool
    }

    /// Returns the next unique blob id (monotonically increasing, shared
    /// across clones of this handle).
    pub fn next_id(&self) -> u64 {
        self.next_id.fetch_add(1, Ordering::Relaxed)
    }

    /// Translates a logical byte `offset` into `(page_num, offset_in_page)`.
    pub const fn offset_to_page(&self, offset: u64) -> (u64, u64) {
        Cache::offset_to_page(self.page_size, offset)
    }

    /// Copies as many bytes as possible from cached pages into `buf`, starting
    /// at `logical_offset`. Stops at the first page miss and returns the number
    /// of bytes copied; never touches the underlying blob.
    pub(super) fn read_cached(
        &self,
        blob_id: u64,
        mut buf: &mut [u8],
        mut logical_offset: u64,
    ) -> usize {
        let original_len = buf.len();
        let page_cache = self.cache.read();
        // Each `read_at` copies at most up to the end of one page, so loop
        // page by page until a miss or the buffer is full.
        while !buf.is_empty() {
            let count = page_cache.read_at(blob_id, buf, logical_offset);
            if count == 0 {
                break;
            }
            logical_offset += count as u64;
            buf = &mut buf[count..];
        }
        original_len - buf.len()
    }

    /// Fills `buf` starting at logical `offset`, serving from the cache where
    /// possible and faulting missing pages in from `blob` otherwise.
    pub(super) async fn read<B: Blob>(
        &self,
        blob: &B,
        blob_id: u64,
        mut buf: &mut [u8],
        mut offset: u64,
    ) -> Result<(), Error> {
        while !buf.is_empty() {
            // Fast path: serve the next chunk from the cache under a read
            // lock. The scope ensures the lock is released before awaiting.
            {
                let page_cache = self.cache.read();
                let count = page_cache.read_at(blob_id, buf, offset);
                if count != 0 {
                    offset += count as u64;
                    buf = &mut buf[count..];
                    continue;
                }
            }

            // Slow path: fault the page in (possibly joining a fetch that is
            // already in flight for it).
            let count = self
                .read_after_page_fault(blob, blob_id, buf, offset)
                .await?;
            offset += count as u64;
            buf = &mut buf[count..];
        }

        Ok(())
    }

    /// Handles a page fault for the page containing `offset`: joins an
    /// existing in-flight fetch of that page if one exists, otherwise starts a
    /// new one. Returns the number of bytes copied into `buf` (at most up to
    /// the end of the faulted page).
    ///
    /// # Panics
    ///
    /// Panics if `buf` is empty.
    pub(super) async fn read_after_page_fault<B: Blob>(
        &self,
        blob: &B,
        blob_id: u64,
        buf: &mut [u8],
        offset: u64,
    ) -> Result<usize, Error> {
        assert!(!buf.is_empty());

        let (page_num, offset_in_page) = Cache::offset_to_page(self.page_size, offset);
        let offset_in_page = offset_in_page as usize;
        trace!(page_num, blob_id, "page fault");

        let (fetch_future, mut fetch_guard) = {
            let mut cache = self.cache.write();

            // Re-check under the write lock: another task may have populated
            // the page between the caller's read-lock miss and here.
            let count = cache.read_at(blob_id, buf, offset);
            if count != 0 {
                return Ok(count);
            }

            let key = (blob_id, page_num);
            match cache.page_fetches.entry(key) {
                Entry::Occupied(o) => {
                    // Join the in-flight fetch as an additional waiter.
                    let entry = o.into_mut();
                    entry.waiters += 1;
                    let fetch_future = entry.fetch.as_ref().clone();
                    let fetch = Arc::clone(&entry.fetch);
                    (
                        fetch_future,
                        PageFetchGuard::new(Arc::clone(&self.cache), key, fetch),
                    )
                }
                Entry::Vacant(v) => {
                    // First fetcher: build a shared future that fetches the
                    // page, caches it on success, and removes the
                    // single-flight entry once it completes.
                    let blob = blob.clone();
                    let cache = Arc::clone(&self.cache);
                    let page_size = self.page_size;
                    let future = async move {
                        let result = fetch_cacheable_page(&blob, page_num, page_size).await;
                        if let Err(err) = &result {
                            error!(page_num, ?err, "Page fetch failed");
                        }

                        let mut cache = cache.write();
                        if let Ok(page) = &result {
                            cache.cache(blob_id, page.as_ref(), page_num);
                        }
                        let _ = cache.page_fetches.remove(&key);
                        result
                    };

                    let fetch_future = future.boxed().shared();
                    let fetch = Arc::new(fetch_future.clone());
                    v.insert(PageFetchEntry {
                        fetch: Arc::clone(&fetch),
                        waiters: 1,
                    });

                    (
                        fetch_future,
                        PageFetchGuard::new(Arc::clone(&self.cache), key, fetch),
                    )
                }
            }
        };

        let fetch_result = fetch_future.await;
        // The fetch resolved, so the guard's drop-time cleanup is unneeded.
        fetch_guard.disarm();
        let page_buf = match fetch_result {
            Ok(page_buf) => page_buf,
            Err(_) => return Err(Error::ReadFailed),
        };

        let bytes_to_copy = std::cmp::min(buf.len(), page_buf.len() - offset_in_page);
        buf[..bytes_to_copy]
            .copy_from_slice(&page_buf.as_ref()[offset_in_page..offset_in_page + bytes_to_copy]);

        Ok(bytes_to_copy)
    }

    /// Caches whole pages from `buf`, which must begin at a page boundary.
    /// Only full pages are cached; returns the number of trailing bytes that
    /// were not cached.
    ///
    /// # Panics
    ///
    /// Panics if `offset` is not page-aligned.
    pub fn cache(&self, blob_id: u64, mut buf: &[u8], offset: u64) -> usize {
        let (mut page_num, offset_in_page) = self.offset_to_page(offset);
        assert_eq!(offset_in_page, 0);
        {
            let page_size = self.page_size as usize;
            let mut page_cache = self.cache.write();
            while buf.len() >= page_size {
                page_cache.cache(blob_id, &buf[..page_size], page_num);
                buf = &buf[page_size..];
                // Stop (rather than wrap) if the page number would overflow.
                page_num = match page_num.checked_add(1) {
                    Some(next) => next,
                    None => break,
                };
            }
        }

        buf.len()
    }
}
402
impl Cache {
    /// Creates a cache with `capacity` zero-initialized page buffers of
    /// `page_size` bytes, allocated up front from `pool`.
    pub fn new(pool: BufferPool, page_size: NonZeroU16, capacity: NonZeroUsize) -> Self {
        let page_size = page_size.get() as usize;
        let capacity = capacity.get();
        let mut slots = Vec::with_capacity(capacity);
        for _ in 0..capacity {
            let slot = pool.alloc_zeroed(page_size);
            slots.push(slot);
        }
        Self {
            index: HashMap::new(),
            entries: Vec::with_capacity(capacity),
            slots,
            page_size,
            clock: 0,
            capacity,
            page_fetches: HashMap::new(),
        }
    }

    /// Returns the page bytes stored in `slot`.
    #[inline]
    fn page_slice(&self, slot: usize) -> &[u8] {
        assert!(slot < self.capacity);
        self.slots[slot].as_ref()
    }

    /// Returns the mutable page bytes stored in `slot`.
    #[inline]
    fn page_slice_mut(&mut self, slot: usize) -> &mut [u8] {
        assert!(slot < self.capacity);
        self.slots[slot].as_mut()
    }

    /// Translates a logical byte `offset` into `(page_num, offset_in_page)`.
    const fn offset_to_page(page_size: u64, offset: u64) -> (u64, u64) {
        (offset / page_size, offset % page_size)
    }

    /// Copies bytes from the cached page containing `logical_offset` into
    /// `buf`, stopping at the page boundary. Returns 0 on a cache miss. On a
    /// hit, sets the page's referenced bit for CLOCK eviction.
    fn read_at(&self, blob_id: u64, buf: &mut [u8], logical_offset: u64) -> usize {
        let (page_num, offset_in_page) =
            Self::offset_to_page(self.page_size as u64, logical_offset);
        let Some(&slot) = self.index.get(&(blob_id, page_num)) else {
            return 0;
        };
        let entry = &self.entries[slot];
        assert_eq!(entry.key, (blob_id, page_num));
        // Referenced bit is atomic so it can be set under the shared lock.
        entry.referenced.store(true, Ordering::Relaxed);

        let page = self.page_slice(slot);
        let bytes_to_copy = std::cmp::min(buf.len(), self.page_size - offset_in_page as usize);
        buf[..bytes_to_copy].copy_from_slice(
            &page[offset_in_page as usize..offset_in_page as usize + bytes_to_copy],
        );

        bytes_to_copy
    }

    /// Inserts (or refreshes) `page` under `(blob_id, page_num)`, evicting a
    /// resident page via CLOCK second-chance when the cache is full.
    ///
    /// # Panics
    ///
    /// Panics if `page.len()` differs from the configured page size.
    fn cache(&mut self, blob_id: u64, page: &[u8], page_num: u64) {
        assert_eq!(page.len(), self.page_size);
        let key = (blob_id, page_num);

        // Case 1: page already resident — overwrite in place.
        if let Some(&slot) = self.index.get(&key) {
            debug!(blob_id, page_num, "updating duplicate page");

            let entry = &self.entries[slot];
            assert_eq!(entry.key, key);
            entry.referenced.store(true, Ordering::Relaxed);
            self.page_slice_mut(slot).copy_from_slice(page);
            return;
        }

        // Case 2: free slot available — occupy the next one.
        if self.entries.len() < self.capacity {
            let slot = self.entries.len();
            self.index.insert(key, slot);
            self.entries.push(CacheEntry {
                key,
                referenced: AtomicBool::new(true),
            });
            self.page_slice_mut(slot).copy_from_slice(page);
            return;
        }

        // Case 3: cache full — advance the CLOCK hand, granting each
        // referenced page a second chance, until an unreferenced victim is
        // found. Terminates because each pass clears a referenced bit.
        while self.entries[self.clock].referenced.load(Ordering::Relaxed) {
            self.entries[self.clock]
                .referenced
                .store(false, Ordering::Relaxed);
            self.clock = (self.clock + 1) % self.entries.len();
        }

        // Evict the victim and reuse its slot for the new page.
        let slot = self.clock;
        let entry = &mut self.entries[slot];
        assert!(self.index.remove(&entry.key).is_some());
        self.index.insert(key, slot);
        entry.key = key;
        entry.referenced.store(true, Ordering::Relaxed);
        self.page_slice_mut(slot).copy_from_slice(page);

        self.clock = (self.clock + 1) % self.entries.len();
    }
}
522
523async fn fetch_cacheable_page(
526 blob: &impl Blob,
527 page_num: u64,
528 page_size: u64,
529) -> Result<IoBuf, Arc<Error>> {
530 let page = get_page_from_blob(blob, page_num, page_size)
531 .await
532 .map_err(Arc::new)?;
533
534 let len = page.len();
537 if len != page_size as usize {
538 error!(
539 page_num,
540 expected = page_size,
541 actual = len,
542 "attempted to fetch partial page from blob"
543 );
544 return Err(Arc::new(Error::InvalidChecksum));
545 }
546
547 Ok(page)
548}
549
/// Unit tests covering CLOCK eviction, blob-backed reads, boundary offsets,
/// and single-flight fetch semantics under cancellation and errors.
#[cfg(test)]
mod tests {
    use super::{super::Checksum, *};
    use crate::{
        buffer::paged::CHECKSUM_SIZE, deterministic, BufferPool, BufferPoolConfig, Clock as _,
        IoBufsMut, Runner as _, Spawner as _, Storage as _,
    };
    use commonware_cryptography::Crc32;
    use commonware_macros::test_traced;
    use commonware_utils::{channel::oneshot, sync::Mutex, NZUsize, NZU16};
    use futures::future::pending;
    use prometheus_client::registry::Registry;
    use std::{
        num::NonZeroU16,
        sync::{
            atomic::{AtomicUsize, Ordering},
            Arc,
        },
        time::Duration,
    };

    /// Logical page size used by these tests.
    const PAGE_SIZE: NonZeroU16 = NZU16!(1024);
    const PAGE_SIZE_U64: u64 = PAGE_SIZE.get() as u64;

    /// A blob whose first read signals `started` and then blocks forever, so
    /// tests can cancel a reader mid-fetch.
    #[derive(Clone)]
    struct BlockingBlob {
        started: Arc<Mutex<Option<oneshot::Sender<()>>>>,
    }

    impl Blob for BlockingBlob {
        async fn read_at(&self, offset: u64, len: usize) -> Result<IoBufsMut, Error> {
            self.read_at_buf(offset, len, IoBufsMut::default()).await
        }

        async fn read_at_buf(
            &self,
            _offset: u64,
            _len: usize,
            _bufs: impl Into<IoBufsMut> + Send,
        ) -> Result<IoBufsMut, Error> {
            // Signal that the read started, then never complete.
            let sender = self
                .started
                .lock()
                .take()
                .expect("blocking blob read started more than once");
            let _ = sender.send(());
            pending::<()>().await;
            unreachable!()
        }

        async fn write_at(
            &self,
            _offset: u64,
            _bufs: impl Into<crate::IoBufs> + Send,
        ) -> Result<(), Error> {
            Ok(())
        }

        async fn resize(&self, _len: u64) -> Result<(), Error> {
            Ok(())
        }

        async fn sync(&self) -> Result<(), Error> {
            Ok(())
        }
    }

    /// Outcome a [`ControlledBlob`] read should produce.
    #[derive(Clone)]
    enum ControlledBlobResult {
        Success(Arc<Vec<u8>>),
        Error,
    }

    /// A blob whose first read signals `started`, waits on `release`, then
    /// returns `result`; subsequent reads return `result` immediately. Every
    /// read increments `reads`.
    #[derive(Clone)]
    struct ControlledBlob {
        started: Arc<Mutex<Option<oneshot::Sender<()>>>>,
        release: Arc<Mutex<Option<oneshot::Receiver<()>>>>,
        reads: Arc<AtomicUsize>,
        result: ControlledBlobResult,
    }

    impl Blob for ControlledBlob {
        async fn read_at(&self, offset: u64, len: usize) -> Result<IoBufsMut, Error> {
            self.read_at_buf(offset, len, IoBufsMut::default()).await
        }

        async fn read_at_buf(
            &self,
            _offset: u64,
            _len: usize,
            _bufs: impl Into<IoBufsMut> + Send,
        ) -> Result<IoBufsMut, Error> {
            // Only the very first read blocks on the release signal.
            if self.reads.fetch_add(1, Ordering::Relaxed) == 0 {
                let sender = self
                    .started
                    .lock()
                    .take()
                    .expect("controlled blob start signal consumed more than once");
                let _ = sender.send(());

                let release = self
                    .release
                    .lock()
                    .take()
                    .expect("controlled blob release receiver consumed more than once");
                release.await.expect("release signal dropped");
            }

            match &self.result {
                ControlledBlobResult::Success(page) => Ok(IoBufsMut::from(page.as_ref().clone())),
                ControlledBlobResult::Error => Err(Error::ReadFailed),
            }
        }

        async fn write_at(
            &self,
            _offset: u64,
            _bufs: impl Into<crate::IoBufs> + Send,
        ) -> Result<(), Error> {
            Ok(())
        }

        async fn resize(&self, _len: u64) -> Result<(), Error> {
            Ok(())
        }

        async fn sync(&self) -> Result<(), Error> {
            Ok(())
        }
    }

    #[test_traced]
    fn test_cache_basic() {
        let mut registry = Registry::default();
        let pool = BufferPool::new(BufferPoolConfig::for_storage(), &mut registry);
        let mut cache: Cache = Cache::new(pool, PAGE_SIZE, NZUsize!(10));

        // Miss on an empty cache.
        let mut buf = vec![0; PAGE_SIZE.get() as usize];
        let bytes_read = cache.read_at(0, &mut buf, 0);
        assert_eq!(bytes_read, 0);

        cache.cache(0, &[1; PAGE_SIZE.get() as usize], 0);
        let bytes_read = cache.read_at(0, &mut buf, 0);
        assert_eq!(bytes_read, PAGE_SIZE.get() as usize);
        assert_eq!(buf, [1; PAGE_SIZE.get() as usize]);

        // Re-caching the same page overwrites it in place.
        cache.cache(0, &[2; PAGE_SIZE.get() as usize], 0);
        let bytes_read = cache.read_at(0, &mut buf, 0);
        assert_eq!(bytes_read, PAGE_SIZE.get() as usize);
        assert_eq!(buf, [2; PAGE_SIZE.get() as usize]);

        // Insert 11 distinct pages into a 10-slot cache; page 0 is evicted.
        for i in 0u64..11 {
            cache.cache(0, &[i as u8; PAGE_SIZE.get() as usize], i);
        }
        let bytes_read = cache.read_at(0, &mut buf, 0);
        assert_eq!(bytes_read, 0);
        for i in 1u64..11 {
            let bytes_read = cache.read_at(0, &mut buf, i * PAGE_SIZE_U64);
            assert_eq!(bytes_read, PAGE_SIZE.get() as usize);
            assert_eq!(buf, [i as u8; PAGE_SIZE.get() as usize]);
        }

        // An unaligned read stops at the page boundary.
        let mut buf = vec![0; PAGE_SIZE.get() as usize];
        let bytes_read = cache.read_at(0, &mut buf, PAGE_SIZE_U64 + 2);
        assert_eq!(bytes_read, PAGE_SIZE.get() as usize - 2);
        assert_eq!(
            &buf[..PAGE_SIZE.get() as usize - 2],
            [1; PAGE_SIZE.get() as usize - 2]
        );
    }

    #[test_traced]
    fn test_cache_read_with_blob() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let physical_page_size = PAGE_SIZE_U64 + CHECKSUM_SIZE;

            // Write 11 checksummed pages to backing storage.
            let (blob, size) = context
                .open("test", "blob".as_bytes())
                .await
                .expect("Failed to open blob");
            assert_eq!(size, 0);
            for i in 0..11 {
                let logical_data = vec![i as u8; PAGE_SIZE.get() as usize];
                let crc = Crc32::checksum(&logical_data);
                let record = Checksum::new(PAGE_SIZE.get(), crc);
                let mut page_data = logical_data;
                page_data.extend_from_slice(&record.to_bytes());
                blob.write_at(i * physical_page_size, page_data)
                    .await
                    .unwrap();
            }

            let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(10));
            assert_eq!(cache_ref.next_id(), 0);
            assert_eq!(cache_ref.next_id(), 1);
            // First pass faults every page in from the blob.
            for i in 0..11 {
                let mut buf = vec![0; PAGE_SIZE.get() as usize];
                cache_ref
                    .read(&blob, 0, &mut buf, i * PAGE_SIZE_U64)
                    .await
                    .unwrap();
                assert_eq!(buf, [i as u8; PAGE_SIZE.get() as usize]);
            }

            // Second pass over pages 1..11 returns the same contents.
            for i in 1..11 {
                let mut buf = vec![0; PAGE_SIZE.get() as usize];
                cache_ref
                    .read(&blob, 0, &mut buf, i * PAGE_SIZE_U64)
                    .await
                    .unwrap();
                assert_eq!(buf, [i as u8; PAGE_SIZE.get() as usize]);
            }

            blob.sync().await.unwrap();
        });
    }

    #[test_traced]
    fn test_cache_max_page() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(2));

            // Largest page-aligned offset representable in a u64.
            let aligned_max_offset = u64::MAX - (u64::MAX % PAGE_SIZE_U64);

            let logical_data = vec![42u8; PAGE_SIZE.get() as usize];

            let remaining = cache_ref.cache(0, logical_data.as_slice(), aligned_max_offset);
            assert_eq!(remaining, 0);

            let mut buf = vec![0u8; PAGE_SIZE.get() as usize];
            let page_cache = cache_ref.cache.read();
            let bytes_read = page_cache.read_at(0, &mut buf, aligned_max_offset);
            assert_eq!(bytes_read, PAGE_SIZE.get() as usize);
            assert!(buf.iter().all(|b| *b == 42));
        });
    }

    #[test_traced]
    fn test_cache_at_high_offset() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Smallest legal page: checksum record plus one data byte.
            const MIN_PAGE_SIZE: u64 = CHECKSUM_SIZE + 1;
            let cache_ref =
                CacheRef::from_pooler(&context, NZU16!(MIN_PAGE_SIZE as u16), NZUsize!(2));

            let data = vec![1u8; MIN_PAGE_SIZE as usize * 2];

            // Two pages just below the top of the u64 offset space.
            let aligned_max_offset = u64::MAX - (u64::MAX % MIN_PAGE_SIZE);
            let high_offset = aligned_max_offset - (MIN_PAGE_SIZE * 2);
            let remaining = cache_ref.cache(0, &data, high_offset);
            assert_eq!(remaining, 0);

            let mut buf = vec![0u8; MIN_PAGE_SIZE as usize];
            let page_cache = cache_ref.cache.read();
            assert_eq!(
                page_cache.read_at(0, &mut buf, high_offset),
                MIN_PAGE_SIZE as usize
            );
            assert!(buf.iter().all(|b| *b == 1));

            assert_eq!(
                page_cache.read_at(0, &mut buf, high_offset + MIN_PAGE_SIZE),
                MIN_PAGE_SIZE as usize
            );
            assert!(buf.iter().all(|b| *b == 1));
        });
    }

    #[test_traced]
    fn test_page_fetches_entry_removed_when_first_fetcher_cancelled() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let blob_id = 0;
            let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(10));
            let (started_tx, started_rx) = oneshot::channel();
            let blob = BlockingBlob {
                started: Arc::new(Mutex::new(Some(started_tx))),
            };
            let mut read_buf = vec![0u8; PAGE_SIZE.get() as usize];

            // Start a read that will block inside the blob forever.
            let cache_ref_for_task = cache_ref.clone();
            let blob_for_task = blob.clone();
            let handle = context.spawn(move |_| async move {
                let _ = cache_ref_for_task
                    .read(&blob_for_task, blob_id, &mut read_buf, 0)
                    .await;
            });

            started_rx.await.expect("blocking read never started");
            {
                let page_cache = cache_ref.cache.read();
                assert!(page_cache.page_fetches.contains_key(&(blob_id, 0)));
            }

            // Cancel the sole fetcher; its guard must clean up the entry.
            handle.abort();
            assert!(matches!(handle.await, Err(Error::Closed)));

            let page_cache = cache_ref.cache.read();
            assert!(
                !page_cache.page_fetches.contains_key(&(blob_id, 0)),
                "cancelled first fetcher should not leave stale page_fetches entry"
            );
        });
    }

    #[test_traced]
    fn test_followers_keep_single_flight_after_first_fetcher_cancellation() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let blob_id = 0;
            let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(10));

            // Build a valid physical page (data + checksum record).
            let logical_page = vec![7u8; PAGE_SIZE.get() as usize];
            let crc = Crc32::checksum(&logical_page);
            let mut physical_page = logical_page.clone();
            physical_page.extend_from_slice(&Checksum::new(PAGE_SIZE.get(), crc).to_bytes());
            let (started_tx, started_rx) = oneshot::channel();
            let (release_tx, release_rx) = oneshot::channel();
            let reads = Arc::new(AtomicUsize::new(0));
            let blob = ControlledBlob {
                started: Arc::new(Mutex::new(Some(started_tx))),
                release: Arc::new(Mutex::new(Some(release_rx))),
                reads: reads.clone(),
                result: ControlledBlobResult::Success(Arc::new(physical_page)),
            };

            // First reader starts the fetch and blocks inside the blob.
            let mut first_buf = vec![0u8; PAGE_SIZE.get() as usize];
            let cache_ref_for_first = cache_ref.clone();
            let blob_for_first = blob.clone();
            let first = context.clone().spawn(move |_| async move {
                let _ = cache_ref_for_first
                    .read(&blob_for_first, blob_id, &mut first_buf, 0)
                    .await;
            });
            started_rx.await.expect("first read never started");

            // Second reader joins the same in-flight fetch.
            let mut second_buf = vec![0u8; PAGE_SIZE.get() as usize];
            let cache_ref_for_second = cache_ref.clone();
            let blob_for_second = blob.clone();
            let second = context.clone().spawn(move |_| async move {
                cache_ref_for_second
                    .read(&blob_for_second, blob_id, &mut second_buf, 0)
                    .await
                    .expect("second read failed");
                second_buf
            });

            // Wait until both readers are registered as waiters.
            loop {
                let joined = {
                    let page_cache = cache_ref.cache.read();
                    page_cache
                        .page_fetches
                        .get(&(blob_id, 0))
                        .map(|fetch| fetch.waiters == 2)
                        .unwrap_or(false)
                };
                if joined {
                    break;
                }
                context.sleep(Duration::from_millis(1)).await;
            }

            // Cancel the first fetcher; the second must keep the fetch alive.
            first.abort();
            assert!(matches!(first.await, Err(Error::Closed)));

            // A third reader should join the surviving fetch (no second I/O).
            let mut third_buf = vec![0u8; PAGE_SIZE.get() as usize];
            let cache_ref_for_third = cache_ref.clone();
            let blob_for_third = blob.clone();
            let third = context.clone().spawn(move |_| async move {
                cache_ref_for_third
                    .read(&blob_for_third, blob_id, &mut third_buf, 0)
                    .await
                    .expect("third read failed");
                third_buf
            });

            // Wait until the third reader either joined or triggered a read.
            loop {
                let third_entered = {
                    let page_cache = cache_ref.cache.read();
                    reads.load(Ordering::Relaxed) > 1
                        || page_cache
                            .page_fetches
                            .get(&(blob_id, 0))
                            .map(|fetch| fetch.waiters == 2)
                            .unwrap_or(false)
                };
                if third_entered {
                    break;
                }
                context.sleep(Duration::from_millis(1)).await;
            }

            // Unblock the blob and confirm both waiters got the page.
            let _ = release_tx.send(());
            let second_buf = second.await.expect("second task failed");
            let third_buf = third.await.expect("third task failed");
            assert_eq!(second_buf, logical_page);
            assert_eq!(third_buf, logical_page);

            // Exactly one blob read served all waiters.
            assert_eq!(reads.load(Ordering::Relaxed), 1);

            // The fetched page is now resident in the cache.
            let mut cached = vec![0u8; PAGE_SIZE.get() as usize];
            assert_eq!(
                cache_ref.read_cached(blob_id, &mut cached, 0),
                PAGE_SIZE.get() as usize
            );
            assert_eq!(cached, logical_page);

            // A later read hits the cache without touching the blob again.
            let mut fourth_buf = vec![0u8; PAGE_SIZE.get() as usize];
            cache_ref
                .read(&blob, blob_id, &mut fourth_buf, 0)
                .await
                .unwrap();
            assert_eq!(fourth_buf, logical_page);
            assert_eq!(reads.load(Ordering::Relaxed), 1);

            let page_cache = cache_ref.cache.read();
            assert!(
                !page_cache.page_fetches.contains_key(&(blob_id, 0)),
                "completed fetch should leave no stale page_fetches entry"
            );
        });
    }

    #[test_traced]
    fn test_page_fetch_error_removes_entry_for_all_waiters() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let blob_id = 0;
            let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(10));

            // Blob that fails every read once released.
            let (started_tx, started_rx) = oneshot::channel();
            let (release_tx, release_rx) = oneshot::channel();
            let reads = Arc::new(AtomicUsize::new(0));
            let blob = ControlledBlob {
                started: Arc::new(Mutex::new(Some(started_tx))),
                release: Arc::new(Mutex::new(Some(release_rx))),
                reads: reads.clone(),
                result: ControlledBlobResult::Error,
            };

            let mut first_buf = vec![0u8; PAGE_SIZE.get() as usize];
            let cache_ref_for_first = cache_ref.clone();
            let blob_for_first = blob.clone();
            let first = context.clone().spawn(move |_| async move {
                cache_ref_for_first
                    .read(&blob_for_first, blob_id, &mut first_buf, 0)
                    .await
            });
            started_rx.await.expect("first erroring read never started");

            // Second reader joins the same (doomed) fetch.
            let mut second_buf = vec![0u8; PAGE_SIZE.get() as usize];
            let cache_ref_for_second = cache_ref.clone();
            let blob_for_second = blob.clone();
            let second = context.clone().spawn(move |_| async move {
                cache_ref_for_second
                    .read(&blob_for_second, blob_id, &mut second_buf, 0)
                    .await
            });

            loop {
                let joined = {
                    let page_cache = cache_ref.cache.read();
                    page_cache
                        .page_fetches
                        .get(&(blob_id, 0))
                        .map(|fetch| fetch.waiters == 2)
                        .unwrap_or(false)
                };
                if joined {
                    break;
                }
                context.sleep(Duration::from_millis(1)).await;
            }

            // Let the fetch fail; both waiters observe the shared error.
            let _ = release_tx.send(());

            assert!(matches!(first.await, Ok(Err(Error::ReadFailed))));
            assert!(matches!(second.await, Ok(Err(Error::ReadFailed))));
            assert_eq!(reads.load(Ordering::Relaxed), 1);

            // The failed fetch must not leave a stale single-flight entry.
            {
                let page_cache = cache_ref.cache.read();
                assert!(
                    !page_cache.page_fetches.contains_key(&(blob_id, 0)),
                    "erroring fetch should leave no stale page_fetches entry"
                );
            }
            let mut cached = vec![0u8; PAGE_SIZE.get() as usize];
            assert_eq!(cache_ref.read_cached(blob_id, &mut cached, 0), 0);

            // A fresh read retries the blob (second physical read) and fails.
            let mut third_buf = vec![0u8; PAGE_SIZE.get() as usize];
            assert!(matches!(
                cache_ref.read(&blob, blob_id, &mut third_buf, 0).await,
                Err(Error::ReadFailed)
            ));
            assert_eq!(reads.load(Ordering::Relaxed), 2);
        });
    }
}
1108}