1use commonware_runtime::buffer::paged::CacheRef;
80use std::num::{NonZeroU64, NonZeroUsize};
81use thiserror::Error;
82
83mod storage;
84pub use storage::Cache;
85
86#[derive(Debug, Error)]
88pub enum Error {
89 #[error("journal error: {0}")]
90 Journal(#[from] crate::journal::Error),
91 #[error("record corrupted")]
92 RecordCorrupted,
93 #[error("already pruned to: {0}")]
94 AlreadyPrunedTo(u64),
95 #[error("record too large")]
96 RecordTooLarge,
97}
98
99#[derive(Clone)]
101pub struct Config<C> {
102 pub partition: String,
104
105 pub compression: Option<u8>,
107
108 pub codec_config: C,
110
111 pub items_per_blob: NonZeroU64,
113
114 pub write_buffer: NonZeroUsize,
117
118 pub replay_buffer: NonZeroUsize,
120
121 pub page_cache: CacheRef,
123}
124
#[cfg(test)]
mod tests {
    use super::*;
    use crate::journal::Error as JournalError;
    use commonware_macros::{test_group, test_traced};
    use commonware_runtime::{
        deterministic, telemetry::metrics::has_metric_value, Metrics as _, Runner, Supervisor as _,
    };
    use commonware_utils::{NZUsize, NZU16, NZU64};
    use rand::Rng;
    use std::{collections::BTreeMap, num::NonZeroU16};

    const DEFAULT_ITEMS_PER_BLOB: u64 = 65536;
    const DEFAULT_WRITE_BUFFER: usize = 1024;
    const DEFAULT_REPLAY_BUFFER: usize = 4096;
    const PAGE_SIZE: NonZeroU16 = NZU16!(1024);
    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);

    /// Builds the configuration shared by every test in this module.
    ///
    /// All tests use the same partition name, buffer sizes and page cache dimensions; only
    /// the blob size and the compression setting vary between them.
    fn test_config(
        context: &deterministic::Context,
        items_per_blob: u64,
        compression: Option<u8>,
    ) -> Config<()> {
        Config {
            partition: "test-partition".into(),
            codec_config: (),
            compression,
            write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
            replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
            items_per_blob: NZU64!(items_per_blob),
            page_cache: CacheRef::from_pooler(context, PAGE_SIZE, PAGE_CACHE_SIZE),
        }
    }

    #[test_traced]
    fn test_cache_compression_then_none() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Persist a single item with compression enabled.
            let cfg = test_config(&context, DEFAULT_ITEMS_PER_BLOB, Some(3));
            let mut cache = Cache::init(context.child("first"), cfg)
                .await
                .expect("Failed to initialize cache");

            let index = 1u64;
            let data = 1;
            cache.put(index, data).await.expect("Failed to put data");

            cache.sync().await.expect("Failed to sync cache");
            drop(cache);

            // Reopening the same partition without compression must fail to decode it.
            let cfg = test_config(&context, DEFAULT_ITEMS_PER_BLOB, None);
            let result = Cache::<_, i32>::init(context.child("second"), cfg).await;
            assert!(matches!(
                result,
                Err(Error::Journal(JournalError::Codec(_)))
            ));
        });
    }

    #[test_traced]
    fn test_cache_prune() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // One item per blob, so pruning removes exactly the indices below the boundary.
            let cfg = test_config(&context, 1, None);
            let mut cache = Cache::init(context.child("storage"), cfg)
                .await
                .expect("Failed to initialize cache");

            let items = [(1u64, 1), (2u64, 2), (3u64, 3), (4u64, 4), (5u64, 5)];
            for (index, data) in items {
                cache.put(index, data).await.expect("Failed to put data");
            }
            assert_eq!(cache.first(), Some(1));

            let buffer = context.encode();
            assert!(has_metric_value(&buffer, "items_tracked", 5));

            // Remove everything below index 3.
            cache.prune(3).await.expect("Failed to prune");

            for (index, data) in items {
                let retrieved = cache.get(index).await.expect("Failed to get data");
                if index < 3 {
                    assert!(retrieved.is_none());
                } else {
                    assert_eq!(retrieved.expect("Data not found"), data);
                }
            }
            assert_eq!(cache.first(), Some(3));

            let buffer = context.encode();
            assert!(has_metric_value(&buffer, "items_tracked", 3));

            // Pruning below or at the current boundary is a no-op.
            cache.prune(2).await.expect("Failed to prune");
            assert_eq!(cache.first(), Some(3));

            cache.prune(3).await.expect("Failed to prune");
            assert_eq!(cache.first(), Some(3));

            // Writes below the boundary are rejected and report the boundary.
            let result = cache.put(1, 1).await;
            assert!(matches!(result, Err(Error::AlreadyPrunedTo(3))));
        });
    }

    /// Writes `num_items` random 1 KiB records, restarts the cache, prunes roughly half of
    /// them and returns the deterministic runtime's auditor state so runs can be compared.
    fn test_cache_restart(num_items: usize) -> String {
        let executor = deterministic::Runner::default();
        executor.start(|mut context| async move {
            let items_per_blob = 256u64;
            let cfg = test_config(&context, items_per_blob, None);
            let mut cache = Cache::init(
                context.child("init").with_attribute("index", 1),
                cfg,
            )
            .await
            .expect("Failed to initialize cache");

            // Dense indices 0..num_items, each with random contents.
            let mut items = BTreeMap::new();
            for index in 0..num_items as u64 {
                let mut data = [0u8; 1024];
                context.fill(&mut data);
                items.insert(index, data);

                cache.put(index, data).await.expect("Failed to put data");
            }

            for (index, data) in &items {
                let retrieved = cache
                    .get(*index)
                    .await
                    .expect("Failed to get data")
                    .expect("Data not found");
                assert_eq!(retrieved, *data);
            }

            let buffer = context.encode();
            assert!(has_metric_value(&buffer, "items_tracked", num_items));

            cache.sync().await.expect("Failed to sync cache");
            drop(cache);

            // Everything synced must survive a restart.
            let cfg = test_config(&context, items_per_blob, None);
            let mut cache = Cache::<_, [u8; 1024]>::init(
                context.child("init").with_attribute("index", 2),
                cfg,
            )
            .await
            .expect("Failed to initialize cache");

            for (index, data) in &items {
                let retrieved = cache
                    .get(*index)
                    .await
                    .expect("Failed to get data")
                    .expect("Data not found");
                assert_eq!(&retrieved, data);
            }

            let min = (items.len() / 2) as u64;
            cache.prune(min).await.expect("Failed to prune");

            // Pruning is blob-granular: round the boundary down to the start of its blob.
            let min = (min / items_per_blob) * items_per_blob;
            let mut removed = 0;
            for (index, data) in items {
                if index >= min {
                    let retrieved = cache
                        .get(index)
                        .await
                        .expect("Failed to get data")
                        .expect("Data not found");
                    assert_eq!(retrieved, data);
                } else {
                    let retrieved = cache.get(index).await.expect("Failed to get data");
                    assert!(retrieved.is_none());
                    removed += 1;
                }
            }

            let buffer = context.encode();
            assert!(has_metric_value(
                &buffer,
                "items_tracked",
                num_items - removed
            ));

            context.auditor().state()
        })
    }

    #[test_group("slow")]
    #[test_traced]
    fn test_cache_many_items_and_restart() {
        test_cache_restart(100_000);
    }

    #[test_group("slow")]
    #[test_traced]
    fn test_determinism() {
        let state1 = test_cache_restart(5_000);
        let state2 = test_cache_restart(5_000);
        assert_eq!(state1, state2);
    }

    #[test_traced]
    fn test_cache_next_gap() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_config(&context, DEFAULT_ITEMS_PER_BLOB, None);
            let mut cache = Cache::init(context.child("storage"), cfg)
                .await
                .expect("Failed to initialize cache");

            assert_eq!(cache.first(), None);

            // Stored ranges: [1], [10, 11], [14].
            cache.put(1, 1).await.unwrap();
            cache.put(10, 10).await.unwrap();
            cache.put(11, 11).await.unwrap();
            cache.put(14, 14).await.unwrap();

            // Before any item: no current range, next range starts at 1.
            let (current_end, start_next) = cache.next_gap(0);
            assert!(current_end.is_none());
            assert_eq!(start_next, Some(1));
            assert_eq!(cache.first(), Some(1));

            let (current_end, start_next) = cache.next_gap(1);
            assert_eq!(current_end, Some(1));
            assert_eq!(start_next, Some(10));

            let (current_end, start_next) = cache.next_gap(10);
            assert_eq!(current_end, Some(11));
            assert_eq!(start_next, Some(14));

            let (current_end, start_next) = cache.next_gap(11);
            assert_eq!(current_end, Some(11));
            assert_eq!(start_next, Some(14));

            // Inside a gap.
            let (current_end, start_next) = cache.next_gap(12);
            assert!(current_end.is_none());
            assert_eq!(start_next, Some(14));

            // Last range has no successor.
            let (current_end, start_next) = cache.next_gap(14);
            assert_eq!(current_end, Some(14));
            assert!(start_next.is_none());
        });
    }

    #[test_traced]
    fn test_cache_missing_items() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_config(&context, DEFAULT_ITEMS_PER_BLOB, None);
            let mut cache = Cache::init(context.child("storage"), cfg)
                .await
                .expect("Failed to initialize cache");

            // An empty cache reports nothing missing.
            assert_eq!(cache.first(), None);
            assert_eq!(cache.missing_items(0, 5), Vec::<u64>::new());
            assert_eq!(cache.missing_items(100, 10), Vec::<u64>::new());

            // Stored ranges: [1, 2], [5, 6], [10].
            cache.put(1, 1).await.unwrap();
            cache.put(2, 2).await.unwrap();
            cache.put(5, 5).await.unwrap();
            cache.put(6, 6).await.unwrap();
            cache.put(10, 10).await.unwrap();

            // Missing indices are only reported up to the last stored item.
            assert_eq!(cache.missing_items(0, 5), vec![0, 3, 4, 7, 8]);
            assert_eq!(cache.missing_items(0, 6), vec![0, 3, 4, 7, 8, 9]);
            assert_eq!(cache.missing_items(0, 7), vec![0, 3, 4, 7, 8, 9]);

            // Starting inside a gap.
            assert_eq!(cache.missing_items(3, 3), vec![3, 4, 7]);
            assert_eq!(cache.missing_items(4, 2), vec![4, 7]);

            // Starting on a stored item.
            assert_eq!(cache.missing_items(1, 3), vec![3, 4, 7]);
            assert_eq!(cache.missing_items(2, 4), vec![3, 4, 7, 8]);
            assert_eq!(cache.missing_items(5, 2), vec![7, 8]);

            // Starting past the last stored item.
            assert_eq!(cache.missing_items(11, 5), Vec::<u64>::new());
            assert_eq!(cache.missing_items(100, 10), Vec::<u64>::new());

            cache.put(1000, 1000).await.unwrap();

            // A large gap is truncated at the requested maximum.
            let items = cache.missing_items(11, 10);
            assert_eq!(items, vec![11, 12, 13, 14, 15, 16, 17, 18, 19, 20]);

            // Stops before the next stored item even if fewer than requested.
            let items = cache.missing_items(990, 15);
            assert_eq!(
                items,
                vec![990, 991, 992, 993, 994, 995, 996, 997, 998, 999]
            );

            // Results are unchanged after syncing.
            cache.sync().await.unwrap();
            assert_eq!(cache.missing_items(0, 5), vec![0, 3, 4, 7, 8]);
            assert_eq!(cache.missing_items(3, 3), vec![3, 4, 7]);

            // Gaps spanning a blob boundary.
            cache.put(DEFAULT_ITEMS_PER_BLOB - 1, 99).await.unwrap();
            cache.put(DEFAULT_ITEMS_PER_BLOB + 1, 101).await.unwrap();

            let items = cache.missing_items(DEFAULT_ITEMS_PER_BLOB - 2, 5);
            assert_eq!(
                items,
                vec![DEFAULT_ITEMS_PER_BLOB - 2, DEFAULT_ITEMS_PER_BLOB]
            );
        });
    }

    #[test_traced]
    fn test_cache_intervals_after_restart() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_config(&context, DEFAULT_ITEMS_PER_BLOB, None);

            // Populate and sync, then drop the cache at the end of the scope.
            {
                let mut cache = Cache::init(context.child("first"), cfg.clone())
                    .await
                    .expect("Failed to initialize cache");

                cache.put(0, 0).await.expect("Failed to put data");
                cache.put(100, 100).await.expect("Failed to put data");
                cache.put(1000, 1000).await.expect("Failed to put data");

                cache.sync().await.expect("Failed to sync cache");
            }

            // Interval tracking must be rebuilt on restart.
            {
                let cache = Cache::<_, i32>::init(context.child("second"), cfg)
                    .await
                    .expect("Failed to initialize cache");

                let (current_end, start_next) = cache.next_gap(0);
                assert_eq!(current_end, Some(0));
                assert_eq!(start_next, Some(100));

                let (current_end, start_next) = cache.next_gap(100);
                assert_eq!(current_end, Some(100));
                assert_eq!(start_next, Some(1000));

                let items = cache.missing_items(1, 5);
                assert_eq!(items, vec![1, 2, 3, 4, 5]);
            }
        });
    }

    #[test_traced]
    fn test_cache_intervals_with_pruning() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Small blobs so that each stored item lives in its own blob.
            let cfg = test_config(&context, 100, None);
            let mut cache = Cache::init(context.child("storage"), cfg)
                .await
                .expect("Failed to initialize cache");

            cache.put(50, 50).await.unwrap();
            cache.put(150, 150).await.unwrap();
            cache.put(250, 250).await.unwrap();
            cache.put(350, 350).await.unwrap();

            let (current_end, start_next) = cache.next_gap(0);
            assert!(current_end.is_none());
            assert_eq!(start_next, Some(50));

            cache.prune(200).await.expect("Failed to prune");

            // Pruned items are no longer tracked.
            assert!(!cache.has(50));
            assert!(!cache.has(150));

            let (current_end, start_next) = cache.next_gap(200);
            assert!(current_end.is_none());
            assert_eq!(start_next, Some(250));

            let items = cache.missing_items(200, 5);
            assert_eq!(items, vec![200, 201, 202, 203, 204]);

            // Retained items are still tracked and readable.
            assert!(cache.has(250));
            assert!(cache.has(350));
            assert_eq!(cache.get(250).await.unwrap(), Some(250));
            assert_eq!(cache.get(350).await.unwrap(), Some(350));
        });
    }

    #[test_traced]
    fn test_cache_sparse_indices() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_config(&context, 100, None);
            let mut cache = Cache::init(context.child("storage"), cfg)
                .await
                .expect("Failed to initialize cache");

            // Indices chosen to straddle blob boundaries (blobs hold 100 indices each).
            let indices = [(0u64, 0), (99u64, 99), (100u64, 100), (500u64, 500)];

            for (index, value) in indices {
                cache.put(index, value).await.expect("Failed to put data");
            }

            assert!(!cache.has(1));
            assert!(!cache.has(50));
            assert!(!cache.has(101));
            assert!(!cache.has(499));

            let (current_end, start_next) = cache.next_gap(50);
            assert!(current_end.is_none());
            assert_eq!(start_next, Some(99));

            // 99 and 100 form one contiguous range across the blob boundary.
            let (current_end, start_next) = cache.next_gap(99);
            assert_eq!(current_end, Some(100));
            assert_eq!(start_next, Some(500));

            cache.sync().await.expect("Failed to sync");

            for (index, value) in indices {
                let retrieved = cache
                    .get(index)
                    .await
                    .expect("Failed to get data")
                    .expect("Data not found");
                assert_eq!(retrieved, value);
            }
        });
    }

    #[test_traced]
    fn test_cache_intervals_edge_cases() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_config(&context, DEFAULT_ITEMS_PER_BLOB, None);
            let mut cache = Cache::init(context.child("storage"), cfg)
                .await
                .expect("Failed to initialize cache");

            // A single stored item.
            cache.put(42, 42).await.unwrap();

            let (current_end, start_next) = cache.next_gap(42);
            assert_eq!(current_end, Some(42));
            assert!(start_next.is_none());

            let (current_end, start_next) = cache.next_gap(41);
            assert!(current_end.is_none());
            assert_eq!(start_next, Some(42));

            let (current_end, start_next) = cache.next_gap(43);
            assert!(current_end.is_none());
            assert!(start_next.is_none());

            // Adjacent inserts extend the existing range.
            cache.put(43, 43).await.unwrap();
            cache.put(44, 44).await.unwrap();

            let (current_end, start_next) = cache.next_gap(42);
            assert_eq!(current_end, Some(44));
            assert!(start_next.is_none());

            // Indices near the top of the u64 range.
            cache.put(u64::MAX - 1, 999).await.unwrap();

            let (current_end, start_next) = cache.next_gap(u64::MAX - 2);
            assert!(current_end.is_none());
            assert_eq!(start_next, Some(u64::MAX - 1));

            let (current_end, start_next) = cache.next_gap(u64::MAX - 1);
            assert_eq!(current_end, Some(u64::MAX - 1));
            assert!(start_next.is_none());
        });
    }

    #[test_traced]
    fn test_cache_intervals_duplicate_inserts() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_config(&context, DEFAULT_ITEMS_PER_BLOB, None);
            let mut cache = Cache::init(context.child("storage"), cfg)
                .await
                .expect("Failed to initialize cache");

            cache.put(10, 10).await.unwrap();
            assert!(cache.has(10));
            assert_eq!(cache.get(10).await.unwrap(), Some(10));

            // A second put at the same index is accepted but does not overwrite the value.
            cache.put(10, 20).await.unwrap();
            assert!(cache.has(10));
            assert_eq!(cache.get(10).await.unwrap(), Some(10));

            let (current_end, start_next) = cache.next_gap(10);
            assert_eq!(current_end, Some(10));
            assert!(start_next.is_none());

            // Neighbours on both sides merge into one range.
            cache.put(9, 9).await.unwrap();
            cache.put(11, 11).await.unwrap();

            let (current_end, start_next) = cache.next_gap(9);
            assert_eq!(current_end, Some(11));
            assert!(start_next.is_none());
        });
    }
}