1use commonware_runtime::buffer::PoolRef;
80use std::num::{NonZeroU64, NonZeroUsize};
81use thiserror::Error;
82
83mod storage;
84pub use storage::Cache;
85
86#[derive(Debug, Error)]
88pub enum Error {
89 #[error("journal error: {0}")]
90 Journal(#[from] crate::journal::Error),
91 #[error("record corrupted")]
92 RecordCorrupted,
93 #[error("already pruned to: {0}")]
94 AlreadyPrunedTo(u64),
95 #[error("record too large")]
96 RecordTooLarge,
97}
98
99#[derive(Clone)]
101pub struct Config<C> {
102 pub partition: String,
104
105 pub compression: Option<u8>,
107
108 pub codec_config: C,
110
111 pub items_per_blob: NonZeroU64,
113
114 pub write_buffer: NonZeroUsize,
117
118 pub replay_buffer: NonZeroUsize,
120
121 pub buffer_pool: PoolRef,
123}
124
#[cfg(test)]
mod tests {
    use super::*;
    use crate::journal::Error as JournalError;
    use commonware_macros::{test_group, test_traced};
    use commonware_runtime::{deterministic, Metrics, Runner};
    use commonware_utils::{NZUsize, NZU16, NZU64};
    use rand::Rng;
    use std::{collections::BTreeMap, num::NonZeroU16};

    const DEFAULT_ITEMS_PER_BLOB: u64 = 65536;
    const DEFAULT_WRITE_BUFFER: usize = 1024;
    const DEFAULT_REPLAY_BUFFER: usize = 4096;
    const PAGE_SIZE: NonZeroU16 = NZU16!(1024);
    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);

    /// Builds the configuration shared by every test in this module.
    ///
    /// Only `compression` and `items_per_blob` vary between tests. Each call constructs a new
    /// `PoolRef`, mirroring tests that build a fresh config when reopening the cache.
    fn config(compression: Option<u8>, items_per_blob: u64) -> Config<()> {
        Config {
            partition: "test_partition".into(),
            codec_config: (),
            compression,
            write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
            replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
            items_per_blob: NZU64!(items_per_blob),
            buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
        }
    }

    /// Asserts that the encoded metrics report exactly `expected` tracked items.
    ///
    /// Matches a line *suffix* rather than a bare substring so that, e.g., an expected count
    /// of `3` does not spuriously match a reported `30`.
    fn assert_items_tracked(buffer: &str, expected: usize) {
        let needle = format!("items_tracked {expected}");
        assert!(
            buffer.lines().any(|line| line.ends_with(&needle)),
            "missing `{needle}` in metrics:\n{buffer}"
        );
    }

    #[test_traced]
    fn test_cache_compression_then_none() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Persist a single item with compression enabled.
            let mut cache = Cache::init(context.clone(), config(Some(3), DEFAULT_ITEMS_PER_BLOB))
                .await
                .expect("Failed to initialize cache");
            cache.put(1, 1).await.expect("Failed to put data");
            cache.sync().await.expect("Failed to sync cache");
            drop(cache);

            // Reopening the same partition without compression must fail to decode.
            let result =
                Cache::<_, i32>::init(context, config(None, DEFAULT_ITEMS_PER_BLOB)).await;
            assert!(matches!(
                result,
                Err(Error::Journal(JournalError::Codec(_)))
            ));
        });
    }

    #[test_traced]
    fn test_cache_prune() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // One item per blob, so pruning takes effect exactly at the requested index.
            let mut cache = Cache::init(context.clone(), config(None, 1))
                .await
                .expect("Failed to initialize cache");

            let items = [(1u64, 1), (2, 2), (3, 3), (4, 4), (5, 5)];
            for &(index, data) in &items {
                cache.put(index, data).await.expect("Failed to put data");
            }
            assert_eq!(cache.first(), Some(1));
            assert_items_tracked(&context.encode(), 5);

            // Pruning to 3 drops indices 1 and 2 and keeps the rest.
            cache.prune(3).await.expect("Failed to prune");
            for (index, data) in items {
                let retrieved = cache.get(index).await.expect("Failed to get data");
                if index < 3 {
                    assert!(retrieved.is_none());
                } else {
                    assert_eq!(retrieved.expect("Data not found"), data);
                }
            }
            assert_eq!(cache.first(), Some(3));
            assert_items_tracked(&context.encode(), 3);

            // Pruning to an earlier or equal index leaves the boundary unchanged.
            cache.prune(2).await.expect("Failed to prune");
            assert_eq!(cache.first(), Some(3));
            cache.prune(3).await.expect("Failed to prune");
            assert_eq!(cache.first(), Some(3));

            // Writes below the prune boundary are rejected.
            let result = cache.put(1, 1).await;
            assert!(matches!(result, Err(Error::AlreadyPrunedTo(3))));
        });
    }

    /// Writes `num_items` random 1 KiB items, restarts the cache, checks every item was
    /// recovered, prunes the lower half, and returns the runtime auditor state so callers can
    /// compare runs for determinism.
    fn test_cache_restart(num_items: usize) -> String {
        let executor = deterministic::Runner::default();
        executor.start(|mut context| async move {
            let items_per_blob = 256u64;
            let mut cache = Cache::init(context.clone(), config(None, items_per_blob))
                .await
                .expect("Failed to initialize cache");

            // Populate the contiguous index range [0, num_items) with random payloads.
            let mut items = BTreeMap::new();
            for index in 0..num_items as u64 {
                let mut data = [0u8; 1024];
                context.fill(&mut data);
                items.insert(index, data);
                cache.put(index, data).await.expect("Failed to put data");
            }

            for (index, data) in &items {
                let retrieved = cache
                    .get(*index)
                    .await
                    .expect("Failed to get data")
                    .expect("Data not found");
                assert_eq!(retrieved, *data);
            }
            assert_items_tracked(&context.encode(), num_items);

            cache.sync().await.expect("Failed to sync cache");
            drop(cache);

            // Reopen with a freshly built config; every item must be recovered from storage.
            let mut cache =
                Cache::<_, [u8; 1024]>::init(context.clone(), config(None, items_per_blob))
                    .await
                    .expect("Failed to initialize cache");
            for (index, data) in &items {
                let retrieved = cache
                    .get(*index)
                    .await
                    .expect("Failed to get data")
                    .expect("Data not found");
                assert_eq!(&retrieved, data);
            }

            // Prune the lower half. Items in [rounded_min, min) are expected to survive, i.e.
            // the effective boundary is `min` rounded down to a multiple of `items_per_blob`.
            let min = (items.len() / 2) as u64;
            cache.prune(min).await.expect("Failed to prune");
            let min = (min / items_per_blob) * items_per_blob;

            let mut removed = 0;
            for (index, data) in items {
                let retrieved = cache.get(index).await.expect("Failed to get data");
                if index >= min {
                    assert_eq!(retrieved.expect("Data not found"), data);
                } else {
                    assert!(retrieved.is_none());
                    removed += 1;
                }
            }
            assert_items_tracked(&context.encode(), num_items - removed);

            context.auditor().state()
        })
    }

    #[test_group("slow")]
    #[test_traced]
    fn test_cache_many_items_and_restart() {
        test_cache_restart(100_000);
    }

    #[test_group("slow")]
    #[test_traced]
    fn test_determinism() {
        let state1 = test_cache_restart(5_000);
        let state2 = test_cache_restart(5_000);
        assert_eq!(state1, state2);
    }

    #[test_traced]
    fn test_cache_next_gap() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut cache = Cache::init(context, config(None, DEFAULT_ITEMS_PER_BLOB))
                .await
                .expect("Failed to initialize cache");
            assert_eq!(cache.first(), None);

            // Runs: [1], [10, 11], [14].
            cache.put(1, 1).await.unwrap();
            cache.put(10, 10).await.unwrap();
            cache.put(11, 11).await.unwrap();
            cache.put(14, 14).await.unwrap();

            // `next_gap(i)` yields (end of the run containing `i`, start of the next run).
            let (current_end, start_next) = cache.next_gap(0);
            assert!(current_end.is_none());
            assert_eq!(start_next, Some(1));
            assert_eq!(cache.first(), Some(1));

            let (current_end, start_next) = cache.next_gap(1);
            assert_eq!(current_end, Some(1));
            assert_eq!(start_next, Some(10));

            let (current_end, start_next) = cache.next_gap(10);
            assert_eq!(current_end, Some(11));
            assert_eq!(start_next, Some(14));

            let (current_end, start_next) = cache.next_gap(11);
            assert_eq!(current_end, Some(11));
            assert_eq!(start_next, Some(14));

            let (current_end, start_next) = cache.next_gap(12);
            assert!(current_end.is_none());
            assert_eq!(start_next, Some(14));

            let (current_end, start_next) = cache.next_gap(14);
            assert_eq!(current_end, Some(14));
            assert!(start_next.is_none());
        });
    }

    #[test_traced]
    fn test_cache_missing_items() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut cache = Cache::init(context, config(None, DEFAULT_ITEMS_PER_BLOB))
                .await
                .expect("Failed to initialize cache");

            // An empty cache reports nothing missing.
            assert_eq!(cache.first(), None);
            assert_eq!(cache.missing_items(0, 5), Vec::<u64>::new());
            assert_eq!(cache.missing_items(100, 10), Vec::<u64>::new());

            cache.put(1, 1).await.unwrap();
            cache.put(2, 2).await.unwrap();
            cache.put(5, 5).await.unwrap();
            cache.put(6, 6).await.unwrap();
            cache.put(10, 10).await.unwrap();

            // `missing_items(start, max)` returns at most `max` absent indices from `start`
            // and never reports indices beyond the last stored item.
            assert_eq!(cache.missing_items(0, 5), vec![0, 3, 4, 7, 8]);
            assert_eq!(cache.missing_items(0, 6), vec![0, 3, 4, 7, 8, 9]);
            assert_eq!(cache.missing_items(0, 7), vec![0, 3, 4, 7, 8, 9]);

            assert_eq!(cache.missing_items(3, 3), vec![3, 4, 7]);
            assert_eq!(cache.missing_items(4, 2), vec![4, 7]);

            // Starting on a stored index skips ahead to the next gap.
            assert_eq!(cache.missing_items(1, 3), vec![3, 4, 7]);
            assert_eq!(cache.missing_items(2, 4), vec![3, 4, 7, 8]);
            assert_eq!(cache.missing_items(5, 2), vec![7, 8]);

            // Past the last stored item there is nothing to report.
            assert_eq!(cache.missing_items(11, 5), Vec::<u64>::new());
            assert_eq!(cache.missing_items(100, 10), Vec::<u64>::new());

            cache.put(1000, 1000).await.unwrap();

            let items = cache.missing_items(11, 10);
            assert_eq!(items, vec![11, 12, 13, 14, 15, 16, 17, 18, 19, 20]);

            // Stops at the next stored item even when `max` would allow more.
            let items = cache.missing_items(990, 15);
            assert_eq!(
                items,
                vec![990, 991, 992, 993, 994, 995, 996, 997, 998, 999]
            );

            // Syncing does not change the answers.
            cache.sync().await.unwrap();
            assert_eq!(cache.missing_items(0, 5), vec![0, 3, 4, 7, 8]);
            assert_eq!(cache.missing_items(3, 3), vec![3, 4, 7]);

            // Gaps straddling a blob boundary are reported correctly.
            cache.put(DEFAULT_ITEMS_PER_BLOB - 1, 99).await.unwrap();
            cache.put(DEFAULT_ITEMS_PER_BLOB + 1, 101).await.unwrap();

            let items = cache.missing_items(DEFAULT_ITEMS_PER_BLOB - 2, 5);
            assert_eq!(
                items,
                vec![DEFAULT_ITEMS_PER_BLOB - 2, DEFAULT_ITEMS_PER_BLOB]
            );
        });
    }

    #[test_traced]
    fn test_cache_intervals_after_restart() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = config(None, DEFAULT_ITEMS_PER_BLOB);

            // First session: write three sparse items and persist them.
            {
                let mut cache = Cache::init(context.clone(), cfg.clone())
                    .await
                    .expect("Failed to initialize cache");

                cache.put(0, 0).await.expect("Failed to put data");
                cache.put(100, 100).await.expect("Failed to put data");
                cache.put(1000, 1000).await.expect("Failed to put data");

                cache.sync().await.expect("Failed to sync cache");
            }

            // Second session: the gap bookkeeping must be rebuilt from storage.
            {
                let cache = Cache::<_, i32>::init(context, cfg)
                    .await
                    .expect("Failed to initialize cache");

                let (current_end, start_next) = cache.next_gap(0);
                assert_eq!(current_end, Some(0));
                assert_eq!(start_next, Some(100));

                let (current_end, start_next) = cache.next_gap(100);
                assert_eq!(current_end, Some(100));
                assert_eq!(start_next, Some(1000));

                let items = cache.missing_items(1, 5);
                assert_eq!(items, vec![1, 2, 3, 4, 5]);
            }
        });
    }

    #[test_traced]
    fn test_cache_intervals_with_pruning() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut cache = Cache::init(context, config(None, 100))
                .await
                .expect("Failed to initialize cache");

            // One item in each of the first four blobs.
            cache.put(50, 50).await.unwrap();
            cache.put(150, 150).await.unwrap();
            cache.put(250, 250).await.unwrap();
            cache.put(350, 350).await.unwrap();

            let (current_end, start_next) = cache.next_gap(0);
            assert!(current_end.is_none());
            assert_eq!(start_next, Some(50));

            // Pruning to a blob boundary removes the first two blobs entirely.
            cache.prune(200).await.expect("Failed to prune");
            assert!(!cache.has(50));
            assert!(!cache.has(150));

            let (current_end, start_next) = cache.next_gap(200);
            assert!(current_end.is_none());
            assert_eq!(start_next, Some(250));

            let items = cache.missing_items(200, 5);
            assert_eq!(items, vec![200, 201, 202, 203, 204]);

            // Items above the boundary are untouched.
            assert!(cache.has(250));
            assert!(cache.has(350));
            assert_eq!(cache.get(250).await.unwrap(), Some(250));
            assert_eq!(cache.get(350).await.unwrap(), Some(350));
        });
    }

    #[test_traced]
    fn test_cache_sparse_indices() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut cache = Cache::init(context, config(None, 100))
                .await
                .expect("Failed to initialize cache");

            // With 100 items per blob, 99 and 100 sit on either side of a blob boundary;
            // 500 lands in a far-away blob.
            let indices = [(0u64, 0), (99, 99), (100, 100), (500, 500)];
            for &(index, value) in &indices {
                cache.put(index, value).await.expect("Failed to put data");
            }

            assert!(!cache.has(1));
            assert!(!cache.has(50));
            assert!(!cache.has(101));
            assert!(!cache.has(499));

            let (current_end, start_next) = cache.next_gap(50);
            assert!(current_end.is_none());
            assert_eq!(start_next, Some(99));

            // The run [99, 100] is contiguous across the blob boundary.
            let (current_end, start_next) = cache.next_gap(99);
            assert_eq!(current_end, Some(100));
            assert_eq!(start_next, Some(500));

            cache.sync().await.expect("Failed to sync");
            for &(index, value) in &indices {
                let retrieved = cache
                    .get(index)
                    .await
                    .expect("Failed to get data")
                    .expect("Data not found");
                assert_eq!(retrieved, value);
            }
        });
    }

    #[test_traced]
    fn test_cache_intervals_edge_cases() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut cache = Cache::init(context, config(None, DEFAULT_ITEMS_PER_BLOB))
                .await
                .expect("Failed to initialize cache");

            // A single stored item.
            cache.put(42, 42).await.unwrap();

            let (current_end, start_next) = cache.next_gap(42);
            assert_eq!(current_end, Some(42));
            assert!(start_next.is_none());

            let (current_end, start_next) = cache.next_gap(41);
            assert!(current_end.is_none());
            assert_eq!(start_next, Some(42));

            let (current_end, start_next) = cache.next_gap(43);
            assert!(current_end.is_none());
            assert!(start_next.is_none());

            // Extending the run to [42, 44].
            cache.put(43, 43).await.unwrap();
            cache.put(44, 44).await.unwrap();

            let (current_end, start_next) = cache.next_gap(42);
            assert_eq!(current_end, Some(44));
            assert!(start_next.is_none());

            // Indices near `u64::MAX` must not overflow the interval bookkeeping.
            cache.put(u64::MAX - 1, 999).await.unwrap();

            let (current_end, start_next) = cache.next_gap(u64::MAX - 2);
            assert!(current_end.is_none());
            assert_eq!(start_next, Some(u64::MAX - 1));

            let (current_end, start_next) = cache.next_gap(u64::MAX - 1);
            assert_eq!(current_end, Some(u64::MAX - 1));
            assert!(start_next.is_none());
        });
    }

    #[test_traced]
    fn test_cache_intervals_duplicate_inserts() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut cache = Cache::init(context, config(None, DEFAULT_ITEMS_PER_BLOB))
                .await
                .expect("Failed to initialize cache");

            cache.put(10, 10).await.unwrap();
            assert!(cache.has(10));
            assert_eq!(cache.get(10).await.unwrap(), Some(10));

            // A second put at an existing index succeeds but keeps the original value.
            cache.put(10, 20).await.unwrap();
            assert!(cache.has(10));
            assert_eq!(cache.get(10).await.unwrap(), Some(10));

            let (current_end, start_next) = cache.next_gap(10);
            assert_eq!(current_end, Some(10));
            assert!(start_next.is_none());

            // Neighbours on both sides merge into a single run [9, 11].
            cache.put(9, 9).await.unwrap();
            cache.put(11, 11).await.unwrap();

            let (current_end, start_next) = cache.next_gap(9);
            assert_eq!(current_end, Some(11));
            assert!(start_next.is_none());
        });
    }
}