1use commonware_codec::Codec;
7use commonware_utils::Array;
8use std::future::Future;
9use thiserror::Error;
10
11pub mod immutable;
12pub mod prunable;
13
14#[cfg(all(test, feature = "arbitrary"))]
15mod conformance;
16
17pub enum Identifier<'a, K: Array> {
19 Index(u64),
20 Key(&'a K),
21}
22
/// Errors returned by [Archive] operations.
///
/// The first four variants wrap errors from other modules of this crate and are
/// created automatically through `From` (see the `#[from]` attributes), so `?`
/// can be used on those error types inside archive code.
#[derive(Debug, Error)]
pub enum Error {
    /// An error from `crate::journal`.
    #[error("journal error: {0}")]
    Journal(#[from] crate::journal::Error),
    /// An error from `crate::ordinal`.
    #[error("ordinal error: {0}")]
    Ordinal(#[from] crate::ordinal::Error),
    /// An error from `crate::metadata`.
    #[error("metadata error: {0}")]
    Metadata(#[from] crate::metadata::Error),
    /// An error from `crate::freezer`.
    #[error("freezer error: {0}")]
    Freezer(#[from] crate::freezer::Error),
    /// A stored record was found to be corrupted.
    #[error("record corrupted")]
    RecordCorrupted,
    /// NOTE(review): presumably returned when an operation targets data at or below
    /// the point the archive has already been pruned to, with the payload being that
    /// point — confirm against the implementations.
    #[error("already pruned to: {0}")]
    AlreadyPrunedTo(u64),
    /// A record was too large (the limit is not visible in this file).
    #[error("record too large")]
    RecordTooLarge,
}
41
/// A store of values that can be looked up either by a `u64` index or by a key.
///
/// Implementations live in the [immutable] and [prunable] submodules. All
/// asynchronous methods return futures that are `Send`.
pub trait Archive: Send {
    /// The type of key used to identify stored values.
    type Key: Array;

    /// The type of value stored.
    type Value: Codec + Send;

    /// Stores `value` under both `index` and `key`.
    ///
    /// The tests in this module expect a second `put` at an already-occupied
    /// index to succeed while leaving the originally stored value in place.
    ///
    /// NOTE(review): durability presumably requires a later [Archive::sync]
    /// (see [Archive::put_sync]) — confirm against the implementations.
    fn put(
        &mut self,
        index: u64,
        key: Self::Key,
        value: Self::Value,
    ) -> impl Future<Output = Result<(), Error>> + Send;

    /// Performs [Archive::put] and, if it succeeds, [Archive::sync].
    ///
    /// An error from `put` is returned immediately without calling `sync`.
    fn put_sync(
        &mut self,
        index: u64,
        key: Self::Key,
        value: Self::Value,
    ) -> impl Future<Output = Result<(), Error>> + Send {
        async move {
            self.put(index, key, value).await?;
            self.sync().await
        }
    }

    /// Retrieves the value selected by `identifier`, or `None` if nothing is
    /// stored for it.
    fn get<'a>(
        &'a self,
        identifier: Identifier<'a, Self::Key>,
    ) -> impl Future<Output = Result<Option<Self::Value>, Error>> + Send + use<'a, Self>;

    /// Reports whether a value is stored for `identifier`.
    fn has<'a>(
        &'a self,
        identifier: Identifier<'a, Self::Key>,
    ) -> impl Future<Output = Result<bool, Error>> + Send + use<'a, Self>;

    /// Describes the gap structure around `index` as `(current_end, start_next)`.
    ///
    /// As exercised by the tests in this module:
    /// - `current_end` is the last index of the contiguous run of stored indices
    ///   containing `index`, or `None` when `index` itself is not stored.
    /// - `start_next` is the first stored index after the gap that follows, or
    ///   `None` when no later index is stored.
    fn next_gap(&self, index: u64) -> (Option<u64>, Option<u64>);

    /// NOTE(review): presumably returns up to `max` indices, starting the search at
    /// `index`, that are not stored — confirm against the implementations.
    fn missing_items(&self, index: u64, max: usize) -> Vec<u64>;

    /// Iterates over pairs of `u64`.
    ///
    /// NOTE(review): presumably each pair is the (start, end) of a contiguous run of
    /// stored indices — confirm against the implementations.
    fn ranges(&self) -> impl Iterator<Item = (u64, u64)>;

    /// NOTE(review): presumably the lowest stored index, or `None` when the archive
    /// is empty — confirm against the implementations.
    fn first_index(&self) -> Option<u64>;

    /// NOTE(review): presumably the highest stored index, or `None` when the archive
    /// is empty — confirm against the implementations.
    fn last_index(&self) -> Option<u64>;

    /// Flushes pending writes.
    ///
    /// The persistence tests in this module rely on items written before a `sync`
    /// being readable from a freshly initialized archive over the same storage.
    fn sync(&mut self) -> impl Future<Output = Result<(), Error>> + Send;

    /// Consumes the archive.
    ///
    /// NOTE(review): presumably removes all of its persisted data — confirm against
    /// the implementations.
    fn destroy(self) -> impl Future<Output = Result<(), Error>> + Send;
}
113
114#[cfg(test)]
115mod tests {
116 use super::*;
117 use crate::translator::TwoCap;
118 use commonware_codec::DecodeExt;
119 use commonware_macros::{test_group, test_traced};
120 use commonware_runtime::{
121 buffer::PoolRef,
122 deterministic::{self, Context},
123 Runner,
124 };
125 use commonware_utils::{sequence::FixedBytes, NZUsize, NZU16, NZU64};
126 use rand::Rng;
127 use std::{
128 collections::BTreeMap,
129 num::{NonZeroU16, NonZeroUsize},
130 };
131
    /// First argument passed to `PoolRef::new` when building the test buffer pools.
    const PAGE_SIZE: NonZeroU16 = NZU16!(1024);
    /// Second argument passed to `PoolRef::new` (presumably the number of pages the
    /// pool caches — confirm against `PoolRef`).
    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);
134
135 fn test_key(key: &str) -> FixedBytes<64> {
136 let mut buf = [0u8; 64];
137 let key = key.as_bytes();
138 assert!(key.len() <= buf.len());
139 buf[..key.len()].copy_from_slice(key);
140 FixedBytes::decode(buf.as_ref()).unwrap()
141 }
142
143 async fn create_prunable(
144 context: Context,
145 compression: Option<u8>,
146 ) -> impl Archive<Key = FixedBytes<64>, Value = i32> {
147 let cfg = prunable::Config {
148 translator: TwoCap,
149 key_partition: "test_key".into(),
150 key_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
151 value_partition: "test_value".into(),
152 compression,
153 codec_config: (),
154 items_per_section: NZU64!(1024),
155 key_write_buffer: NZUsize!(1024),
156 value_write_buffer: NZUsize!(1024),
157 replay_buffer: NZUsize!(1024),
158 };
159 prunable::Archive::init(context, cfg).await.unwrap()
160 }
161
162 async fn create_immutable(
163 context: Context,
164 compression: Option<u8>,
165 ) -> impl Archive<Key = FixedBytes<64>, Value = i32> {
166 let cfg = immutable::Config {
167 metadata_partition: "test_metadata".into(),
168 freezer_table_partition: "test_table".into(),
169 freezer_table_initial_size: 64,
170 freezer_table_resize_frequency: 2,
171 freezer_table_resize_chunk_size: 32,
172 freezer_key_partition: "test_key".into(),
173 freezer_key_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
174 freezer_value_partition: "test_value".into(),
175 freezer_value_target_size: 1024 * 1024,
176 freezer_value_compression: compression,
177 ordinal_partition: "test_ordinal".into(),
178 items_per_section: NZU64!(1024),
179 freezer_key_write_buffer: NZUsize!(1024 * 1024),
180 freezer_value_write_buffer: NZUsize!(1024 * 1024),
181 ordinal_write_buffer: NZUsize!(1024 * 1024),
182 replay_buffer: NZUsize!(1024 * 1024),
183 codec_config: (),
184 };
185 immutable::Archive::init(context, cfg).await.unwrap()
186 }
187
188 async fn test_put_get_impl(mut archive: impl Archive<Key = FixedBytes<64>, Value = i32>) {
189 let index = 1u64;
190 let key = test_key("testkey");
191 let data = 1;
192
193 let has = archive
195 .has(Identifier::Index(index))
196 .await
197 .expect("Failed to check key");
198 assert!(!has);
199 let has = archive
200 .has(Identifier::Key(&key))
201 .await
202 .expect("Failed to check key");
203 assert!(!has);
204
205 archive
207 .put(index, key.clone(), data)
208 .await
209 .expect("Failed to put data");
210
211 let has = archive
213 .has(Identifier::Index(index))
214 .await
215 .expect("Failed to check key");
216 assert!(has);
217 let has = archive
218 .has(Identifier::Key(&key))
219 .await
220 .expect("Failed to check key");
221 assert!(has);
222
223 let retrieved = archive
225 .get(Identifier::Key(&key))
226 .await
227 .expect("Failed to get data");
228 assert_eq!(retrieved, Some(data));
229
230 let retrieved = archive
232 .get(Identifier::Index(index))
233 .await
234 .expect("Failed to get data");
235 assert_eq!(retrieved, Some(data));
236
237 archive.sync().await.expect("Failed to sync data");
239 }
240
241 #[test_traced]
242 fn test_put_get_prunable_no_compression() {
243 let executor = deterministic::Runner::default();
244 executor.start(|context| async move {
245 let archive = create_prunable(context, None).await;
246 test_put_get_impl(archive).await;
247 });
248 }
249
250 #[test_traced]
251 fn test_put_get_prunable_compression() {
252 let executor = deterministic::Runner::default();
253 executor.start(|context| async move {
254 let archive = create_prunable(context, Some(3)).await;
255 test_put_get_impl(archive).await;
256 });
257 }
258
259 #[test_traced]
260 fn test_put_get_immutable_no_compression() {
261 let executor = deterministic::Runner::default();
262 executor.start(|context| async move {
263 let archive = create_immutable(context, None).await;
264 test_put_get_impl(archive).await;
265 });
266 }
267
268 #[test_traced]
269 fn test_put_get_immutable_compression() {
270 let executor = deterministic::Runner::default();
271 executor.start(|context| async move {
272 let archive = create_immutable(context, Some(3)).await;
273 test_put_get_impl(archive).await;
274 });
275 }
276
277 async fn test_duplicate_key_impl(mut archive: impl Archive<Key = FixedBytes<64>, Value = i32>) {
278 let index = 1u64;
279 let key = test_key("duplicate");
280 let data1 = 1;
281 let data2 = 2;
282
283 archive
285 .put(index, key.clone(), data1)
286 .await
287 .expect("Failed to put data");
288
289 archive
291 .put(index, key.clone(), data2)
292 .await
293 .expect("Duplicate put should not fail");
294
295 let retrieved = archive
297 .get(Identifier::Index(index))
298 .await
299 .expect("Failed to get data")
300 .expect("Data not found");
301 assert_eq!(retrieved, data1);
302
303 let retrieved = archive
304 .get(Identifier::Key(&key))
305 .await
306 .expect("Failed to get data")
307 .expect("Data not found");
308 assert_eq!(retrieved, data1);
309 }
310
311 #[test_traced]
312 fn test_duplicate_key_prunable_no_compression() {
313 let executor = deterministic::Runner::default();
314 executor.start(|context| async move {
315 let archive = create_prunable(context, None).await;
316 test_duplicate_key_impl(archive).await;
317 });
318 }
319
320 #[test_traced]
321 fn test_duplicate_key_prunable_compression() {
322 let executor = deterministic::Runner::default();
323 executor.start(|context| async move {
324 let archive = create_prunable(context, Some(3)).await;
325 test_duplicate_key_impl(archive).await;
326 });
327 }
328
329 #[test_traced]
330 fn test_duplicate_key_immutable_no_compression() {
331 let executor = deterministic::Runner::default();
332 executor.start(|context| async move {
333 let archive = create_immutable(context, None).await;
334 test_duplicate_key_impl(archive).await;
335 });
336 }
337
338 #[test_traced]
339 fn test_duplicate_key_immutable_compression() {
340 let executor = deterministic::Runner::default();
341 executor.start(|context| async move {
342 let archive = create_immutable(context, Some(3)).await;
343 test_duplicate_key_impl(archive).await;
344 });
345 }
346
347 async fn test_get_nonexistent_impl(archive: impl Archive<Key = FixedBytes<64>, Value = i32>) {
348 let index = 1u64;
350 let retrieved: Option<i32> = archive
351 .get(Identifier::Index(index))
352 .await
353 .expect("Failed to get data");
354 assert!(retrieved.is_none());
355
356 let key = test_key("nonexistent");
358 let retrieved = archive
359 .get(Identifier::Key(&key))
360 .await
361 .expect("Failed to get data");
362 assert!(retrieved.is_none());
363 }
364
365 #[test_traced]
366 fn test_get_nonexistent_prunable_no_compression() {
367 let executor = deterministic::Runner::default();
368 executor.start(|context| async move {
369 let archive = create_prunable(context, None).await;
370 test_get_nonexistent_impl(archive).await;
371 });
372 }
373
374 #[test_traced]
375 fn test_get_nonexistent_prunable_compression() {
376 let executor = deterministic::Runner::default();
377 executor.start(|context| async move {
378 let archive = create_prunable(context, Some(3)).await;
379 test_get_nonexistent_impl(archive).await;
380 });
381 }
382
383 #[test_traced]
384 fn test_get_nonexistent_immutable_no_compression() {
385 let executor = deterministic::Runner::default();
386 executor.start(|context| async move {
387 let archive = create_immutable(context, None).await;
388 test_get_nonexistent_impl(archive).await;
389 });
390 }
391
392 #[test_traced]
393 fn test_get_nonexistent_immutable_compression() {
394 let executor = deterministic::Runner::default();
395 executor.start(|context| async move {
396 let archive = create_immutable(context, Some(3)).await;
397 test_get_nonexistent_impl(archive).await;
398 });
399 }
400
401 async fn test_persistence_impl<A, F, Fut>(context: Context, creator: F, compression: Option<u8>)
402 where
403 A: Archive<Key = FixedBytes<64>, Value = i32>,
404 F: Fn(Context, Option<u8>) -> Fut,
405 Fut: Future<Output = A>,
406 {
407 {
409 let mut archive = creator(context.clone(), compression).await;
410
411 let keys = vec![
413 (1u64, test_key("key1"), 1),
414 (2u64, test_key("key2"), 2),
415 (3u64, test_key("key3"), 3),
416 ];
417
418 for (index, key, data) in &keys {
419 archive
420 .put(*index, key.clone(), *data)
421 .await
422 .expect("Failed to put data");
423 }
424
425 archive.sync().await.expect("Failed to sync archive");
427 }
428
429 {
431 let archive = creator(context, compression).await;
432
433 let keys = vec![
435 (1u64, test_key("key1"), 1),
436 (2u64, test_key("key2"), 2),
437 (3u64, test_key("key3"), 3),
438 ];
439
440 for (index, key, expected_data) in &keys {
441 let retrieved = archive
442 .get(Identifier::Index(*index))
443 .await
444 .expect("Failed to get data")
445 .expect("Data not found");
446 assert_eq!(retrieved, *expected_data);
447
448 let retrieved = archive
449 .get(Identifier::Key(key))
450 .await
451 .expect("Failed to get data")
452 .expect("Data not found");
453 assert_eq!(retrieved, *expected_data);
454 }
455 }
456 }
457
458 #[test_traced]
459 fn test_persistence_prunable_no_compression() {
460 let executor = deterministic::Runner::default();
461 executor.start(|context| async move {
462 test_persistence_impl(context, create_prunable, None).await;
463 });
464 }
465
466 #[test_traced]
467 fn test_persistence_prunable_compression() {
468 let executor = deterministic::Runner::default();
469 executor.start(|context| async move {
470 test_persistence_impl(context, create_prunable, Some(3)).await;
471 });
472 }
473
474 #[test_traced]
475 fn test_persistence_immutable_no_compression() {
476 let executor = deterministic::Runner::default();
477 executor.start(|context| async move {
478 test_persistence_impl(context, create_immutable, None).await;
479 });
480 }
481
482 #[test_traced]
483 fn test_persistence_immutable_compression() {
484 let executor = deterministic::Runner::default();
485 executor.start(|context| async move {
486 test_persistence_impl(context, create_immutable, Some(3)).await;
487 });
488 }
489
490 async fn test_ranges_impl<A, F, Fut>(mut context: Context, creator: F, compression: Option<u8>)
491 where
492 A: Archive<Key = FixedBytes<64>, Value = i32>,
493 F: Fn(Context, Option<u8>) -> Fut,
494 Fut: Future<Output = A>,
495 {
496 let mut keys = BTreeMap::new();
497 {
498 let mut archive = creator(context.clone(), compression).await;
499
500 let mut last_index = 0u64;
502 while keys.len() < 100 {
503 let gap: u64 = context.gen_range(1..=10);
504 let index = last_index + gap;
505 last_index = index;
506
507 let mut key_bytes = [0u8; 64];
508 context.fill(&mut key_bytes);
509 let key = FixedBytes::<64>::decode(key_bytes.as_ref()).unwrap();
510 let data: i32 = context.gen();
511
512 if keys.contains_key(&index) {
513 continue;
514 }
515 keys.insert(index, (key.clone(), data));
516
517 archive
518 .put(index, key, data)
519 .await
520 .expect("Failed to put data");
521 }
522
523 archive.sync().await.expect("Failed to sync archive");
524 }
525
526 {
527 let archive = creator(context, compression).await;
528 let sorted_indices: Vec<u64> = keys.keys().cloned().collect();
529
530 let (current_end, start_next) = archive.next_gap(0);
532 assert!(current_end.is_none());
533 assert_eq!(start_next, Some(sorted_indices[0]));
534
535 let mut i = 0;
537 while i < sorted_indices.len() {
538 let current_index = sorted_indices[i];
539
540 let mut j = i;
542 while j + 1 < sorted_indices.len() && sorted_indices[j + 1] == sorted_indices[j] + 1
543 {
544 j += 1;
545 }
546 let block_end_index = sorted_indices[j];
547 let next_actual_index = if j + 1 < sorted_indices.len() {
548 Some(sorted_indices[j + 1])
549 } else {
550 None
551 };
552
553 let (current_end, start_next) = archive.next_gap(current_index);
554 assert_eq!(current_end, Some(block_end_index));
555 assert_eq!(start_next, next_actual_index);
556
557 if let Some(next_index) = next_actual_index {
559 if next_index > block_end_index + 1 {
560 let in_gap_index = block_end_index + 1;
561 let (current_end, start_next) = archive.next_gap(in_gap_index);
562 assert!(current_end.is_none());
563 assert_eq!(start_next, Some(next_index));
564 }
565 }
566 i = j + 1;
567 }
568
569 let last_index = *sorted_indices.last().unwrap();
571 let (current_end, start_next) = archive.next_gap(last_index);
572 assert!(current_end.is_some());
573 assert!(start_next.is_none());
574 }
575 }
576
577 #[test_traced]
578 fn test_ranges_prunable_no_compression() {
579 let executor = deterministic::Runner::default();
580 executor.start(|context| async move {
581 test_ranges_impl(context, create_prunable, None).await;
582 });
583 }
584
585 #[test_traced]
586 fn test_ranges_prunable_compression() {
587 let executor = deterministic::Runner::default();
588 executor.start(|context| async move {
589 test_ranges_impl(context, create_prunable, Some(3)).await;
590 });
591 }
592
593 #[test_traced]
594 fn test_ranges_immutable_no_compression() {
595 let executor = deterministic::Runner::default();
596 executor.start(|context| async move {
597 test_ranges_impl(context, create_immutable, None).await;
598 });
599 }
600
601 #[test_traced]
602 fn test_ranges_immutable_compression() {
603 let executor = deterministic::Runner::default();
604 executor.start(|context| async move {
605 test_ranges_impl(context, create_immutable, Some(3)).await;
606 });
607 }
608
609 async fn test_many_keys_impl<A, F, Fut>(
610 mut context: Context,
611 creator: F,
612 compression: Option<u8>,
613 num: usize,
614 ) where
615 A: Archive<Key = FixedBytes<64>, Value = i32>,
616 F: Fn(Context, Option<u8>) -> Fut,
617 Fut: Future<Output = A>,
618 {
619 let mut keys = BTreeMap::new();
621 {
622 let mut archive = creator(context.clone(), compression).await;
623 while keys.len() < num {
624 let index = keys.len() as u64;
625 let mut key = [0u8; 64];
626 context.fill(&mut key);
627 let key = FixedBytes::<64>::decode(key.as_ref()).unwrap();
628 let data: i32 = context.gen();
629
630 archive
631 .put(index, key.clone(), data)
632 .await
633 .expect("Failed to put data");
634 keys.insert(key, (index, data));
635
636 if context.gen_bool(0.1) {
638 archive.sync().await.expect("Failed to sync archive");
639 }
640 }
641 archive.sync().await.expect("Failed to sync archive");
642
643 for (key, (index, data)) in &keys {
645 let retrieved = archive
646 .get(Identifier::Index(*index))
647 .await
648 .expect("Failed to get data")
649 .expect("Data not found");
650 assert_eq!(&retrieved, data);
651 let retrieved = archive
652 .get(Identifier::Key(key))
653 .await
654 .expect("Failed to get data")
655 .expect("Data not found");
656 assert_eq!(&retrieved, data);
657 }
658 }
659
660 {
662 let archive = creator(context.clone(), compression).await;
663
664 for (key, (index, data)) in &keys {
666 let retrieved = archive
667 .get(Identifier::Index(*index))
668 .await
669 .expect("Failed to get data")
670 .expect("Data not found");
671 assert_eq!(&retrieved, data);
672 let retrieved = archive
673 .get(Identifier::Key(key))
674 .await
675 .expect("Failed to get data")
676 .expect("Data not found");
677 assert_eq!(&retrieved, data);
678 }
679 }
680 }
681
682 fn test_many_keys_determinism<F, Fut, A>(creator: F, compression: Option<u8>, num: usize)
683 where
684 A: Archive<Key = FixedBytes<64>, Value = i32>,
685 F: Fn(Context, Option<u8>) -> Fut + Copy + Send + 'static,
686 Fut: Future<Output = A> + Send,
687 {
688 let executor = deterministic::Runner::default();
689 let state1 = executor.start(|context| async move {
690 test_many_keys_impl(context.clone(), creator, compression, num).await;
691 context.auditor().state()
692 });
693 let executor = deterministic::Runner::default();
694 let state2 = executor.start(|context| async move {
695 test_many_keys_impl(context.clone(), creator, compression, num).await;
696 context.auditor().state()
697 });
698 assert_eq!(state1, state2);
699 }
700
701 #[test_traced]
702 fn test_many_keys_prunable_no_compression() {
703 test_many_keys_determinism(create_prunable, None, 1_000);
704 }
705
706 #[test_traced]
707 fn test_many_keys_prunable_compression() {
708 test_many_keys_determinism(create_prunable, Some(3), 1_000);
709 }
710
711 #[test_traced]
712 fn test_many_keys_immutable_no_compression() {
713 test_many_keys_determinism(create_immutable, None, 1_000);
714 }
715
716 #[test_traced]
717 fn test_many_keys_immutable_compression() {
718 test_many_keys_determinism(create_immutable, Some(3), 1_000);
719 }
720
721 #[test_group("slow")]
722 #[test_traced]
723 fn test_many_keys_prunable_large() {
724 test_many_keys_determinism(create_prunable, None, 50_000);
725 }
726
727 #[test_group("slow")]
728 #[test_traced]
729 fn test_many_keys_immutable_large() {
730 test_many_keys_determinism(create_immutable, None, 50_000);
731 }
732
733 fn assert_send<T: Send>(_: T) {}
734
735 #[allow(dead_code)]
736 fn assert_archive_futures_are_send<T: super::Archive>(
737 archive: &mut T,
738 key: T::Key,
739 value: T::Value,
740 ) where
741 T::Key: Clone,
742 T::Value: Clone,
743 {
744 assert_send(archive.put(1, key.clone(), value.clone()));
745 assert_send(archive.put_sync(2, key.clone(), value));
746 assert_send(archive.get(Identifier::Index(1)));
747 assert_send(archive.get(Identifier::Key(&key)));
748 assert_send(archive.has(Identifier::Index(1)));
749 assert_send(archive.has(Identifier::Key(&key)));
750 assert_send(archive.sync());
751 }
752
753 #[allow(dead_code)]
754 fn assert_archive_destroy_is_send<T: super::Archive>(archive: T) {
755 assert_send(archive.destroy());
756 }
757
758 #[allow(dead_code)]
759 fn assert_prunable_archive_futures_are_send(
760 archive: &mut prunable::Archive<TwoCap, Context, FixedBytes<64>, i32>,
761 key: FixedBytes<64>,
762 value: i32,
763 ) {
764 assert_archive_futures_are_send(archive, key, value);
765 }
766
767 #[allow(dead_code)]
768 fn assert_prunable_archive_destroy_is_send(
769 archive: prunable::Archive<TwoCap, Context, FixedBytes<64>, i32>,
770 ) {
771 assert_archive_destroy_is_send(archive);
772 }
773
774 #[allow(dead_code)]
775 fn assert_immutable_archive_futures_are_send(
776 archive: &mut immutable::Archive<Context, FixedBytes<64>, i32>,
777 key: FixedBytes<64>,
778 value: i32,
779 ) {
780 assert_archive_futures_are_send(archive, key, value);
781 }
782
783 #[allow(dead_code)]
784 fn assert_immutable_archive_destroy_is_send(
785 archive: immutable::Archive<Context, FixedBytes<64>, i32>,
786 ) {
787 assert_archive_destroy_is_send(archive);
788 }
789}