1use commonware_codec::Codec;
7use commonware_utils::Array;
8use std::future::Future;
9use thiserror::Error;
10
11pub mod immutable;
12pub mod prunable;
13
/// Selects the item targeted by [`Archive::get`] or [`Archive::has`].
///
/// An item can be addressed either by its numeric index or by a borrowed key.
pub enum Identifier<'a, K: Array> {
    /// Address the item by its `u64` index.
    Index(u64),
    /// Address the item by a reference to its key.
    Key(&'a K),
}
19
/// Errors returned by [`Archive`] operations.
///
/// The first four variants wrap errors from other modules of this crate and are
/// created automatically through the `From` conversions generated by `#[from]`.
#[derive(Debug, Error)]
pub enum Error {
    /// Wraps a `crate::journal::Error`.
    #[error("journal error: {0}")]
    Journal(#[from] crate::journal::Error),
    /// Wraps a `crate::ordinal::Error`.
    #[error("ordinal error: {0}")]
    Ordinal(#[from] crate::ordinal::Error),
    /// Wraps a `crate::metadata::Error`.
    #[error("metadata error: {0}")]
    Metadata(#[from] crate::metadata::Error),
    /// Wraps a `crate::freezer::Error`.
    #[error("freezer error: {0}")]
    Freezer(#[from] crate::freezer::Error),
    /// NOTE(review): presumably raised when a stored record fails validation on read;
    /// confirm against the implementations.
    #[error("record corrupted")]
    RecordCorrupted,
    /// NOTE(review): the payload is presumably the index the archive has already been
    /// pruned to; confirm against the prunable implementation.
    #[error("already pruned to: {0}")]
    AlreadyPrunedTo(u64),
    /// NOTE(review): presumably raised when a record exceeds a size limit; the limit is
    /// not visible in this file.
    #[error("record too large")]
    RecordTooLarge,
}
38
/// A store in which every item is addressable by both a `u64` index and a key.
///
/// Implemented by the types in the [`immutable`] and [`prunable`] submodules (both are
/// exercised through this trait by the tests in this module).
pub trait Archive {
    /// Key type used to address items.
    type Key: Array;

    /// Value type stored for each item.
    type Value: Codec;

    /// Stores `value` so that it can later be found by either `index` or `key`.
    ///
    /// The tests in this module rely on a second `put` at an already-populated index
    /// returning `Ok(())` while leaving the originally stored value in place.
    fn put(
        &mut self,
        index: u64,
        key: Self::Key,
        value: Self::Value,
    ) -> impl Future<Output = Result<(), Error>>;

    /// Calls [`Archive::put`] and then [`Archive::sync`].
    ///
    /// If the `put` fails, its error is returned and `sync` is not attempted.
    fn put_sync(
        &mut self,
        index: u64,
        key: Self::Key,
        value: Self::Value,
    ) -> impl Future<Output = Result<(), Error>> {
        async move {
            self.put(index, key, value).await?;
            self.sync().await
        }
    }

    /// Looks up the item selected by `identifier`.
    ///
    /// Resolves to `Ok(None)` when no such item is stored.
    fn get(
        &self,
        identifier: Identifier<'_, Self::Key>,
    ) -> impl Future<Output = Result<Option<Self::Value>, Error>>;

    /// Reports whether an item selected by `identifier` is stored.
    fn has(
        &self,
        identifier: Identifier<'_, Self::Key>,
    ) -> impl Future<Output = Result<bool, Error>>;

    /// Describes the populated range around `index`.
    ///
    /// As exercised by the tests in this module: the first element is the last index of
    /// the contiguous run of populated indices containing `index` (or `None` when `index`
    /// itself is not populated), and the second element is the first populated index
    /// after that run or gap (or `None` when nothing follows).
    fn next_gap(&self, index: u64) -> (Option<u64>, Option<u64>);

    /// NOTE(review): presumably returns up to `max` unpopulated indices starting at
    /// `index`; not exercised in this file, so confirm against the implementations.
    fn missing_items(&self, index: u64, max: usize) -> Vec<u64>;

    /// NOTE(review): presumably yields the `(start, end)` bounds of each populated range;
    /// whether `end` is inclusive is not shown in this file.
    fn ranges(&self) -> impl Iterator<Item = (u64, u64)>;

    /// NOTE(review): presumably the smallest populated index, `None` when empty; confirm.
    fn first_index(&self) -> Option<u64>;

    /// NOTE(review): presumably the largest populated index, `None` when empty; confirm.
    fn last_index(&self) -> Option<u64>;

    /// NOTE(review): presumably makes pending writes durable; confirm the exact
    /// guarantees against the implementations.
    fn sync(&mut self) -> impl Future<Output = Result<(), Error>>;

    /// Consumes the archive and shuts it down.
    ///
    /// The persistence tests in this module rely on items written before `close` being
    /// readable after the archive is re-initialized with the same configuration.
    fn close(self) -> impl Future<Output = Result<(), Error>>;

    /// Consumes the archive.
    ///
    /// NOTE(review): presumably also removes its persisted data; not exercised in this
    /// file, so confirm.
    fn destroy(self) -> impl Future<Output = Result<(), Error>>;
}
115
116#[cfg(test)]
117mod tests {
118 use super::*;
119 use crate::translator::TwoCap;
120 use commonware_codec::DecodeExt;
121 use commonware_macros::{test_group, test_traced};
122 use commonware_runtime::{
123 buffer::PoolRef,
124 deterministic::{self, Context},
125 Runner,
126 };
127 use commonware_utils::{sequence::FixedBytes, NZUsize, NZU64};
128 use rand::Rng;
129 use std::{collections::BTreeMap, num::NonZeroUsize};
130
// First argument passed to `PoolRef::new` by the test configurations below
// (presumably the page size in bytes — confirm against `PoolRef`).
const PAGE_SIZE: NonZeroUsize = NZUsize!(1024);
// Second argument passed to `PoolRef::new` by the test configurations below
// (presumably the number of cached pages — confirm against `PoolRef`).
const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);
133
134 fn test_key(key: &str) -> FixedBytes<64> {
135 let mut buf = [0u8; 64];
136 let key = key.as_bytes();
137 assert!(key.len() <= buf.len());
138 buf[..key.len()].copy_from_slice(key);
139 FixedBytes::decode(buf.as_ref()).unwrap()
140 }
141
142 async fn create_prunable(
143 context: Context,
144 compression: Option<u8>,
145 ) -> impl Archive<Key = FixedBytes<64>, Value = i32> {
146 let cfg = prunable::Config {
147 partition: "test".into(),
148 translator: TwoCap,
149 compression,
150 codec_config: (),
151 items_per_section: NZU64!(1024),
152 write_buffer: NZUsize!(1024),
153 replay_buffer: NZUsize!(1024),
154 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
155 };
156 prunable::Archive::init(context, cfg).await.unwrap()
157 }
158
159 async fn create_immutable(
160 context: Context,
161 compression: Option<u8>,
162 ) -> impl Archive<Key = FixedBytes<64>, Value = i32> {
163 let cfg = immutable::Config {
164 metadata_partition: "test_metadata".into(),
165 freezer_table_partition: "test_table".into(),
166 freezer_table_initial_size: 64,
167 freezer_table_resize_frequency: 2,
168 freezer_table_resize_chunk_size: 32,
169 freezer_journal_partition: "test_journal".into(),
170 freezer_journal_target_size: 1024 * 1024,
171 freezer_journal_compression: compression,
172 freezer_journal_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
173 ordinal_partition: "test_ordinal".into(),
174 items_per_section: NZU64!(1024),
175 write_buffer: NZUsize!(1024 * 1024),
176 replay_buffer: NZUsize!(1024 * 1024),
177 codec_config: (),
178 };
179 immutable::Archive::init(context, cfg).await.unwrap()
180 }
181
182 async fn test_put_get_impl(mut archive: impl Archive<Key = FixedBytes<64>, Value = i32>) {
183 let index = 1u64;
184 let key = test_key("testkey");
185 let data = 1;
186
187 let has = archive
189 .has(Identifier::Index(index))
190 .await
191 .expect("Failed to check key");
192 assert!(!has);
193 let has = archive
194 .has(Identifier::Key(&key))
195 .await
196 .expect("Failed to check key");
197 assert!(!has);
198
199 archive
201 .put(index, key.clone(), data)
202 .await
203 .expect("Failed to put data");
204
205 let has = archive
207 .has(Identifier::Index(index))
208 .await
209 .expect("Failed to check key");
210 assert!(has);
211 let has = archive
212 .has(Identifier::Key(&key))
213 .await
214 .expect("Failed to check key");
215 assert!(has);
216
217 let retrieved = archive
219 .get(Identifier::Key(&key))
220 .await
221 .expect("Failed to get data");
222 assert_eq!(retrieved, Some(data));
223
224 let retrieved = archive
226 .get(Identifier::Index(index))
227 .await
228 .expect("Failed to get data");
229 assert_eq!(retrieved, Some(data));
230
231 archive.sync().await.expect("Failed to sync data");
233
234 archive.close().await.expect("Failed to close archive");
236 }
237
238 #[test_traced]
239 fn test_put_get_prunable_no_compression() {
240 let executor = deterministic::Runner::default();
241 executor.start(|context| async move {
242 let archive = create_prunable(context, None).await;
243 test_put_get_impl(archive).await;
244 });
245 }
246
247 #[test_traced]
248 fn test_put_get_prunable_compression() {
249 let executor = deterministic::Runner::default();
250 executor.start(|context| async move {
251 let archive = create_prunable(context, Some(3)).await;
252 test_put_get_impl(archive).await;
253 });
254 }
255
256 #[test_traced]
257 fn test_put_get_immutable_no_compression() {
258 let executor = deterministic::Runner::default();
259 executor.start(|context| async move {
260 let archive = create_immutable(context, None).await;
261 test_put_get_impl(archive).await;
262 });
263 }
264
265 #[test_traced]
266 fn test_put_get_immutable_compression() {
267 let executor = deterministic::Runner::default();
268 executor.start(|context| async move {
269 let archive = create_immutable(context, Some(3)).await;
270 test_put_get_impl(archive).await;
271 });
272 }
273
274 async fn test_duplicate_key_impl(mut archive: impl Archive<Key = FixedBytes<64>, Value = i32>) {
275 let index = 1u64;
276 let key = test_key("duplicate");
277 let data1 = 1;
278 let data2 = 2;
279
280 archive
282 .put(index, key.clone(), data1)
283 .await
284 .expect("Failed to put data");
285
286 archive
288 .put(index, key.clone(), data2)
289 .await
290 .expect("Duplicate put should not fail");
291
292 let retrieved = archive
294 .get(Identifier::Index(index))
295 .await
296 .expect("Failed to get data")
297 .expect("Data not found");
298 assert_eq!(retrieved, data1);
299
300 let retrieved = archive
301 .get(Identifier::Key(&key))
302 .await
303 .expect("Failed to get data")
304 .expect("Data not found");
305 assert_eq!(retrieved, data1);
306
307 archive.close().await.expect("Failed to close archive");
308 }
309
310 #[test_traced]
311 fn test_duplicate_key_prunable_no_compression() {
312 let executor = deterministic::Runner::default();
313 executor.start(|context| async move {
314 let archive = create_prunable(context, None).await;
315 test_duplicate_key_impl(archive).await;
316 });
317 }
318
319 #[test_traced]
320 fn test_duplicate_key_prunable_compression() {
321 let executor = deterministic::Runner::default();
322 executor.start(|context| async move {
323 let archive = create_prunable(context, Some(3)).await;
324 test_duplicate_key_impl(archive).await;
325 });
326 }
327
328 #[test_traced]
329 fn test_duplicate_key_immutable_no_compression() {
330 let executor = deterministic::Runner::default();
331 executor.start(|context| async move {
332 let archive = create_immutable(context, None).await;
333 test_duplicate_key_impl(archive).await;
334 });
335 }
336
337 #[test_traced]
338 fn test_duplicate_key_immutable_compression() {
339 let executor = deterministic::Runner::default();
340 executor.start(|context| async move {
341 let archive = create_immutable(context, Some(3)).await;
342 test_duplicate_key_impl(archive).await;
343 });
344 }
345
346 async fn test_get_nonexistent_impl(archive: impl Archive<Key = FixedBytes<64>, Value = i32>) {
347 let index = 1u64;
349 let retrieved: Option<i32> = archive
350 .get(Identifier::Index(index))
351 .await
352 .expect("Failed to get data");
353 assert!(retrieved.is_none());
354
355 let key = test_key("nonexistent");
357 let retrieved = archive
358 .get(Identifier::Key(&key))
359 .await
360 .expect("Failed to get data");
361 assert!(retrieved.is_none());
362
363 archive.close().await.expect("Failed to close archive");
364 }
365
366 #[test_traced]
367 fn test_get_nonexistent_prunable_no_compression() {
368 let executor = deterministic::Runner::default();
369 executor.start(|context| async move {
370 let archive = create_prunable(context, None).await;
371 test_get_nonexistent_impl(archive).await;
372 });
373 }
374
375 #[test_traced]
376 fn test_get_nonexistent_prunable_compression() {
377 let executor = deterministic::Runner::default();
378 executor.start(|context| async move {
379 let archive = create_prunable(context, Some(3)).await;
380 test_get_nonexistent_impl(archive).await;
381 });
382 }
383
384 #[test_traced]
385 fn test_get_nonexistent_immutable_no_compression() {
386 let executor = deterministic::Runner::default();
387 executor.start(|context| async move {
388 let archive = create_immutable(context, None).await;
389 test_get_nonexistent_impl(archive).await;
390 });
391 }
392
393 #[test_traced]
394 fn test_get_nonexistent_immutable_compression() {
395 let executor = deterministic::Runner::default();
396 executor.start(|context| async move {
397 let archive = create_immutable(context, Some(3)).await;
398 test_get_nonexistent_impl(archive).await;
399 });
400 }
401
402 async fn test_persistence_impl<A, F, Fut>(context: Context, creator: F, compression: Option<u8>)
403 where
404 A: Archive<Key = FixedBytes<64>, Value = i32>,
405 F: Fn(Context, Option<u8>) -> Fut,
406 Fut: Future<Output = A>,
407 {
408 {
410 let mut archive = creator(context.clone(), compression).await;
411
412 let keys = vec![
414 (1u64, test_key("key1"), 1),
415 (2u64, test_key("key2"), 2),
416 (3u64, test_key("key3"), 3),
417 ];
418
419 for (index, key, data) in &keys {
420 archive
421 .put(*index, key.clone(), *data)
422 .await
423 .expect("Failed to put data");
424 }
425
426 archive.close().await.expect("Failed to close archive");
428 }
429
430 {
432 let archive = creator(context, compression).await;
433
434 let keys = vec![
436 (1u64, test_key("key1"), 1),
437 (2u64, test_key("key2"), 2),
438 (3u64, test_key("key3"), 3),
439 ];
440
441 for (index, key, expected_data) in &keys {
442 let retrieved = archive
443 .get(Identifier::Index(*index))
444 .await
445 .expect("Failed to get data")
446 .expect("Data not found");
447 assert_eq!(retrieved, *expected_data);
448
449 let retrieved = archive
450 .get(Identifier::Key(key))
451 .await
452 .expect("Failed to get data")
453 .expect("Data not found");
454 assert_eq!(retrieved, *expected_data);
455 }
456
457 archive.close().await.expect("Failed to close archive");
458 }
459 }
460
461 #[test_traced]
462 fn test_persistence_prunable_no_compression() {
463 let executor = deterministic::Runner::default();
464 executor.start(|context| async move {
465 test_persistence_impl(context, create_prunable, None).await;
466 });
467 }
468
469 #[test_traced]
470 fn test_persistence_prunable_compression() {
471 let executor = deterministic::Runner::default();
472 executor.start(|context| async move {
473 test_persistence_impl(context, create_prunable, Some(3)).await;
474 });
475 }
476
477 #[test_traced]
478 fn test_persistence_immutable_no_compression() {
479 let executor = deterministic::Runner::default();
480 executor.start(|context| async move {
481 test_persistence_impl(context, create_immutable, None).await;
482 });
483 }
484
485 #[test_traced]
486 fn test_persistence_immutable_compression() {
487 let executor = deterministic::Runner::default();
488 executor.start(|context| async move {
489 test_persistence_impl(context, create_immutable, Some(3)).await;
490 });
491 }
492
493 async fn test_ranges_impl<A, F, Fut>(mut context: Context, creator: F, compression: Option<u8>)
494 where
495 A: Archive<Key = FixedBytes<64>, Value = i32>,
496 F: Fn(Context, Option<u8>) -> Fut,
497 Fut: Future<Output = A>,
498 {
499 let mut keys = BTreeMap::new();
500 {
501 let mut archive = creator(context.clone(), compression).await;
502
503 let mut last_index = 0u64;
505 while keys.len() < 100 {
506 let gap: u64 = context.gen_range(1..=10);
507 let index = last_index + gap;
508 last_index = index;
509
510 let mut key_bytes = [0u8; 64];
511 context.fill(&mut key_bytes);
512 let key = FixedBytes::<64>::decode(key_bytes.as_ref()).unwrap();
513 let data: i32 = context.gen();
514
515 if keys.contains_key(&index) {
516 continue;
517 }
518 keys.insert(index, (key.clone(), data));
519
520 archive
521 .put(index, key, data)
522 .await
523 .expect("Failed to put data");
524 }
525
526 archive.close().await.expect("Failed to close archive");
527 }
528
529 {
530 let archive = creator(context, compression).await;
531 let sorted_indices: Vec<u64> = keys.keys().cloned().collect();
532
533 let (current_end, start_next) = archive.next_gap(0);
535 assert!(current_end.is_none());
536 assert_eq!(start_next, Some(sorted_indices[0]));
537
538 let mut i = 0;
540 while i < sorted_indices.len() {
541 let current_index = sorted_indices[i];
542
543 let mut j = i;
545 while j + 1 < sorted_indices.len() && sorted_indices[j + 1] == sorted_indices[j] + 1
546 {
547 j += 1;
548 }
549 let block_end_index = sorted_indices[j];
550 let next_actual_index = if j + 1 < sorted_indices.len() {
551 Some(sorted_indices[j + 1])
552 } else {
553 None
554 };
555
556 let (current_end, start_next) = archive.next_gap(current_index);
557 assert_eq!(current_end, Some(block_end_index));
558 assert_eq!(start_next, next_actual_index);
559
560 if let Some(next_index) = next_actual_index {
562 if next_index > block_end_index + 1 {
563 let in_gap_index = block_end_index + 1;
564 let (current_end, start_next) = archive.next_gap(in_gap_index);
565 assert!(current_end.is_none());
566 assert_eq!(start_next, Some(next_index));
567 }
568 }
569 i = j + 1;
570 }
571
572 let last_index = *sorted_indices.last().unwrap();
574 let (current_end, start_next) = archive.next_gap(last_index);
575 assert!(current_end.is_some());
576 assert!(start_next.is_none());
577
578 archive.close().await.expect("Failed to close archive");
579 }
580 }
581
582 #[test_traced]
583 fn test_ranges_prunable_no_compression() {
584 let executor = deterministic::Runner::default();
585 executor.start(|context| async move {
586 test_ranges_impl(context, create_prunable, None).await;
587 });
588 }
589
590 #[test_traced]
591 fn test_ranges_prunable_compression() {
592 let executor = deterministic::Runner::default();
593 executor.start(|context| async move {
594 test_ranges_impl(context, create_prunable, Some(3)).await;
595 });
596 }
597
598 #[test_traced]
599 fn test_ranges_immutable_no_compression() {
600 let executor = deterministic::Runner::default();
601 executor.start(|context| async move {
602 test_ranges_impl(context, create_immutable, None).await;
603 });
604 }
605
606 #[test_traced]
607 fn test_ranges_immutable_compression() {
608 let executor = deterministic::Runner::default();
609 executor.start(|context| async move {
610 test_ranges_impl(context, create_immutable, Some(3)).await;
611 });
612 }
613
614 async fn test_many_keys_impl<A, F, Fut>(
615 mut context: Context,
616 creator: F,
617 compression: Option<u8>,
618 num: usize,
619 ) where
620 A: Archive<Key = FixedBytes<64>, Value = i32>,
621 F: Fn(Context, Option<u8>) -> Fut,
622 Fut: Future<Output = A>,
623 {
624 let mut keys = BTreeMap::new();
626 {
627 let mut archive = creator(context.clone(), compression).await;
628 while keys.len() < num {
629 let index = keys.len() as u64;
630 let mut key = [0u8; 64];
631 context.fill(&mut key);
632 let key = FixedBytes::<64>::decode(key.as_ref()).unwrap();
633 let data: i32 = context.gen();
634
635 archive
636 .put(index, key.clone(), data)
637 .await
638 .expect("Failed to put data");
639 keys.insert(key, (index, data));
640
641 if context.gen_bool(0.1) {
643 archive.sync().await.expect("Failed to sync archive");
644 }
645 }
646 archive.sync().await.expect("Failed to sync archive");
647
648 for (key, (index, data)) in &keys {
650 let retrieved = archive
651 .get(Identifier::Index(*index))
652 .await
653 .expect("Failed to get data")
654 .expect("Data not found");
655 assert_eq!(&retrieved, data);
656 let retrieved = archive
657 .get(Identifier::Key(key))
658 .await
659 .expect("Failed to get data")
660 .expect("Data not found");
661 assert_eq!(&retrieved, data);
662 }
663
664 archive.close().await.expect("Failed to close archive");
665 }
666
667 {
669 let archive = creator(context.clone(), compression).await;
670
671 for (key, (index, data)) in &keys {
673 let retrieved = archive
674 .get(Identifier::Index(*index))
675 .await
676 .expect("Failed to get data")
677 .expect("Data not found");
678 assert_eq!(&retrieved, data);
679 let retrieved = archive
680 .get(Identifier::Key(key))
681 .await
682 .expect("Failed to get data")
683 .expect("Data not found");
684 assert_eq!(&retrieved, data);
685 }
686
687 archive.close().await.expect("Failed to close archive");
688 }
689 }
690
691 fn test_many_keys_determinism<F, Fut, A>(creator: F, compression: Option<u8>, num: usize)
692 where
693 A: Archive<Key = FixedBytes<64>, Value = i32>,
694 F: Fn(Context, Option<u8>) -> Fut + Copy + Send + 'static,
695 Fut: Future<Output = A> + Send,
696 {
697 let executor = deterministic::Runner::default();
698 let state1 = executor.start(|context| async move {
699 test_many_keys_impl(context.clone(), creator, compression, num).await;
700 context.auditor().state()
701 });
702 let executor = deterministic::Runner::default();
703 let state2 = executor.start(|context| async move {
704 test_many_keys_impl(context.clone(), creator, compression, num).await;
705 context.auditor().state()
706 });
707 assert_eq!(state1, state2);
708 }
709
710 #[test_traced]
711 fn test_many_keys_prunable_no_compression() {
712 test_many_keys_determinism(create_prunable, None, 1_000);
713 }
714
715 #[test_traced]
716 fn test_many_keys_prunable_compression() {
717 test_many_keys_determinism(create_prunable, Some(3), 1_000);
718 }
719
720 #[test_traced]
721 fn test_many_keys_immutable_no_compression() {
722 test_many_keys_determinism(create_immutable, None, 1_000);
723 }
724
725 #[test_traced]
726 fn test_many_keys_immutable_compression() {
727 test_many_keys_determinism(create_immutable, Some(3), 1_000);
728 }
729
730 #[test_group("slow")]
731 #[test_traced]
732 fn test_many_keys_prunable_large() {
733 test_many_keys_determinism(create_prunable, None, 50_000);
734 }
735
736 #[test_group("slow")]
737 #[test_traced]
738 fn test_many_keys_immutable_large() {
739 test_many_keys_determinism(create_immutable, None, 50_000);
740 }
741}