1use commonware_codec::Codec;
7use commonware_utils::Array;
8use std::future::Future;
9use thiserror::Error;
10
11pub mod immutable;
12pub mod prunable;
13
14pub enum Identifier<'a, K: Array> {
16 Index(u64),
17 Key(&'a K),
18}
19
20#[derive(Debug, Error)]
22pub enum Error {
23 #[error("journal error: {0}")]
24 Journal(#[from] crate::journal::Error),
25 #[error("ordinal error: {0}")]
26 Ordinal(#[from] crate::ordinal::Error),
27 #[error("metadata error: {0}")]
28 Metadata(#[from] crate::metadata::Error),
29 #[error("freezer error: {0}")]
30 Freezer(#[from] crate::freezer::Error),
31 #[error("record corrupted")]
32 RecordCorrupted,
33 #[error("already pruned to: {0}")]
34 AlreadyPrunedTo(u64),
35 #[error("record too large")]
36 RecordTooLarge,
37}
38
39pub trait Archive {
41 type Key: Array;
43
44 type Value: Codec;
46
47 fn put(
52 &mut self,
53 index: u64,
54 key: Self::Key,
55 value: Self::Value,
56 ) -> impl Future<Output = Result<(), Error>>;
57
58 fn put_sync(
60 &mut self,
61 index: u64,
62 key: Self::Key,
63 value: Self::Value,
64 ) -> impl Future<Output = Result<(), Error>> {
65 async move {
66 self.put(index, key, value).await?;
67 self.sync().await
68 }
69 }
70
71 fn get(
73 &self,
74 identifier: Identifier<'_, Self::Key>,
75 ) -> impl Future<Output = Result<Option<Self::Value>, Error>>;
76
77 fn has(
79 &self,
80 identifier: Identifier<'_, Self::Key>,
81 ) -> impl Future<Output = Result<bool, Error>>;
82
83 fn next_gap(&self, index: u64) -> (Option<u64>, Option<u64>);
88
89 fn sync(&mut self) -> impl Future<Output = Result<(), Error>>;
91
92 fn close(self) -> impl Future<Output = Result<(), Error>>;
96
97 fn destroy(self) -> impl Future<Output = Result<(), Error>>;
99}
100
101#[cfg(test)]
102mod tests {
103 use super::*;
104 use crate::translator::TwoCap;
105 use commonware_codec::DecodeExt;
106 use commonware_macros::test_traced;
107 use commonware_runtime::{
108 buffer::PoolRef,
109 deterministic::{self, Context},
110 Runner,
111 };
112 use commonware_utils::{sequence::FixedBytes, NZUsize, NZU64};
113 use rand::Rng;
114 use std::{collections::BTreeMap, num::NonZeroUsize};
115
116 const PAGE_SIZE: NonZeroUsize = NZUsize!(1024);
117 const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);
118
119 fn test_key(key: &str) -> FixedBytes<64> {
120 let mut buf = [0u8; 64];
121 let key = key.as_bytes();
122 assert!(key.len() <= buf.len());
123 buf[..key.len()].copy_from_slice(key);
124 FixedBytes::decode(buf.as_ref()).unwrap()
125 }
126
127 async fn create_prunable(
128 context: Context,
129 compression: Option<u8>,
130 ) -> impl Archive<Key = FixedBytes<64>, Value = i32> {
131 let cfg = prunable::Config {
132 partition: "test".into(),
133 translator: TwoCap,
134 compression,
135 codec_config: (),
136 items_per_section: NZU64!(1024),
137 write_buffer: NZUsize!(1024),
138 replay_buffer: NZUsize!(1024),
139 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
140 };
141 prunable::Archive::init(context, cfg).await.unwrap()
142 }
143
144 async fn create_immutable(
145 context: Context,
146 compression: Option<u8>,
147 ) -> impl Archive<Key = FixedBytes<64>, Value = i32> {
148 let cfg = immutable::Config {
149 metadata_partition: "test_metadata".into(),
150 freezer_table_partition: "test_table".into(),
151 freezer_table_initial_size: 64,
152 freezer_table_resize_frequency: 2,
153 freezer_table_resize_chunk_size: 32,
154 freezer_journal_partition: "test_journal".into(),
155 freezer_journal_target_size: 1024 * 1024,
156 freezer_journal_compression: compression,
157 freezer_journal_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
158 ordinal_partition: "test_ordinal".into(),
159 items_per_section: NZU64!(1024),
160 write_buffer: NZUsize!(1024 * 1024),
161 replay_buffer: NZUsize!(1024 * 1024),
162 codec_config: (),
163 };
164 immutable::Archive::init(context, cfg).await.unwrap()
165 }
166
167 async fn test_put_get_impl(mut archive: impl Archive<Key = FixedBytes<64>, Value = i32>) {
168 let index = 1u64;
169 let key = test_key("testkey");
170 let data = 1;
171
172 let has = archive
174 .has(Identifier::Index(index))
175 .await
176 .expect("Failed to check key");
177 assert!(!has);
178 let has = archive
179 .has(Identifier::Key(&key))
180 .await
181 .expect("Failed to check key");
182 assert!(!has);
183
184 archive
186 .put(index, key.clone(), data)
187 .await
188 .expect("Failed to put data");
189
190 let has = archive
192 .has(Identifier::Index(index))
193 .await
194 .expect("Failed to check key");
195 assert!(has);
196 let has = archive
197 .has(Identifier::Key(&key))
198 .await
199 .expect("Failed to check key");
200 assert!(has);
201
202 let retrieved = archive
204 .get(Identifier::Key(&key))
205 .await
206 .expect("Failed to get data");
207 assert_eq!(retrieved, Some(data));
208
209 let retrieved = archive
211 .get(Identifier::Index(index))
212 .await
213 .expect("Failed to get data");
214 assert_eq!(retrieved, Some(data));
215
216 archive.sync().await.expect("Failed to sync data");
218
219 archive.close().await.expect("Failed to close archive");
221 }
222
223 #[test_traced]
224 fn test_put_get_prunable_no_compression() {
225 let executor = deterministic::Runner::default();
226 executor.start(|context| async move {
227 let archive = create_prunable(context, None).await;
228 test_put_get_impl(archive).await;
229 });
230 }
231
232 #[test_traced]
233 fn test_put_get_prunable_compression() {
234 let executor = deterministic::Runner::default();
235 executor.start(|context| async move {
236 let archive = create_prunable(context, Some(3)).await;
237 test_put_get_impl(archive).await;
238 });
239 }
240
241 #[test_traced]
242 fn test_put_get_immutable_no_compression() {
243 let executor = deterministic::Runner::default();
244 executor.start(|context| async move {
245 let archive = create_immutable(context, None).await;
246 test_put_get_impl(archive).await;
247 });
248 }
249
250 #[test_traced]
251 fn test_put_get_immutable_compression() {
252 let executor = deterministic::Runner::default();
253 executor.start(|context| async move {
254 let archive = create_immutable(context, Some(3)).await;
255 test_put_get_impl(archive).await;
256 });
257 }
258
259 async fn test_duplicate_key_impl(mut archive: impl Archive<Key = FixedBytes<64>, Value = i32>) {
260 let index = 1u64;
261 let key = test_key("duplicate");
262 let data1 = 1;
263 let data2 = 2;
264
265 archive
267 .put(index, key.clone(), data1)
268 .await
269 .expect("Failed to put data");
270
271 archive
273 .put(index, key.clone(), data2)
274 .await
275 .expect("Duplicate put should not fail");
276
277 let retrieved = archive
279 .get(Identifier::Index(index))
280 .await
281 .expect("Failed to get data")
282 .expect("Data not found");
283 assert_eq!(retrieved, data1);
284
285 let retrieved = archive
286 .get(Identifier::Key(&key))
287 .await
288 .expect("Failed to get data")
289 .expect("Data not found");
290 assert_eq!(retrieved, data1);
291
292 archive.close().await.expect("Failed to close archive");
293 }
294
295 #[test_traced]
296 fn test_duplicate_key_prunable_no_compression() {
297 let executor = deterministic::Runner::default();
298 executor.start(|context| async move {
299 let archive = create_prunable(context, None).await;
300 test_duplicate_key_impl(archive).await;
301 });
302 }
303
304 #[test_traced]
305 fn test_duplicate_key_prunable_compression() {
306 let executor = deterministic::Runner::default();
307 executor.start(|context| async move {
308 let archive = create_prunable(context, Some(3)).await;
309 test_duplicate_key_impl(archive).await;
310 });
311 }
312
313 #[test_traced]
314 fn test_duplicate_key_immutable_no_compression() {
315 let executor = deterministic::Runner::default();
316 executor.start(|context| async move {
317 let archive = create_immutable(context, None).await;
318 test_duplicate_key_impl(archive).await;
319 });
320 }
321
322 #[test_traced]
323 fn test_duplicate_key_immutable_compression() {
324 let executor = deterministic::Runner::default();
325 executor.start(|context| async move {
326 let archive = create_immutable(context, Some(3)).await;
327 test_duplicate_key_impl(archive).await;
328 });
329 }
330
331 async fn test_get_nonexistent_impl(archive: impl Archive<Key = FixedBytes<64>, Value = i32>) {
332 let index = 1u64;
334 let retrieved: Option<i32> = archive
335 .get(Identifier::Index(index))
336 .await
337 .expect("Failed to get data");
338 assert!(retrieved.is_none());
339
340 let key = test_key("nonexistent");
342 let retrieved = archive
343 .get(Identifier::Key(&key))
344 .await
345 .expect("Failed to get data");
346 assert!(retrieved.is_none());
347
348 archive.close().await.expect("Failed to close archive");
349 }
350
351 #[test_traced]
352 fn test_get_nonexistent_prunable_no_compression() {
353 let executor = deterministic::Runner::default();
354 executor.start(|context| async move {
355 let archive = create_prunable(context, None).await;
356 test_get_nonexistent_impl(archive).await;
357 });
358 }
359
360 #[test_traced]
361 fn test_get_nonexistent_prunable_compression() {
362 let executor = deterministic::Runner::default();
363 executor.start(|context| async move {
364 let archive = create_prunable(context, Some(3)).await;
365 test_get_nonexistent_impl(archive).await;
366 });
367 }
368
369 #[test_traced]
370 fn test_get_nonexistent_immutable_no_compression() {
371 let executor = deterministic::Runner::default();
372 executor.start(|context| async move {
373 let archive = create_immutable(context, None).await;
374 test_get_nonexistent_impl(archive).await;
375 });
376 }
377
378 #[test_traced]
379 fn test_get_nonexistent_immutable_compression() {
380 let executor = deterministic::Runner::default();
381 executor.start(|context| async move {
382 let archive = create_immutable(context, Some(3)).await;
383 test_get_nonexistent_impl(archive).await;
384 });
385 }
386
387 async fn test_persistence_impl<A, F, Fut>(context: Context, creator: F, compression: Option<u8>)
388 where
389 A: Archive<Key = FixedBytes<64>, Value = i32>,
390 F: Fn(Context, Option<u8>) -> Fut,
391 Fut: Future<Output = A>,
392 {
393 {
395 let mut archive = creator(context.clone(), compression).await;
396
397 let keys = vec![
399 (1u64, test_key("key1"), 1),
400 (2u64, test_key("key2"), 2),
401 (3u64, test_key("key3"), 3),
402 ];
403
404 for (index, key, data) in &keys {
405 archive
406 .put(*index, key.clone(), *data)
407 .await
408 .expect("Failed to put data");
409 }
410
411 archive.close().await.expect("Failed to close archive");
413 }
414
415 {
417 let archive = creator(context, compression).await;
418
419 let keys = vec![
421 (1u64, test_key("key1"), 1),
422 (2u64, test_key("key2"), 2),
423 (3u64, test_key("key3"), 3),
424 ];
425
426 for (index, key, expected_data) in &keys {
427 let retrieved = archive
428 .get(Identifier::Index(*index))
429 .await
430 .expect("Failed to get data")
431 .expect("Data not found");
432 assert_eq!(retrieved, *expected_data);
433
434 let retrieved = archive
435 .get(Identifier::Key(key))
436 .await
437 .expect("Failed to get data")
438 .expect("Data not found");
439 assert_eq!(retrieved, *expected_data);
440 }
441
442 archive.close().await.expect("Failed to close archive");
443 }
444 }
445
446 #[test_traced]
447 fn test_persistence_prunable_no_compression() {
448 let executor = deterministic::Runner::default();
449 executor.start(|context| async move {
450 test_persistence_impl(context, create_prunable, None).await;
451 });
452 }
453
454 #[test_traced]
455 fn test_persistence_prunable_compression() {
456 let executor = deterministic::Runner::default();
457 executor.start(|context| async move {
458 test_persistence_impl(context, create_prunable, Some(3)).await;
459 });
460 }
461
462 #[test_traced]
463 fn test_persistence_immutable_no_compression() {
464 let executor = deterministic::Runner::default();
465 executor.start(|context| async move {
466 test_persistence_impl(context, create_immutable, None).await;
467 });
468 }
469
470 #[test_traced]
471 fn test_persistence_immutable_compression() {
472 let executor = deterministic::Runner::default();
473 executor.start(|context| async move {
474 test_persistence_impl(context, create_immutable, Some(3)).await;
475 });
476 }
477
478 async fn test_ranges_impl<A, F, Fut>(mut context: Context, creator: F, compression: Option<u8>)
479 where
480 A: Archive<Key = FixedBytes<64>, Value = i32>,
481 F: Fn(Context, Option<u8>) -> Fut,
482 Fut: Future<Output = A>,
483 {
484 let mut keys = BTreeMap::new();
485 {
486 let mut archive = creator(context.clone(), compression).await;
487
488 let mut last_index = 0u64;
490 while keys.len() < 100 {
491 let gap: u64 = context.gen_range(1..=10);
492 let index = last_index + gap;
493 last_index = index;
494
495 let mut key_bytes = [0u8; 64];
496 context.fill(&mut key_bytes);
497 let key = FixedBytes::<64>::decode(key_bytes.as_ref()).unwrap();
498 let data: i32 = context.gen();
499
500 if keys.contains_key(&index) {
501 continue;
502 }
503 keys.insert(index, (key.clone(), data));
504
505 archive
506 .put(index, key, data)
507 .await
508 .expect("Failed to put data");
509 }
510
511 archive.close().await.expect("Failed to close archive");
512 }
513
514 {
515 let archive = creator(context, compression).await;
516 let sorted_indices: Vec<u64> = keys.keys().cloned().collect();
517
518 let (current_end, start_next) = archive.next_gap(0);
520 assert!(current_end.is_none());
521 assert_eq!(start_next, Some(sorted_indices[0]));
522
523 let mut i = 0;
525 while i < sorted_indices.len() {
526 let current_index = sorted_indices[i];
527
528 let mut j = i;
530 while j + 1 < sorted_indices.len() && sorted_indices[j + 1] == sorted_indices[j] + 1
531 {
532 j += 1;
533 }
534 let block_end_index = sorted_indices[j];
535 let next_actual_index = if j + 1 < sorted_indices.len() {
536 Some(sorted_indices[j + 1])
537 } else {
538 None
539 };
540
541 let (current_end, start_next) = archive.next_gap(current_index);
542 assert_eq!(current_end, Some(block_end_index));
543 assert_eq!(start_next, next_actual_index);
544
545 if let Some(next_index) = next_actual_index {
547 if next_index > block_end_index + 1 {
548 let in_gap_index = block_end_index + 1;
549 let (current_end, start_next) = archive.next_gap(in_gap_index);
550 assert!(current_end.is_none());
551 assert_eq!(start_next, Some(next_index));
552 }
553 }
554 i = j + 1;
555 }
556
557 let last_index = *sorted_indices.last().unwrap();
559 let (current_end, start_next) = archive.next_gap(last_index);
560 assert!(current_end.is_some());
561 assert!(start_next.is_none());
562
563 archive.close().await.expect("Failed to close archive");
564 }
565 }
566
567 #[test_traced]
568 fn test_ranges_prunable_no_compression() {
569 let executor = deterministic::Runner::default();
570 executor.start(|context| async move {
571 test_ranges_impl(context, create_prunable, None).await;
572 });
573 }
574
575 #[test_traced]
576 fn test_ranges_prunable_compression() {
577 let executor = deterministic::Runner::default();
578 executor.start(|context| async move {
579 test_ranges_impl(context, create_prunable, Some(3)).await;
580 });
581 }
582
583 #[test_traced]
584 fn test_ranges_immutable_no_compression() {
585 let executor = deterministic::Runner::default();
586 executor.start(|context| async move {
587 test_ranges_impl(context, create_immutable, None).await;
588 });
589 }
590
591 #[test_traced]
592 fn test_ranges_immutable_compression() {
593 let executor = deterministic::Runner::default();
594 executor.start(|context| async move {
595 test_ranges_impl(context, create_immutable, Some(3)).await;
596 });
597 }
598
599 async fn test_many_keys_impl<A, F, Fut>(
600 mut context: Context,
601 creator: F,
602 compression: Option<u8>,
603 num: usize,
604 ) where
605 A: Archive<Key = FixedBytes<64>, Value = i32>,
606 F: Fn(Context, Option<u8>) -> Fut,
607 Fut: Future<Output = A>,
608 {
609 let mut keys = BTreeMap::new();
611 {
612 let mut archive = creator(context.clone(), compression).await;
613 while keys.len() < num {
614 let index = keys.len() as u64;
615 let mut key = [0u8; 64];
616 context.fill(&mut key);
617 let key = FixedBytes::<64>::decode(key.as_ref()).unwrap();
618 let data: i32 = context.gen();
619
620 archive
621 .put(index, key.clone(), data)
622 .await
623 .expect("Failed to put data");
624 keys.insert(key, (index, data));
625
626 if context.gen_bool(0.1) {
628 archive.sync().await.expect("Failed to sync archive");
629 }
630 }
631 archive.sync().await.expect("Failed to sync archive");
632
633 for (key, (index, data)) in &keys {
635 let retrieved = archive
636 .get(Identifier::Index(*index))
637 .await
638 .expect("Failed to get data")
639 .expect("Data not found");
640 assert_eq!(&retrieved, data);
641 let retrieved = archive
642 .get(Identifier::Key(key))
643 .await
644 .expect("Failed to get data")
645 .expect("Data not found");
646 assert_eq!(&retrieved, data);
647 }
648
649 archive.close().await.expect("Failed to close archive");
650 }
651
652 {
654 let archive = creator(context.clone(), compression).await;
655
656 for (key, (index, data)) in &keys {
658 let retrieved = archive
659 .get(Identifier::Index(*index))
660 .await
661 .expect("Failed to get data")
662 .expect("Data not found");
663 assert_eq!(&retrieved, data);
664 let retrieved = archive
665 .get(Identifier::Key(key))
666 .await
667 .expect("Failed to get data")
668 .expect("Data not found");
669 assert_eq!(&retrieved, data);
670 }
671
672 archive.close().await.expect("Failed to close archive");
673 }
674 }
675
676 fn test_many_keys_determinism<F, Fut, A>(creator: F, compression: Option<u8>, num: usize)
677 where
678 A: Archive<Key = FixedBytes<64>, Value = i32>,
679 F: Fn(Context, Option<u8>) -> Fut + Copy + Send + 'static,
680 Fut: Future<Output = A> + Send,
681 {
682 let executor = deterministic::Runner::default();
683 let state1 = executor.start(|context| async move {
684 test_many_keys_impl(context.clone(), creator, compression, num).await;
685 context.auditor().state()
686 });
687 let executor = deterministic::Runner::default();
688 let state2 = executor.start(|context| async move {
689 test_many_keys_impl(context.clone(), creator, compression, num).await;
690 context.auditor().state()
691 });
692 assert_eq!(state1, state2);
693 }
694
695 #[test_traced]
696 fn test_many_keys_prunable_no_compression() {
697 test_many_keys_determinism(create_prunable, None, 1_000);
698 }
699
700 #[test_traced]
701 fn test_many_keys_prunable_compression() {
702 test_many_keys_determinism(create_prunable, Some(3), 1_000);
703 }
704
705 #[test_traced]
706 fn test_many_keys_immutable_no_compression() {
707 test_many_keys_determinism(create_immutable, None, 1_000);
708 }
709
710 #[test_traced]
711 fn test_many_keys_immutable_compression() {
712 test_many_keys_determinism(create_immutable, Some(3), 1_000);
713 }
714
715 #[test_traced]
716 #[ignore]
717 fn test_many_keys_prunable_large() {
718 test_many_keys_determinism(create_prunable, None, 50_000);
719 }
720
721 #[test_traced]
722 #[ignore]
723 fn test_many_keys_immutable_large() {
724 test_many_keys_determinism(create_immutable, None, 50_000);
725 }
726}