1use commonware_runtime::buffer::PoolRef;
80use std::num::{NonZeroU64, NonZeroUsize};
81use thiserror::Error;
82
83mod storage;
84pub use storage::Cache;
85
86#[derive(Debug, Error)]
88pub enum Error {
89 #[error("journal error: {0}")]
90 Journal(#[from] crate::journal::Error),
91 #[error("record corrupted")]
92 RecordCorrupted,
93 #[error("already pruned to: {0}")]
94 AlreadyPrunedTo(u64),
95 #[error("record too large")]
96 RecordTooLarge,
97}
98
99#[derive(Clone)]
101pub struct Config<C> {
102 pub partition: String,
104
105 pub compression: Option<u8>,
107
108 pub codec_config: C,
110
111 pub items_per_blob: NonZeroU64,
113
114 pub write_buffer: NonZeroUsize,
117
118 pub replay_buffer: NonZeroUsize,
120
121 pub buffer_pool: PoolRef,
123}
124
125#[cfg(test)]
126mod tests {
127 use super::*;
128 use crate::journal::Error as JournalError;
129 use commonware_codec::{varint::UInt, EncodeSize};
130 use commonware_macros::test_traced;
131 use commonware_runtime::{deterministic, Blob, Metrics, Runner, Storage};
132 use commonware_utils::{NZUsize, NZU64};
133 use rand::Rng;
134 use std::collections::BTreeMap;
135
136 const DEFAULT_ITEMS_PER_BLOB: u64 = 65536;
137 const DEFAULT_WRITE_BUFFER: usize = 1024;
138 const DEFAULT_REPLAY_BUFFER: usize = 4096;
139 const PAGE_SIZE: NonZeroUsize = NZUsize!(1024);
140 const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);
141
142 #[test_traced]
143 fn test_cache_compression_then_none() {
144 let executor = deterministic::Runner::default();
146 executor.start(|context| async move {
147 let cfg = Config {
149 partition: "test_partition".into(),
150 codec_config: (),
151 compression: Some(3),
152 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
153 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
154 items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
155 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
156 };
157 let mut cache = Cache::init(context.clone(), cfg.clone())
158 .await
159 .expect("Failed to initialize cache");
160
161 let index = 1u64;
163 let data = 1;
164 cache.put(index, data).await.expect("Failed to put data");
165
166 cache.close().await.expect("Failed to close cache");
168
169 let cfg = Config {
171 partition: "test_partition".into(),
172 codec_config: (),
173 compression: None,
174 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
175 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
176 items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
177 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
178 };
179 let result = Cache::<_, i32>::init(context, cfg.clone()).await;
180 assert!(matches!(
181 result,
182 Err(Error::Journal(JournalError::Codec(_)))
183 ));
184 });
185 }
186
187 #[test_traced]
188 fn test_cache_record_corruption() {
189 let executor = deterministic::Runner::default();
191 executor.start(|context| async move {
192 let cfg = Config {
194 partition: "test_partition".into(),
195 codec_config: (),
196 compression: None,
197 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
198 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
199 items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
200 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
201 };
202 let mut cache = Cache::init(context.clone(), cfg.clone())
203 .await
204 .expect("Failed to initialize cache");
205
206 let index = 1u64;
207 let data = 1;
208
209 cache
211 .put(index, data)
212 .await
213 .expect("Failed to put data");
214
215 cache.close().await.expect("Failed to close cache");
217
218 let section = (index / DEFAULT_ITEMS_PER_BLOB) * DEFAULT_ITEMS_PER_BLOB;
220 let (blob, _) = context
221 .open("test_partition", §ion.to_be_bytes())
222 .await
223 .unwrap();
224 let value_location = 4 + UInt(1u64).encode_size() as u64 + 4 ;
225 blob.write_at(b"testdaty".to_vec(), value_location).await.unwrap();
226 blob.sync().await.unwrap();
227
228 let cache = Cache::<_, i32>::init(
230 context,
231 Config {
232 partition: "test_partition".into(),
233 codec_config: (),
234 compression: None,
235 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
236 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
237 items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
238 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
239 },
240 )
241 .await.expect("Failed to initialize cache");
242
243 let retrieved: Option<i32> = cache
245 .get(index)
246 .await
247 .expect("Failed to get data");
248 assert!(retrieved.is_none());
249 });
250 }
251
252 #[test_traced]
253 fn test_cache_prune() {
254 let executor = deterministic::Runner::default();
256 executor.start(|context| async move {
257 let cfg = Config {
259 partition: "test_partition".into(),
260 codec_config: (),
261 compression: None,
262 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
263 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
264 items_per_blob: NZU64!(1), buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
266 };
267 let mut cache = Cache::init(context.clone(), cfg.clone())
268 .await
269 .expect("Failed to initialize cache");
270
271 let items = vec![(1u64, 1), (2u64, 2), (3u64, 3), (4u64, 4), (5u64, 5)];
273
274 for (index, data) in &items {
275 cache.put(*index, *data).await.expect("Failed to put data");
276 }
277
278 let buffer = context.encode();
280 assert!(buffer.contains("items_tracked 5"));
281
282 cache.prune(3).await.expect("Failed to prune");
284
285 for (index, data) in items {
287 let retrieved = cache.get(index).await.expect("Failed to get data");
288 if index < 3 {
289 assert!(retrieved.is_none());
290 } else {
291 assert_eq!(retrieved.expect("Data not found"), data);
292 }
293 }
294
295 let buffer = context.encode();
297 assert!(buffer.contains("items_tracked 3"));
298
299 cache.prune(2).await.expect("Failed to prune");
301
302 cache.prune(3).await.expect("Failed to prune");
304
305 let result = cache.put(1, 1).await;
307 assert!(matches!(result, Err(Error::AlreadyPrunedTo(3))));
308 });
309 }
310
311 fn test_cache_restart(num_items: usize) -> String {
312 let executor = deterministic::Runner::default();
314 executor.start(|mut context| async move {
315 let items_per_blob = 256u64;
317 let cfg = Config {
318 partition: "test_partition".into(),
319 codec_config: (),
320 compression: None,
321 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
322 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
323 items_per_blob: NZU64!(items_per_blob),
324 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
325 };
326 let mut cache = Cache::init(context.clone(), cfg.clone())
327 .await
328 .expect("Failed to initialize cache");
329
330 let mut items = BTreeMap::new();
332 while items.len() < num_items {
333 let index = items.len() as u64;
334 let mut data = [0u8; 1024];
335 context.fill(&mut data);
336 items.insert(index, data);
337
338 cache.put(index, data).await.expect("Failed to put data");
339 }
340
341 for (index, data) in &items {
343 let retrieved = cache
344 .get(*index)
345 .await
346 .expect("Failed to get data")
347 .expect("Data not found");
348 assert_eq!(retrieved, *data);
349 }
350
351 let buffer = context.encode();
353 let tracked = format!("items_tracked {num_items:?}");
354 assert!(buffer.contains(&tracked));
355
356 cache.close().await.expect("Failed to close cache");
358
359 let cfg = Config {
361 partition: "test_partition".into(),
362 codec_config: (),
363 compression: None,
364 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
365 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
366 items_per_blob: NZU64!(items_per_blob),
367 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
368 };
369 let mut cache = Cache::<_, [u8; 1024]>::init(context.clone(), cfg.clone())
370 .await
371 .expect("Failed to initialize cache");
372
373 for (index, data) in &items {
375 let retrieved = cache
376 .get(*index)
377 .await
378 .expect("Failed to get data")
379 .expect("Data not found");
380 assert_eq!(&retrieved, data);
381 }
382
383 let min = (items.len() / 2) as u64;
385 cache.prune(min).await.expect("Failed to prune");
386
387 let min = (min / items_per_blob) * items_per_blob;
389 let mut removed = 0;
390 for (index, data) in items {
391 if index >= min {
392 let retrieved = cache
393 .get(index)
394 .await
395 .expect("Failed to get data")
396 .expect("Data not found");
397 assert_eq!(retrieved, data);
398 } else {
399 let retrieved = cache.get(index).await.expect("Failed to get data");
400 assert!(retrieved.is_none());
401 removed += 1;
402 }
403 }
404
405 let buffer = context.encode();
407 let tracked = format!("items_tracked {:?}", num_items - removed);
408 assert!(buffer.contains(&tracked));
409
410 context.auditor().state()
411 })
412 }
413
414 #[test_traced]
415 #[ignore]
416 fn test_cache_many_items_and_restart() {
417 test_cache_restart(100_000);
418 }
419
420 #[test_traced]
421 #[ignore]
422 fn test_determinism() {
423 let state1 = test_cache_restart(5_000);
424 let state2 = test_cache_restart(5_000);
425 assert_eq!(state1, state2);
426 }
427
428 #[test_traced]
429 fn test_cache_next_gap() {
430 let executor = deterministic::Runner::default();
431 executor.start(|context| async move {
432 let cfg = Config {
433 partition: "test_partition".into(),
434 codec_config: (),
435 compression: None,
436 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
437 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
438 items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
439 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
440 };
441 let mut cache = Cache::init(context.clone(), cfg.clone())
442 .await
443 .expect("Failed to initialize cache");
444
445 cache.put(1, 1).await.unwrap();
447 cache.put(10, 10).await.unwrap();
448 cache.put(11, 11).await.unwrap();
449 cache.put(14, 14).await.unwrap();
450
451 let (current_end, start_next) = cache.next_gap(0);
453 assert!(current_end.is_none());
454 assert_eq!(start_next, Some(1));
455
456 let (current_end, start_next) = cache.next_gap(1);
457 assert_eq!(current_end, Some(1));
458 assert_eq!(start_next, Some(10));
459
460 let (current_end, start_next) = cache.next_gap(10);
461 assert_eq!(current_end, Some(11));
462 assert_eq!(start_next, Some(14));
463
464 let (current_end, start_next) = cache.next_gap(11);
465 assert_eq!(current_end, Some(11));
466 assert_eq!(start_next, Some(14));
467
468 let (current_end, start_next) = cache.next_gap(12);
469 assert!(current_end.is_none());
470 assert_eq!(start_next, Some(14));
471
472 let (current_end, start_next) = cache.next_gap(14);
473 assert_eq!(current_end, Some(14));
474 assert!(start_next.is_none());
475 });
476 }
477
478 #[test_traced]
479 fn test_cache_missing_items() {
480 let executor = deterministic::Runner::default();
481 executor.start(|context| async move {
482 let cfg = Config {
483 partition: "test_partition".into(),
484 codec_config: (),
485 compression: None,
486 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
487 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
488 items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
489 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
490 };
491 let mut cache = Cache::init(context.clone(), cfg.clone())
492 .await
493 .expect("Failed to initialize cache");
494
495 assert_eq!(cache.missing_items(0, 5), Vec::<u64>::new());
497 assert_eq!(cache.missing_items(100, 10), Vec::<u64>::new());
498
499 cache.put(1, 1).await.unwrap();
501 cache.put(2, 2).await.unwrap();
502 cache.put(5, 5).await.unwrap();
503 cache.put(6, 6).await.unwrap();
504 cache.put(10, 10).await.unwrap();
505
506 assert_eq!(cache.missing_items(0, 5), vec![0, 3, 4, 7, 8]);
508 assert_eq!(cache.missing_items(0, 6), vec![0, 3, 4, 7, 8, 9]);
509 assert_eq!(cache.missing_items(0, 7), vec![0, 3, 4, 7, 8, 9]);
510
511 assert_eq!(cache.missing_items(3, 3), vec![3, 4, 7]);
513 assert_eq!(cache.missing_items(4, 2), vec![4, 7]);
514
515 assert_eq!(cache.missing_items(1, 3), vec![3, 4, 7]);
517 assert_eq!(cache.missing_items(2, 4), vec![3, 4, 7, 8]);
518 assert_eq!(cache.missing_items(5, 2), vec![7, 8]);
519
520 assert_eq!(cache.missing_items(11, 5), Vec::<u64>::new());
522 assert_eq!(cache.missing_items(100, 10), Vec::<u64>::new());
523
524 cache.put(1000, 1000).await.unwrap();
526
527 let items = cache.missing_items(11, 10);
529 assert_eq!(items, vec![11, 12, 13, 14, 15, 16, 17, 18, 19, 20]);
530
531 let items = cache.missing_items(990, 15);
533 assert_eq!(
534 items,
535 vec![990, 991, 992, 993, 994, 995, 996, 997, 998, 999]
536 );
537
538 cache.sync().await.unwrap();
540 assert_eq!(cache.missing_items(0, 5), vec![0, 3, 4, 7, 8]);
541 assert_eq!(cache.missing_items(3, 3), vec![3, 4, 7]);
542
543 cache.put(DEFAULT_ITEMS_PER_BLOB - 1, 99).await.unwrap();
545 cache.put(DEFAULT_ITEMS_PER_BLOB + 1, 101).await.unwrap();
546
547 let items = cache.missing_items(DEFAULT_ITEMS_PER_BLOB - 2, 5);
549 assert_eq!(
550 items,
551 vec![DEFAULT_ITEMS_PER_BLOB - 2, DEFAULT_ITEMS_PER_BLOB]
552 );
553
554 cache.close().await.expect("Failed to close cache");
555 });
556 }
557
558 #[test_traced]
559 fn test_cache_intervals_after_restart() {
560 let executor = deterministic::Runner::default();
561 executor.start(|context| async move {
562 let cfg = Config {
563 partition: "test_partition".into(),
564 codec_config: (),
565 compression: None,
566 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
567 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
568 items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
569 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
570 };
571
572 {
574 let mut cache = Cache::init(context.clone(), cfg.clone())
575 .await
576 .expect("Failed to initialize cache");
577
578 cache.put(0, 0).await.expect("Failed to put data");
579 cache.put(100, 100).await.expect("Failed to put data");
580 cache.put(1000, 1000).await.expect("Failed to put data");
581
582 cache.close().await.expect("Failed to close cache");
583 }
584
585 {
587 let cache = Cache::<_, i32>::init(context.clone(), cfg.clone())
588 .await
589 .expect("Failed to initialize cache");
590
591 let (current_end, start_next) = cache.next_gap(0);
593 assert_eq!(current_end, Some(0));
594 assert_eq!(start_next, Some(100));
595
596 let (current_end, start_next) = cache.next_gap(100);
597 assert_eq!(current_end, Some(100));
598 assert_eq!(start_next, Some(1000));
599
600 let items = cache.missing_items(1, 5);
602 assert_eq!(items, vec![1, 2, 3, 4, 5]);
603 }
604 });
605 }
606
607 #[test_traced]
608 fn test_cache_intervals_with_pruning() {
609 let executor = deterministic::Runner::default();
610 executor.start(|context| async move {
611 let cfg = Config {
612 partition: "test_partition".into(),
613 codec_config: (),
614 compression: None,
615 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
616 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
617 items_per_blob: NZU64!(100), buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
619 };
620 let mut cache = Cache::init(context.clone(), cfg.clone())
621 .await
622 .expect("Failed to initialize cache");
623
624 cache.put(50, 50).await.unwrap();
626 cache.put(150, 150).await.unwrap();
627 cache.put(250, 250).await.unwrap();
628 cache.put(350, 350).await.unwrap();
629
630 let (current_end, start_next) = cache.next_gap(0);
632 assert!(current_end.is_none());
633 assert_eq!(start_next, Some(50));
634
635 cache.prune(200).await.expect("Failed to prune");
637
638 assert!(!cache.has(50));
640 assert!(!cache.has(150));
641
642 let (current_end, start_next) = cache.next_gap(200);
644 assert!(current_end.is_none());
645 assert_eq!(start_next, Some(250));
646
647 let items = cache.missing_items(200, 5);
649 assert_eq!(items, vec![200, 201, 202, 203, 204]);
650
651 assert!(cache.has(250));
653 assert!(cache.has(350));
654 assert_eq!(cache.get(250).await.unwrap(), Some(250));
655 assert_eq!(cache.get(350).await.unwrap(), Some(350));
656 });
657 }
658
659 #[test_traced]
660 fn test_cache_sparse_indices() {
661 let executor = deterministic::Runner::default();
662 executor.start(|context| async move {
663 let cfg = Config {
664 partition: "test_partition".into(),
665 codec_config: (),
666 compression: None,
667 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
668 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
669 items_per_blob: NZU64!(100), buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
671 };
672 let mut cache = Cache::init(context.clone(), cfg.clone())
673 .await
674 .expect("Failed to initialize cache");
675
676 let indices = vec![
678 (0u64, 0),
679 (99u64, 99), (100u64, 100), (500u64, 500), ];
683
684 for (index, value) in &indices {
685 cache.put(*index, *value).await.expect("Failed to put data");
686 }
687
688 assert!(!cache.has(1));
690 assert!(!cache.has(50));
691 assert!(!cache.has(101));
692 assert!(!cache.has(499));
693
694 let (current_end, start_next) = cache.next_gap(50);
696 assert!(current_end.is_none());
697 assert_eq!(start_next, Some(99));
698
699 let (current_end, start_next) = cache.next_gap(99);
700 assert_eq!(current_end, Some(100));
701 assert_eq!(start_next, Some(500));
702
703 cache.sync().await.expect("Failed to sync");
705
706 for (index, value) in &indices {
707 let retrieved = cache
708 .get(*index)
709 .await
710 .expect("Failed to get data")
711 .expect("Data not found");
712 assert_eq!(retrieved, *value);
713 }
714 });
715 }
716
717 #[test_traced]
718 fn test_cache_intervals_edge_cases() {
719 let executor = deterministic::Runner::default();
720 executor.start(|context| async move {
721 let cfg = Config {
722 partition: "test_partition".into(),
723 codec_config: (),
724 compression: None,
725 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
726 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
727 items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
728 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
729 };
730 let mut cache = Cache::init(context.clone(), cfg.clone())
731 .await
732 .expect("Failed to initialize cache");
733
734 cache.put(42, 42).await.unwrap();
736
737 let (current_end, start_next) = cache.next_gap(42);
738 assert_eq!(current_end, Some(42));
739 assert!(start_next.is_none());
740
741 let (current_end, start_next) = cache.next_gap(41);
742 assert!(current_end.is_none());
743 assert_eq!(start_next, Some(42));
744
745 let (current_end, start_next) = cache.next_gap(43);
746 assert!(current_end.is_none());
747 assert!(start_next.is_none());
748
749 cache.put(43, 43).await.unwrap();
751 cache.put(44, 44).await.unwrap();
752
753 let (current_end, start_next) = cache.next_gap(42);
754 assert_eq!(current_end, Some(44));
755 assert!(start_next.is_none());
756
757 cache.put(u64::MAX - 1, 999).await.unwrap();
759
760 let (current_end, start_next) = cache.next_gap(u64::MAX - 2);
761 assert!(current_end.is_none());
762 assert_eq!(start_next, Some(u64::MAX - 1));
763
764 let (current_end, start_next) = cache.next_gap(u64::MAX - 1);
765 assert_eq!(current_end, Some(u64::MAX - 1));
766 assert!(start_next.is_none());
767 });
768 }
769
770 #[test_traced]
771 fn test_cache_intervals_duplicate_inserts() {
772 let executor = deterministic::Runner::default();
773 executor.start(|context| async move {
774 let cfg = Config {
775 partition: "test_partition".into(),
776 codec_config: (),
777 compression: None,
778 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
779 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
780 items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
781 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
782 };
783 let mut cache = Cache::init(context.clone(), cfg.clone())
784 .await
785 .expect("Failed to initialize cache");
786
787 cache.put(10, 10).await.unwrap();
789 assert!(cache.has(10));
790 assert_eq!(cache.get(10).await.unwrap(), Some(10));
791
792 cache.put(10, 20).await.unwrap();
794 assert!(cache.has(10));
795 assert_eq!(cache.get(10).await.unwrap(), Some(10)); let (current_end, start_next) = cache.next_gap(10);
799 assert_eq!(current_end, Some(10));
800 assert!(start_next.is_none());
801
802 cache.put(9, 9).await.unwrap();
804 cache.put(11, 11).await.unwrap();
805
806 let (current_end, start_next) = cache.next_gap(9);
808 assert_eq!(current_end, Some(11));
809 assert!(start_next.is_none());
810 });
811 }
812}