1use commonware_runtime::buffer::PoolRef;
80use std::num::{NonZeroU64, NonZeroUsize};
81use thiserror::Error;
82
83mod storage;
84pub use storage::Cache;
85
/// Errors returned by [`Cache`] operations.
#[derive(Debug, Error)]
pub enum Error {
    /// An error surfaced by the underlying journal; converted automatically via `From`.
    #[error("journal error: {0}")]
    Journal(#[from] crate::journal::Error),
    /// A stored record was found to be corrupted.
    #[error("record corrupted")]
    RecordCorrupted,
    /// The operation targets data that has already been pruned. The payload is the
    /// point the cache has been pruned to (the tests expect `AlreadyPrunedTo(3)` when
    /// putting index 1 after `prune(3)`).
    #[error("already pruned to: {0}")]
    AlreadyPrunedTo(u64),
    /// A record was too large.
    // NOTE(review): the size limit that triggers this is not visible in this file.
    #[error("record too large")]
    RecordTooLarge,
}
98
/// Configuration used to initialize a [`Cache`].
///
/// `C` is the type of the codec configuration carried in `codec_config`.
#[derive(Clone)]
pub struct Config<C> {
    /// Name of the storage partition holding the cache's blobs (the tests open blobs
    /// directly under this name through the runtime storage).
    pub partition: String,

    /// Optional compression setting; `None` disables compression.
    // NOTE(review): presumably a compression level — confirm against `storage`.
    pub compression: Option<u8>,

    /// Codec configuration.
    // NOTE(review): presumably used when decoding stored values — confirm against `storage`.
    pub codec_config: C,

    /// Number of items grouped per blob. The tests derive a blob's section as
    /// `(index / items_per_blob) * items_per_blob`.
    pub items_per_blob: NonZeroU64,

    /// Size of the write buffer.
    // NOTE(review): units (presumably bytes) are not visible here — confirm.
    pub write_buffer: NonZeroUsize,

    /// Size of the replay buffer.
    // NOTE(review): presumably used while replaying the journal on init — confirm.
    pub replay_buffer: NonZeroUsize,

