1use commonware_codec::Codec;
7use commonware_utils::Array;
8use std::future::Future;
9use thiserror::Error;
10
11pub mod immutable;
12pub mod prunable;
13
14#[cfg(all(test, feature = "arbitrary"))]
15mod conformance;
16
17pub enum Identifier<'a, K: Array> {
19 Index(u64),
20 Key(&'a K),
21}
22
23#[derive(Debug, Error)]
25pub enum Error {
26 #[error("journal error: {0}")]
27 Journal(#[from] crate::journal::Error),
28 #[error("ordinal error: {0}")]
29 Ordinal(#[from] crate::ordinal::Error),
30 #[error("metadata error: {0}")]
31 Metadata(#[from] crate::metadata::Error),
32 #[error("freezer error: {0}")]
33 Freezer(#[from] crate::freezer::Error),
34 #[error("record corrupted")]
35 RecordCorrupted,
36 #[error("already pruned to: {0}")]
37 AlreadyPrunedTo(u64),
38 #[error("record too large")]
39 RecordTooLarge,
40}
41
42pub trait Archive: Send {
44 type Key: Array;
46
47 type Value: Codec + Send;
49
50 fn put(
55 &mut self,
56 index: u64,
57 key: Self::Key,
58 value: Self::Value,
59 ) -> impl Future<Output = Result<(), Error>> + Send;
60
61 fn put_sync(
63 &mut self,
64 index: u64,
65 key: Self::Key,
66 value: Self::Value,
67 ) -> impl Future<Output = Result<(), Error>> + Send {
68 async move {
69 self.put(index, key, value).await?;
70 self.sync().await
71 }
72 }
73
74 fn get<'a>(
76 &'a self,
77 identifier: Identifier<'a, Self::Key>,
78 ) -> impl Future<Output = Result<Option<Self::Value>, Error>> + Send + use<'a, Self>;
79
80 fn has<'a>(
82 &'a self,
83 identifier: Identifier<'a, Self::Key>,
84 ) -> impl Future<Output = Result<bool, Error>> + Send + use<'a, Self>;
85
86 fn next_gap(&self, index: u64) -> (Option<u64>, Option<u64>);
91
92 fn missing_items(&self, index: u64, max: usize) -> Vec<u64>;
97
98 fn ranges(&self) -> impl Iterator<Item = (u64, u64)>;
100
101 fn first_index(&self) -> Option<u64>;
103
104 fn last_index(&self) -> Option<u64>;
106
107 fn sync(&mut self) -> impl Future<Output = Result<(), Error>> + Send;
109
110 fn destroy(self) -> impl Future<Output = Result<(), Error>> + Send;
112}
113
114#[cfg(test)]
115mod tests {
116 use super::*;
117 use crate::{kv::tests::test_key, translator::TwoCap};
118 use commonware_codec::DecodeExt;
119 use commonware_macros::{test_group, test_traced};
120 use commonware_runtime::{
121 buffer::paged::CacheRef,
122 deterministic::{self, Context},
123 Metrics, Runner,
124 };
125 use commonware_utils::{sequence::FixedBytes, NZUsize, NZU16, NZU64};
126 use rand::Rng;
127 use std::{
128 collections::BTreeMap,
129 num::{NonZeroU16, NonZeroUsize},
130 };
131
132 const PAGE_SIZE: NonZeroU16 = NZU16!(1024);
133 const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);
134
135 async fn create_prunable(
136 context: Context,
137 compression: Option<u8>,
138 ) -> impl Archive<Key = FixedBytes<64>, Value = i32> {
139 let cfg = prunable::Config {
140 translator: TwoCap,
141 key_partition: "test_key".into(),
142 key_page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
143 value_partition: "test_value".into(),
144 compression,
145 codec_config: (),
146 items_per_section: NZU64!(1024),
147 key_write_buffer: NZUsize!(1024),
148 value_write_buffer: NZUsize!(1024),
149 replay_buffer: NZUsize!(1024),
150 };
151 prunable::Archive::init(context, cfg).await.unwrap()
152 }
153
154 async fn create_immutable(
155 context: Context,
156 compression: Option<u8>,
157 ) -> impl Archive<Key = FixedBytes<64>, Value = i32> {
158 let cfg = immutable::Config {
159 metadata_partition: "test_metadata".into(),
160 freezer_table_partition: "test_table".into(),
161 freezer_table_initial_size: 64,
162 freezer_table_resize_frequency: 2,
163 freezer_table_resize_chunk_size: 32,
164 freezer_key_partition: "test_key".into(),
165 freezer_key_page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
166 freezer_value_partition: "test_value".into(),
167 freezer_value_target_size: 1024 * 1024,
168 freezer_value_compression: compression,
169 ordinal_partition: "test_ordinal".into(),
170 items_per_section: NZU64!(1024),
171 freezer_key_write_buffer: NZUsize!(1024 * 1024),
172 freezer_value_write_buffer: NZUsize!(1024 * 1024),
173 ordinal_write_buffer: NZUsize!(1024 * 1024),
174 replay_buffer: NZUsize!(1024 * 1024),
175 codec_config: (),
176 };
177 immutable::Archive::init(context, cfg).await.unwrap()
178 }
179
180 async fn test_put_get_impl(mut archive: impl Archive<Key = FixedBytes<64>, Value = i32>) {
181 let index = 1u64;
182 let key = test_key("testkey");
183 let data = 1;
184
185 let has = archive
187 .has(Identifier::Index(index))
188 .await
189 .expect("Failed to check key");
190 assert!(!has);
191 let has = archive
192 .has(Identifier::Key(&key))
193 .await
194 .expect("Failed to check key");
195 assert!(!has);
196
197 archive
199 .put(index, key.clone(), data)
200 .await
201 .expect("Failed to put data");
202
203 let has = archive
205 .has(Identifier::Index(index))
206 .await
207 .expect("Failed to check key");
208 assert!(has);
209 let has = archive
210 .has(Identifier::Key(&key))
211 .await
212 .expect("Failed to check key");
213 assert!(has);
214
215 let retrieved = archive
217 .get(Identifier::Key(&key))
218 .await
219 .expect("Failed to get data");
220 assert_eq!(retrieved, Some(data));
221
222 let retrieved = archive
224 .get(Identifier::Index(index))
225 .await
226 .expect("Failed to get data");
227 assert_eq!(retrieved, Some(data));
228
229 archive.sync().await.expect("Failed to sync data");
231 }
232
233 #[test_traced]
234 fn test_put_get_prunable_no_compression() {
235 let executor = deterministic::Runner::default();
236 executor.start(|context| async move {
237 let archive = create_prunable(context, None).await;
238 test_put_get_impl(archive).await;
239 });
240 }
241
242 #[test_traced]
243 fn test_put_get_prunable_compression() {
244 let executor = deterministic::Runner::default();
245 executor.start(|context| async move {
246 let archive = create_prunable(context, Some(3)).await;
247 test_put_get_impl(archive).await;
248 });
249 }
250
251 #[test_traced]
252 fn test_put_get_immutable_no_compression() {
253 let executor = deterministic::Runner::default();
254 executor.start(|context| async move {
255 let archive = create_immutable(context, None).await;
256 test_put_get_impl(archive).await;
257 });
258 }
259
260 #[test_traced]
261 fn test_put_get_immutable_compression() {
262 let executor = deterministic::Runner::default();
263 executor.start(|context| async move {
264 let archive = create_immutable(context, Some(3)).await;
265 test_put_get_impl(archive).await;
266 });
267 }
268
269 async fn test_duplicate_key_impl(mut archive: impl Archive<Key = FixedBytes<64>, Value = i32>) {
270 let index = 1u64;
271 let key = test_key("duplicate");
272 let data1 = 1;
273 let data2 = 2;
274
275 archive
277 .put(index, key.clone(), data1)
278 .await
279 .expect("Failed to put data");
280
281 archive
283 .put(index, key.clone(), data2)
284 .await
285 .expect("Duplicate put should not fail");
286
287 let retrieved = archive
289 .get(Identifier::Index(index))
290 .await
291 .expect("Failed to get data")
292 .expect("Data not found");
293 assert_eq!(retrieved, data1);
294
295 let retrieved = archive
296 .get(Identifier::Key(&key))
297 .await
298 .expect("Failed to get data")
299 .expect("Data not found");
300 assert_eq!(retrieved, data1);
301 }
302
303 #[test_traced]
304 fn test_duplicate_key_prunable_no_compression() {
305 let executor = deterministic::Runner::default();
306 executor.start(|context| async move {
307 let archive = create_prunable(context, None).await;
308 test_duplicate_key_impl(archive).await;
309 });
310 }
311
312 #[test_traced]
313 fn test_duplicate_key_prunable_compression() {
314 let executor = deterministic::Runner::default();
315 executor.start(|context| async move {
316 let archive = create_prunable(context, Some(3)).await;
317 test_duplicate_key_impl(archive).await;
318 });
319 }
320
321 #[test_traced]
322 fn test_duplicate_key_immutable_no_compression() {
323 let executor = deterministic::Runner::default();
324 executor.start(|context| async move {
325 let archive = create_immutable(context, None).await;
326 test_duplicate_key_impl(archive).await;
327 });
328 }
329
330 #[test_traced]
331 fn test_duplicate_key_immutable_compression() {
332 let executor = deterministic::Runner::default();
333 executor.start(|context| async move {
334 let archive = create_immutable(context, Some(3)).await;
335 test_duplicate_key_impl(archive).await;
336 });
337 }
338
339 async fn test_get_nonexistent_impl(archive: impl Archive<Key = FixedBytes<64>, Value = i32>) {
340 let index = 1u64;
342 let retrieved: Option<i32> = archive
343 .get(Identifier::Index(index))
344 .await
345 .expect("Failed to get data");
346 assert!(retrieved.is_none());
347
348 let key = test_key("nonexistent");
350 let retrieved = archive
351 .get(Identifier::Key(&key))
352 .await
353 .expect("Failed to get data");
354 assert!(retrieved.is_none());
355 }
356
357 #[test_traced]
358 fn test_get_nonexistent_prunable_no_compression() {
359 let executor = deterministic::Runner::default();
360 executor.start(|context| async move {
361 let archive = create_prunable(context, None).await;
362 test_get_nonexistent_impl(archive).await;
363 });
364 }
365
366 #[test_traced]
367 fn test_get_nonexistent_prunable_compression() {
368 let executor = deterministic::Runner::default();
369 executor.start(|context| async move {
370 let archive = create_prunable(context, Some(3)).await;
371 test_get_nonexistent_impl(archive).await;
372 });
373 }
374
375 #[test_traced]
376 fn test_get_nonexistent_immutable_no_compression() {
377 let executor = deterministic::Runner::default();
378 executor.start(|context| async move {
379 let archive = create_immutable(context, None).await;
380 test_get_nonexistent_impl(archive).await;
381 });
382 }
383
384 #[test_traced]
385 fn test_get_nonexistent_immutable_compression() {
386 let executor = deterministic::Runner::default();
387 executor.start(|context| async move {
388 let archive = create_immutable(context, Some(3)).await;
389 test_get_nonexistent_impl(archive).await;
390 });
391 }
392
393 async fn test_persistence_impl<A, F, Fut>(context: Context, creator: F, compression: Option<u8>)
394 where
395 A: Archive<Key = FixedBytes<64>, Value = i32>,
396 F: Fn(Context, Option<u8>) -> Fut,
397 Fut: Future<Output = A>,
398 {
399 {
401 let mut archive = creator(context.with_label("first"), compression).await;
402
403 let keys = vec![
405 (1u64, test_key("key1"), 1),
406 (2u64, test_key("key2"), 2),
407 (3u64, test_key("key3"), 3),
408 ];
409
410 for (index, key, data) in &keys {
411 archive
412 .put(*index, key.clone(), *data)
413 .await
414 .expect("Failed to put data");
415 }
416
417 archive.sync().await.expect("Failed to sync archive");
419 }
420
421 {
423 let archive = creator(context.with_label("second"), compression).await;
424
425 let keys = vec![
427 (1u64, test_key("key1"), 1),
428 (2u64, test_key("key2"), 2),
429 (3u64, test_key("key3"), 3),
430 ];
431
432 for (index, key, expected_data) in &keys {
433 let retrieved = archive
434 .get(Identifier::Index(*index))
435 .await
436 .expect("Failed to get data")
437 .expect("Data not found");
438 assert_eq!(retrieved, *expected_data);
439
440 let retrieved = archive
441 .get(Identifier::Key(key))
442 .await
443 .expect("Failed to get data")
444 .expect("Data not found");
445 assert_eq!(retrieved, *expected_data);
446 }
447 }
448 }
449
450 #[test_traced]
451 fn test_persistence_prunable_no_compression() {
452 let executor = deterministic::Runner::default();
453 executor.start(|context| async move {
454 test_persistence_impl(context, create_prunable, None).await;
455 });
456 }
457
458 #[test_traced]
459 fn test_persistence_prunable_compression() {
460 let executor = deterministic::Runner::default();
461 executor.start(|context| async move {
462 test_persistence_impl(context, create_prunable, Some(3)).await;
463 });
464 }
465
466 #[test_traced]
467 fn test_persistence_immutable_no_compression() {
468 let executor = deterministic::Runner::default();
469 executor.start(|context| async move {
470 test_persistence_impl(context, create_immutable, None).await;
471 });
472 }
473
474 #[test_traced]
475 fn test_persistence_immutable_compression() {
476 let executor = deterministic::Runner::default();
477 executor.start(|context| async move {
478 test_persistence_impl(context, create_immutable, Some(3)).await;
479 });
480 }
481
482 async fn test_ranges_impl<A, F, Fut>(mut context: Context, creator: F, compression: Option<u8>)
483 where
484 A: Archive<Key = FixedBytes<64>, Value = i32>,
485 F: Fn(Context, Option<u8>) -> Fut,
486 Fut: Future<Output = A>,
487 {
488 let mut keys = BTreeMap::new();
489 {
490 let mut archive = creator(context.with_label("first"), compression).await;
491
492 let mut last_index = 0u64;
494 while keys.len() < 100 {
495 let gap: u64 = context.gen_range(1..=10);
496 let index = last_index + gap;
497 last_index = index;
498
499 let mut key_bytes = [0u8; 64];
500 context.fill(&mut key_bytes);
501 let key = FixedBytes::<64>::decode(key_bytes.as_ref()).unwrap();
502 let data: i32 = context.gen();
503
504 if keys.contains_key(&index) {
505 continue;
506 }
507 keys.insert(index, (key.clone(), data));
508
509 archive
510 .put(index, key, data)
511 .await
512 .expect("Failed to put data");
513 }
514
515 archive.sync().await.expect("Failed to sync archive");
516 }
517
518 {
519 let archive = creator(context.with_label("second"), compression).await;
520 let sorted_indices: Vec<u64> = keys.keys().cloned().collect();
521
522 let (current_end, start_next) = archive.next_gap(0);
524 assert!(current_end.is_none());
525 assert_eq!(start_next, Some(sorted_indices[0]));
526
527 let mut i = 0;
529 while i < sorted_indices.len() {
530 let current_index = sorted_indices[i];
531
532 let mut j = i;
534 while j + 1 < sorted_indices.len() && sorted_indices[j + 1] == sorted_indices[j] + 1
535 {
536 j += 1;
537 }
538 let block_end_index = sorted_indices[j];
539 let next_actual_index = if j + 1 < sorted_indices.len() {
540 Some(sorted_indices[j + 1])
541 } else {
542 None
543 };
544
545 let (current_end, start_next) = archive.next_gap(current_index);
546 assert_eq!(current_end, Some(block_end_index));
547 assert_eq!(start_next, next_actual_index);
548
549 if let Some(next_index) = next_actual_index {
551 if next_index > block_end_index + 1 {
552 let in_gap_index = block_end_index + 1;
553 let (current_end, start_next) = archive.next_gap(in_gap_index);
554 assert!(current_end.is_none());
555 assert_eq!(start_next, Some(next_index));
556 }
557 }
558 i = j + 1;
559 }
560
561 let last_index = *sorted_indices.last().unwrap();
563 let (current_end, start_next) = archive.next_gap(last_index);
564 assert!(current_end.is_some());
565 assert!(start_next.is_none());
566 }
567 }
568
569 #[test_traced]
570 fn test_ranges_prunable_no_compression() {
571 let executor = deterministic::Runner::default();
572 executor.start(|context| async move {
573 test_ranges_impl(context, create_prunable, None).await;
574 });
575 }
576
577 #[test_traced]
578 fn test_ranges_prunable_compression() {
579 let executor = deterministic::Runner::default();
580 executor.start(|context| async move {
581 test_ranges_impl(context, create_prunable, Some(3)).await;
582 });
583 }
584
585 #[test_traced]
586 fn test_ranges_immutable_no_compression() {
587 let executor = deterministic::Runner::default();
588 executor.start(|context| async move {
589 test_ranges_impl(context, create_immutable, None).await;
590 });
591 }
592
593 #[test_traced]
594 fn test_ranges_immutable_compression() {
595 let executor = deterministic::Runner::default();
596 executor.start(|context| async move {
597 test_ranges_impl(context, create_immutable, Some(3)).await;
598 });
599 }
600
601 async fn test_many_keys_impl<A, F, Fut>(
602 mut context: Context,
603 creator: F,
604 compression: Option<u8>,
605 num: usize,
606 ) where
607 A: Archive<Key = FixedBytes<64>, Value = i32>,
608 F: Fn(Context, Option<u8>) -> Fut,
609 Fut: Future<Output = A>,
610 {
611 let mut keys = BTreeMap::new();
613 {
614 let mut archive = creator(context.with_label("first"), compression).await;
615 while keys.len() < num {
616 let index = keys.len() as u64;
617 let mut key = [0u8; 64];
618 context.fill(&mut key);
619 let key = FixedBytes::<64>::decode(key.as_ref()).unwrap();
620 let data: i32 = context.gen();
621
622 archive
623 .put(index, key.clone(), data)
624 .await
625 .expect("Failed to put data");
626 keys.insert(key, (index, data));
627
628 if context.gen_bool(0.1) {
630 archive.sync().await.expect("Failed to sync archive");
631 }
632 }
633 archive.sync().await.expect("Failed to sync archive");
634
635 for (key, (index, data)) in &keys {
637 let retrieved = archive
638 .get(Identifier::Index(*index))
639 .await
640 .expect("Failed to get data")
641 .expect("Data not found");
642 assert_eq!(&retrieved, data);
643 let retrieved = archive
644 .get(Identifier::Key(key))
645 .await
646 .expect("Failed to get data")
647 .expect("Data not found");
648 assert_eq!(&retrieved, data);
649 }
650 }
651
652 {
654 let archive = creator(context.with_label("second"), compression).await;
655
656 for (key, (index, data)) in &keys {
658 let retrieved = archive
659 .get(Identifier::Index(*index))
660 .await
661 .expect("Failed to get data")
662 .expect("Data not found");
663 assert_eq!(&retrieved, data);
664 let retrieved = archive
665 .get(Identifier::Key(key))
666 .await
667 .expect("Failed to get data")
668 .expect("Data not found");
669 assert_eq!(&retrieved, data);
670 }
671 }
672 }
673
674 fn test_many_keys_determinism<F, Fut, A>(creator: F, compression: Option<u8>, num: usize)
675 where
676 A: Archive<Key = FixedBytes<64>, Value = i32>,
677 F: Fn(Context, Option<u8>) -> Fut + Copy + Send + 'static,
678 Fut: Future<Output = A> + Send,
679 {
680 let executor = deterministic::Runner::default();
681 let state1 = executor.start(|context| async move {
682 test_many_keys_impl(context.clone(), creator, compression, num).await;
683 context.auditor().state()
684 });
685 let executor = deterministic::Runner::default();
686 let state2 = executor.start(|context| async move {
687 test_many_keys_impl(context.clone(), creator, compression, num).await;
688 context.auditor().state()
689 });
690 assert_eq!(state1, state2);
691 }
692
693 #[test_traced]
694 fn test_many_keys_prunable_no_compression() {
695 test_many_keys_determinism(create_prunable, None, 1_000);
696 }
697
698 #[test_traced]
699 fn test_many_keys_prunable_compression() {
700 test_many_keys_determinism(create_prunable, Some(3), 1_000);
701 }
702
703 #[test_traced]
704 fn test_many_keys_immutable_no_compression() {
705 test_many_keys_determinism(create_immutable, None, 1_000);
706 }
707
708 #[test_traced]
709 fn test_many_keys_immutable_compression() {
710 test_many_keys_determinism(create_immutable, Some(3), 1_000);
711 }
712
713 #[test_group("slow")]
714 #[test_traced]
715 fn test_many_keys_prunable_large() {
716 test_many_keys_determinism(create_prunable, None, 50_000);
717 }
718
719 #[test_group("slow")]
720 #[test_traced]
721 fn test_many_keys_immutable_large() {
722 test_many_keys_determinism(create_immutable, None, 50_000);
723 }
724
725 fn assert_send<T: Send>(_: T) {}
726
727 #[allow(dead_code)]
728 fn assert_archive_futures_are_send<T: super::Archive>(
729 archive: &mut T,
730 key: T::Key,
731 value: T::Value,
732 ) where
733 T::Key: Clone,
734 T::Value: Clone,
735 {
736 assert_send(archive.put(1, key.clone(), value.clone()));
737 assert_send(archive.put_sync(2, key.clone(), value));
738 assert_send(archive.get(Identifier::Index(1)));
739 assert_send(archive.get(Identifier::Key(&key)));
740 assert_send(archive.has(Identifier::Index(1)));
741 assert_send(archive.has(Identifier::Key(&key)));
742 assert_send(archive.sync());
743 }
744
745 #[allow(dead_code)]
746 fn assert_archive_destroy_is_send<T: super::Archive>(archive: T) {
747 assert_send(archive.destroy());
748 }
749
750 #[allow(dead_code)]
751 fn assert_prunable_archive_futures_are_send(
752 archive: &mut prunable::Archive<TwoCap, Context, FixedBytes<64>, i32>,
753 key: FixedBytes<64>,
754 value: i32,
755 ) {
756 assert_archive_futures_are_send(archive, key, value);
757 }
758
759 #[allow(dead_code)]
760 fn assert_prunable_archive_destroy_is_send(
761 archive: prunable::Archive<TwoCap, Context, FixedBytes<64>, i32>,
762 ) {
763 assert_archive_destroy_is_send(archive);
764 }
765
766 #[allow(dead_code)]
767 fn assert_immutable_archive_futures_are_send(
768 archive: &mut immutable::Archive<Context, FixedBytes<64>, i32>,
769 key: FixedBytes<64>,
770 value: i32,
771 ) {
772 assert_archive_futures_are_send(archive, key, value);
773 }
774
775 #[allow(dead_code)]
776 fn assert_immutable_archive_destroy_is_send(
777 archive: immutable::Archive<Context, FixedBytes<64>, i32>,
778 ) {
779 assert_archive_destroy_is_send(archive);
780 }
781}