    /// Buffer pool handed to the cache.
    pub buffer_pool: PoolRef,
}
124
125#[cfg(test)]
126mod tests {
127 use super::*;
128 use crate::journal::Error as JournalError;
129 use commonware_codec::{varint::UInt, EncodeSize};
130 use commonware_macros::test_traced;
131 use commonware_runtime::{deterministic, Blob, Metrics, Runner, Storage};
132 use commonware_utils::{NZUsize, NZU64};
133 use rand::Rng;
134 use std::collections::BTreeMap;
135
    // Value used for `Config::items_per_blob` by tests that do not need a specific
    // blob layout.
    const DEFAULT_ITEMS_PER_BLOB: u64 = 65536;
    // Value used for `Config::write_buffer` in every test.
    const DEFAULT_WRITE_BUFFER: usize = 1024;
    // Value used for `Config::replay_buffer` in every test.
    const DEFAULT_REPLAY_BUFFER: usize = 4096;
    // Arguments passed to `PoolRef::new` when building each test's buffer pool.
    const PAGE_SIZE: NonZeroUsize = NZUsize!(1024);
    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);
141
142 #[test_traced]
143 fn test_cache_compression_then_none() {
144 let executor = deterministic::Runner::default();
146 executor.start(|context| async move {
147 let cfg = Config {
149 partition: "test_partition".into(),
150 codec_config: (),
151 compression: Some(3),
152 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
153 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
154 items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
155 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
156 };
157 let mut cache = Cache::init(context.clone(), cfg.clone())
158 .await
159 .expect("Failed to initialize cache");
160
161 let index = 1u64;
163 let data = 1;
164 cache.put(index, data).await.expect("Failed to put data");
165
166 cache.close().await.expect("Failed to close cache");
168
169 let cfg = Config {
171 partition: "test_partition".into(),
172 codec_config: (),
173 compression: None,
174 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
175 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
176 items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
177 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
178 };
179 let result = Cache::<_, i32>::init(context, cfg.clone()).await;
180 assert!(matches!(
181 result,
182 Err(Error::Journal(JournalError::Codec(_)))
183 ));
184 });
185 }
186
187 #[test_traced]
188 fn test_cache_record_corruption() {
189 let executor = deterministic::Runner::default();
191 executor.start(|context| async move {
192 let cfg = Config {
194 partition: "test_partition".into(),
195 codec_config: (),
196 compression: None,
197 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
198 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
199 items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
200 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
201 };
202 let mut cache = Cache::init(context.clone(), cfg.clone())
203 .await
204 .expect("Failed to initialize cache");
205
206 let index = 1u64;
207 let data = 1;
208
209 cache
211 .put(index, data)
212 .await
213 .expect("Failed to put data");
214
215 cache.close().await.expect("Failed to close cache");
217
218 let section = (index / DEFAULT_ITEMS_PER_BLOB) * DEFAULT_ITEMS_PER_BLOB;
220 let (blob, _) = context
221 .open("test_partition", §ion.to_be_bytes())
222 .await
223 .unwrap();
224 let value_location = 4 + UInt(1u64).encode_size() as u64 + 4 ;
225 blob.write_at(b"testdaty".to_vec(), value_location).await.unwrap();
226 blob.sync().await.unwrap();
227
228 let cache = Cache::<_, i32>::init(
230 context,
231 Config {
232 partition: "test_partition".into(),
233 codec_config: (),
234 compression: None,
235 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
236 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
237 items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
238 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
239 },
240 )
241 .await.expect("Failed to initialize cache");
242
243 let retrieved: Option<i32> = cache
245 .get(index)
246 .await
247 .expect("Failed to get data");
248 assert!(retrieved.is_none());
249 });
250 }
251
252 #[test_traced]
253 fn test_cache_prune() {
254 let executor = deterministic::Runner::default();
256 executor.start(|context| async move {
257 let cfg = Config {
259 partition: "test_partition".into(),
260 codec_config: (),
261 compression: None,
262 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
263 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
264 items_per_blob: NZU64!(1), buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
266 };
267 let mut cache = Cache::init(context.clone(), cfg.clone())
268 .await
269 .expect("Failed to initialize cache");
270
271 let items = vec![(1u64, 1), (2u64, 2), (3u64, 3), (4u64, 4), (5u64, 5)];
273 for (index, data) in &items {
274 cache.put(*index, *data).await.expect("Failed to put data");
275 }
276 assert_eq!(cache.first(), Some(1));
277
278 let buffer = context.encode();
280 assert!(buffer.contains("items_tracked 5"));
281
282 cache.prune(3).await.expect("Failed to prune");
284
285 for (index, data) in items {
287 let retrieved = cache.get(index).await.expect("Failed to get data");
288 if index < 3 {
289 assert!(retrieved.is_none());
290 } else {
291 assert_eq!(retrieved.expect("Data not found"), data);
292 }
293 }
294 assert_eq!(cache.first(), Some(3));
295
296 let buffer = context.encode();
298 assert!(buffer.contains("items_tracked 3"));
299
300 cache.prune(2).await.expect("Failed to prune");
302 assert_eq!(cache.first(), Some(3));
303
304 cache.prune(3).await.expect("Failed to prune");
306 assert_eq!(cache.first(), Some(3));
307
308 let result = cache.put(1, 1).await;
310 assert!(matches!(result, Err(Error::AlreadyPrunedTo(3))));
311 });
312 }
313
314 fn test_cache_restart(num_items: usize) -> String {
315 let executor = deterministic::Runner::default();
317 executor.start(|mut context| async move {
318 let items_per_blob = 256u64;
320 let cfg = Config {
321 partition: "test_partition".into(),
322 codec_config: (),
323 compression: None,
324 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
325 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
326 items_per_blob: NZU64!(items_per_blob),
327 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
328 };
329 let mut cache = Cache::init(context.clone(), cfg.clone())
330 .await
331 .expect("Failed to initialize cache");
332
333 let mut items = BTreeMap::new();
335 while items.len() < num_items {
336 let index = items.len() as u64;
337 let mut data = [0u8; 1024];
338 context.fill(&mut data);
339 items.insert(index, data);
340
341 cache.put(index, data).await.expect("Failed to put data");
342 }
343
344 for (index, data) in &items {
346 let retrieved = cache
347 .get(*index)
348 .await
349 .expect("Failed to get data")
350 .expect("Data not found");
351 assert_eq!(retrieved, *data);
352 }
353
354 let buffer = context.encode();
356 let tracked = format!("items_tracked {num_items:?}");
357 assert!(buffer.contains(&tracked));
358
359 cache.close().await.expect("Failed to close cache");
361
362 let cfg = Config {
364 partition: "test_partition".into(),
365 codec_config: (),
366 compression: None,
367 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
368 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
369 items_per_blob: NZU64!(items_per_blob),
370 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
371 };
372 let mut cache = Cache::<_, [u8; 1024]>::init(context.clone(), cfg.clone())
373 .await
374 .expect("Failed to initialize cache");
375
376 for (index, data) in &items {
378 let retrieved = cache
379 .get(*index)
380 .await
381 .expect("Failed to get data")
382 .expect("Data not found");
383 assert_eq!(&retrieved, data);
384 }
385
386 let min = (items.len() / 2) as u64;
388 cache.prune(min).await.expect("Failed to prune");
389
390 let min = (min / items_per_blob) * items_per_blob;
392 let mut removed = 0;
393 for (index, data) in items {
394 if index >= min {
395 let retrieved = cache
396 .get(index)
397 .await
398 .expect("Failed to get data")
399 .expect("Data not found");
400 assert_eq!(retrieved, data);
401 } else {
402 let retrieved = cache.get(index).await.expect("Failed to get data");
403 assert!(retrieved.is_none());
404 removed += 1;
405 }
406 }
407
408 let buffer = context.encode();
410 let tracked = format!("items_tracked {:?}", num_items - removed);
411 assert!(buffer.contains(&tracked));
412
413 context.auditor().state()
414 })
415 }
416
417 #[test_traced]
418 #[ignore]
419 fn test_cache_many_items_and_restart() {
420 test_cache_restart(100_000);
421 }
422
423 #[test_traced]
424 #[ignore]
425 fn test_determinism() {
426 let state1 = test_cache_restart(5_000);
427 let state2 = test_cache_restart(5_000);
428 assert_eq!(state1, state2);
429 }
430
431 #[test_traced]
432 fn test_cache_next_gap() {
433 let executor = deterministic::Runner::default();
434 executor.start(|context| async move {
435 let cfg = Config {
436 partition: "test_partition".into(),
437 codec_config: (),
438 compression: None,
439 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
440 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
441 items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
442 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
443 };
444 let mut cache = Cache::init(context.clone(), cfg.clone())
445 .await
446 .expect("Failed to initialize cache");
447
448 assert_eq!(cache.first(), None);
450
451 cache.put(1, 1).await.unwrap();
453 cache.put(10, 10).await.unwrap();
454 cache.put(11, 11).await.unwrap();
455 cache.put(14, 14).await.unwrap();
456
457 let (current_end, start_next) = cache.next_gap(0);
459 assert!(current_end.is_none());
460 assert_eq!(start_next, Some(1));
461 assert_eq!(cache.first(), Some(1));
462
463 let (current_end, start_next) = cache.next_gap(1);
464 assert_eq!(current_end, Some(1));
465 assert_eq!(start_next, Some(10));
466
467 let (current_end, start_next) = cache.next_gap(10);
468 assert_eq!(current_end, Some(11));
469 assert_eq!(start_next, Some(14));
470
471 let (current_end, start_next) = cache.next_gap(11);
472 assert_eq!(current_end, Some(11));
473 assert_eq!(start_next, Some(14));
474
475 let (current_end, start_next) = cache.next_gap(12);
476 assert!(current_end.is_none());
477 assert_eq!(start_next, Some(14));
478
479 let (current_end, start_next) = cache.next_gap(14);
480 assert_eq!(current_end, Some(14));
481 assert!(start_next.is_none());
482 });
483 }
484
485 #[test_traced]
486 fn test_cache_missing_items() {
487 let executor = deterministic::Runner::default();
488 executor.start(|context| async move {
489 let cfg = Config {
490 partition: "test_partition".into(),
491 codec_config: (),
492 compression: None,
493 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
494 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
495 items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
496 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
497 };
498 let mut cache = Cache::init(context.clone(), cfg.clone())
499 .await
500 .expect("Failed to initialize cache");
501
502 assert_eq!(cache.first(), None);
504 assert_eq!(cache.missing_items(0, 5), Vec::<u64>::new());
505 assert_eq!(cache.missing_items(100, 10), Vec::<u64>::new());
506
507 cache.put(1, 1).await.unwrap();
509 cache.put(2, 2).await.unwrap();
510 cache.put(5, 5).await.unwrap();
511 cache.put(6, 6).await.unwrap();
512 cache.put(10, 10).await.unwrap();
513
514 assert_eq!(cache.missing_items(0, 5), vec![0, 3, 4, 7, 8]);
516 assert_eq!(cache.missing_items(0, 6), vec![0, 3, 4, 7, 8, 9]);
517 assert_eq!(cache.missing_items(0, 7), vec![0, 3, 4, 7, 8, 9]);
518
519 assert_eq!(cache.missing_items(3, 3), vec![3, 4, 7]);
521 assert_eq!(cache.missing_items(4, 2), vec![4, 7]);
522
523 assert_eq!(cache.missing_items(1, 3), vec![3, 4, 7]);
525 assert_eq!(cache.missing_items(2, 4), vec![3, 4, 7, 8]);
526 assert_eq!(cache.missing_items(5, 2), vec![7, 8]);
527
528 assert_eq!(cache.missing_items(11, 5), Vec::<u64>::new());
530 assert_eq!(cache.missing_items(100, 10), Vec::<u64>::new());
531
532 cache.put(1000, 1000).await.unwrap();
534
535 let items = cache.missing_items(11, 10);
537 assert_eq!(items, vec![11, 12, 13, 14, 15, 16, 17, 18, 19, 20]);
538
539 let items = cache.missing_items(990, 15);
541 assert_eq!(
542 items,
543 vec![990, 991, 992, 993, 994, 995, 996, 997, 998, 999]
544 );
545
546 cache.sync().await.unwrap();
548 assert_eq!(cache.missing_items(0, 5), vec![0, 3, 4, 7, 8]);
549 assert_eq!(cache.missing_items(3, 3), vec![3, 4, 7]);
550
551 cache.put(DEFAULT_ITEMS_PER_BLOB - 1, 99).await.unwrap();
553 cache.put(DEFAULT_ITEMS_PER_BLOB + 1, 101).await.unwrap();
554
555 let items = cache.missing_items(DEFAULT_ITEMS_PER_BLOB - 2, 5);
557 assert_eq!(
558 items,
559 vec![DEFAULT_ITEMS_PER_BLOB - 2, DEFAULT_ITEMS_PER_BLOB]
560 );
561
562 cache.close().await.expect("Failed to close cache");
563 });
564 }
565
566 #[test_traced]
567 fn test_cache_intervals_after_restart() {
568 let executor = deterministic::Runner::default();
569 executor.start(|context| async move {
570 let cfg = Config {
571 partition: "test_partition".into(),
572 codec_config: (),
573 compression: None,
574 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
575 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
576 items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
577 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
578 };
579
580 {
582 let mut cache = Cache::init(context.clone(), cfg.clone())
583 .await
584 .expect("Failed to initialize cache");
585
586 cache.put(0, 0).await.expect("Failed to put data");
587 cache.put(100, 100).await.expect("Failed to put data");
588 cache.put(1000, 1000).await.expect("Failed to put data");
589
590 cache.close().await.expect("Failed to close cache");
591 }
592
593 {
595 let cache = Cache::<_, i32>::init(context.clone(), cfg.clone())
596 .await
597 .expect("Failed to initialize cache");
598
599 let (current_end, start_next) = cache.next_gap(0);
601 assert_eq!(current_end, Some(0));
602 assert_eq!(start_next, Some(100));
603
604 let (current_end, start_next) = cache.next_gap(100);
605 assert_eq!(current_end, Some(100));
606 assert_eq!(start_next, Some(1000));
607
608 let items = cache.missing_items(1, 5);
610 assert_eq!(items, vec![1, 2, 3, 4, 5]);
611 }
612 });
613 }
614
615 #[test_traced]
616 fn test_cache_intervals_with_pruning() {
617 let executor = deterministic::Runner::default();
618 executor.start(|context| async move {
619 let cfg = Config {
620 partition: "test_partition".into(),
621 codec_config: (),
622 compression: None,
623 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
624 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
625 items_per_blob: NZU64!(100), buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
627 };
628 let mut cache = Cache::init(context.clone(), cfg.clone())
629 .await
630 .expect("Failed to initialize cache");
631
632 cache.put(50, 50).await.unwrap();
634 cache.put(150, 150).await.unwrap();
635 cache.put(250, 250).await.unwrap();
636 cache.put(350, 350).await.unwrap();
637
638 let (current_end, start_next) = cache.next_gap(0);
640 assert!(current_end.is_none());
641 assert_eq!(start_next, Some(50));
642
643 cache.prune(200).await.expect("Failed to prune");
645
646 assert!(!cache.has(50));
648 assert!(!cache.has(150));
649
650 let (current_end, start_next) = cache.next_gap(200);
652 assert!(current_end.is_none());
653 assert_eq!(start_next, Some(250));
654
655 let items = cache.missing_items(200, 5);
657 assert_eq!(items, vec![200, 201, 202, 203, 204]);
658
659 assert!(cache.has(250));
661 assert!(cache.has(350));
662 assert_eq!(cache.get(250).await.unwrap(), Some(250));
663 assert_eq!(cache.get(350).await.unwrap(), Some(350));
664 });
665 }
666
667 #[test_traced]
668 fn test_cache_sparse_indices() {
669 let executor = deterministic::Runner::default();
670 executor.start(|context| async move {
671 let cfg = Config {
672 partition: "test_partition".into(),
673 codec_config: (),
674 compression: None,
675 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
676 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
677 items_per_blob: NZU64!(100), buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
679 };
680 let mut cache = Cache::init(context.clone(), cfg.clone())
681 .await
682 .expect("Failed to initialize cache");
683
684 let indices = vec![
686 (0u64, 0),
687 (99u64, 99), (100u64, 100), (500u64, 500), ];
691
692 for (index, value) in &indices {
693 cache.put(*index, *value).await.expect("Failed to put data");
694 }
695
696 assert!(!cache.has(1));
698 assert!(!cache.has(50));
699 assert!(!cache.has(101));
700 assert!(!cache.has(499));
701
702 let (current_end, start_next) = cache.next_gap(50);
704 assert!(current_end.is_none());
705 assert_eq!(start_next, Some(99));
706
707 let (current_end, start_next) = cache.next_gap(99);
708 assert_eq!(current_end, Some(100));
709 assert_eq!(start_next, Some(500));
710
711 cache.sync().await.expect("Failed to sync");
713
714 for (index, value) in &indices {
715 let retrieved = cache
716 .get(*index)
717 .await
718 .expect("Failed to get data")
719 .expect("Data not found");
720 assert_eq!(retrieved, *value);
721 }
722 });
723 }
724
725 #[test_traced]
726 fn test_cache_intervals_edge_cases() {
727 let executor = deterministic::Runner::default();
728 executor.start(|context| async move {
729 let cfg = Config {
730 partition: "test_partition".into(),
731 codec_config: (),
732 compression: None,
733 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
734 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
735 items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
736 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
737 };
738 let mut cache = Cache::init(context.clone(), cfg.clone())
739 .await
740 .expect("Failed to initialize cache");
741
742 cache.put(42, 42).await.unwrap();
744
745 let (current_end, start_next) = cache.next_gap(42);
746 assert_eq!(current_end, Some(42));
747 assert!(start_next.is_none());
748
749 let (current_end, start_next) = cache.next_gap(41);
750 assert!(current_end.is_none());
751 assert_eq!(start_next, Some(42));
752
753 let (current_end, start_next) = cache.next_gap(43);
754 assert!(current_end.is_none());
755 assert!(start_next.is_none());
756
757 cache.put(43, 43).await.unwrap();
759 cache.put(44, 44).await.unwrap();
760
761 let (current_end, start_next) = cache.next_gap(42);
762 assert_eq!(current_end, Some(44));
763 assert!(start_next.is_none());
764
765 cache.put(u64::MAX - 1, 999).await.unwrap();
767
768 let (current_end, start_next) = cache.next_gap(u64::MAX - 2);
769 assert!(current_end.is_none());
770 assert_eq!(start_next, Some(u64::MAX - 1));
771
772 let (current_end, start_next) = cache.next_gap(u64::MAX - 1);
773 assert_eq!(current_end, Some(u64::MAX - 1));
774 assert!(start_next.is_none());
775 });
776 }
777
778 #[test_traced]
779 fn test_cache_intervals_duplicate_inserts() {
780 let executor = deterministic::Runner::default();
781 executor.start(|context| async move {
782 let cfg = Config {
783 partition: "test_partition".into(),
784 codec_config: (),
785 compression: None,
786 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
787 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
788 items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
789 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
790 };
791 let mut cache = Cache::init(context.clone(), cfg.clone())
792 .await
793 .expect("Failed to initialize cache");
794
795 cache.put(10, 10).await.unwrap();
797 assert!(cache.has(10));
798 assert_eq!(cache.get(10).await.unwrap(), Some(10));
799
800 cache.put(10, 20).await.unwrap();
802 assert!(cache.has(10));
803 assert_eq!(cache.get(10).await.unwrap(), Some(10)); let (current_end, start_next) = cache.next_gap(10);
807 assert_eq!(current_end, Some(10));
808 assert!(start_next.is_none());
809
810 cache.put(9, 9).await.unwrap();
812 cache.put(11, 11).await.unwrap();
813
814 let (current_end, start_next) = cache.next_gap(9);
816 assert_eq!(current_end, Some(11));
817 assert!(start_next.is_none());
818 });
819 }
820}