1use commonware_codec::Codec;
7use commonware_utils::Array;
8use std::future::Future;
9use thiserror::Error;
10
11pub mod immutable;
12pub mod prunable;
13
14pub enum Identifier<'a, K: Array> {
16 Index(u64),
17 Key(&'a K),
18}
19
20#[derive(Debug, Error)]
22pub enum Error {
23 #[error("journal error: {0}")]
24 Journal(#[from] crate::journal::Error),
25 #[error("ordinal error: {0}")]
26 Ordinal(#[from] crate::ordinal::Error),
27 #[error("metadata error: {0}")]
28 Metadata(#[from] crate::metadata::Error),
29 #[error("freezer error: {0}")]
30 Freezer(#[from] crate::freezer::Error),
31 #[error("record corrupted")]
32 RecordCorrupted,
33 #[error("already pruned to: {0}")]
34 AlreadyPrunedTo(u64),
35 #[error("record too large")]
36 RecordTooLarge,
37}
38
39pub trait Archive {
41 type Key: Array;
43
44 type Value: Codec;
46
47 fn put(
52 &mut self,
53 index: u64,
54 key: Self::Key,
55 value: Self::Value,
56 ) -> impl Future<Output = Result<(), Error>>;
57
58 fn put_sync(
60 &mut self,
61 index: u64,
62 key: Self::Key,
63 value: Self::Value,
64 ) -> impl Future<Output = Result<(), Error>> {
65 async move {
66 self.put(index, key, value).await?;
67 self.sync().await
68 }
69 }
70
71 fn get(
73 &self,
74 identifier: Identifier<'_, Self::Key>,
75 ) -> impl Future<Output = Result<Option<Self::Value>, Error>>;
76
77 fn has(
79 &self,
80 identifier: Identifier<'_, Self::Key>,
81 ) -> impl Future<Output = Result<bool, Error>>;
82
83 fn next_gap(&self, index: u64) -> (Option<u64>, Option<u64>);
88
89 fn first_index(&self) -> Option<u64>;
91
92 fn last_index(&self) -> Option<u64>;
94
95 fn sync(&mut self) -> impl Future<Output = Result<(), Error>>;
97
98 fn close(self) -> impl Future<Output = Result<(), Error>>;
102
103 fn destroy(self) -> impl Future<Output = Result<(), Error>>;
105}
106
107#[cfg(test)]
108mod tests {
109 use super::*;
110 use crate::translator::TwoCap;
111 use commonware_codec::DecodeExt;
112 use commonware_macros::test_traced;
113 use commonware_runtime::{
114 buffer::PoolRef,
115 deterministic::{self, Context},
116 Runner,
117 };
118 use commonware_utils::{sequence::FixedBytes, NZUsize, NZU64};
119 use rand::Rng;
120 use std::{collections::BTreeMap, num::NonZeroUsize};
121
122 const PAGE_SIZE: NonZeroUsize = NZUsize!(1024);
123 const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);
124
125 fn test_key(key: &str) -> FixedBytes<64> {
126 let mut buf = [0u8; 64];
127 let key = key.as_bytes();
128 assert!(key.len() <= buf.len());
129 buf[..key.len()].copy_from_slice(key);
130 FixedBytes::decode(buf.as_ref()).unwrap()
131 }
132
133 async fn create_prunable(
134 context: Context,
135 compression: Option<u8>,
136 ) -> impl Archive<Key = FixedBytes<64>, Value = i32> {
137 let cfg = prunable::Config {
138 partition: "test".into(),
139 translator: TwoCap,
140 compression,
141 codec_config: (),
142 items_per_section: NZU64!(1024),
143 write_buffer: NZUsize!(1024),
144 replay_buffer: NZUsize!(1024),
145 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
146 };
147 prunable::Archive::init(context, cfg).await.unwrap()
148 }
149
150 async fn create_immutable(
151 context: Context,
152 compression: Option<u8>,
153 ) -> impl Archive<Key = FixedBytes<64>, Value = i32> {
154 let cfg = immutable::Config {
155 metadata_partition: "test_metadata".into(),
156 freezer_table_partition: "test_table".into(),
157 freezer_table_initial_size: 64,
158 freezer_table_resize_frequency: 2,
159 freezer_table_resize_chunk_size: 32,
160 freezer_journal_partition: "test_journal".into(),
161 freezer_journal_target_size: 1024 * 1024,
162 freezer_journal_compression: compression,
163 freezer_journal_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
164 ordinal_partition: "test_ordinal".into(),
165 items_per_section: NZU64!(1024),
166 write_buffer: NZUsize!(1024 * 1024),
167 replay_buffer: NZUsize!(1024 * 1024),
168 codec_config: (),
169 };
170 immutable::Archive::init(context, cfg).await.unwrap()
171 }
172
173 async fn test_put_get_impl(mut archive: impl Archive<Key = FixedBytes<64>, Value = i32>) {
174 let index = 1u64;
175 let key = test_key("testkey");
176 let data = 1;
177
178 let has = archive
180 .has(Identifier::Index(index))
181 .await
182 .expect("Failed to check key");
183 assert!(!has);
184 let has = archive
185 .has(Identifier::Key(&key))
186 .await
187 .expect("Failed to check key");
188 assert!(!has);
189
190 archive
192 .put(index, key.clone(), data)
193 .await
194 .expect("Failed to put data");
195
196 let has = archive
198 .has(Identifier::Index(index))
199 .await
200 .expect("Failed to check key");
201 assert!(has);
202 let has = archive
203 .has(Identifier::Key(&key))
204 .await
205 .expect("Failed to check key");
206 assert!(has);
207
208 let retrieved = archive
210 .get(Identifier::Key(&key))
211 .await
212 .expect("Failed to get data");
213 assert_eq!(retrieved, Some(data));
214
215 let retrieved = archive
217 .get(Identifier::Index(index))
218 .await
219 .expect("Failed to get data");
220 assert_eq!(retrieved, Some(data));
221
222 archive.sync().await.expect("Failed to sync data");
224
225 archive.close().await.expect("Failed to close archive");
227 }
228
229 #[test_traced]
230 fn test_put_get_prunable_no_compression() {
231 let executor = deterministic::Runner::default();
232 executor.start(|context| async move {
233 let archive = create_prunable(context, None).await;
234 test_put_get_impl(archive).await;
235 });
236 }
237
238 #[test_traced]
239 fn test_put_get_prunable_compression() {
240 let executor = deterministic::Runner::default();
241 executor.start(|context| async move {
242 let archive = create_prunable(context, Some(3)).await;
243 test_put_get_impl(archive).await;
244 });
245 }
246
247 #[test_traced]
248 fn test_put_get_immutable_no_compression() {
249 let executor = deterministic::Runner::default();
250 executor.start(|context| async move {
251 let archive = create_immutable(context, None).await;
252 test_put_get_impl(archive).await;
253 });
254 }
255
256 #[test_traced]
257 fn test_put_get_immutable_compression() {
258 let executor = deterministic::Runner::default();
259 executor.start(|context| async move {
260 let archive = create_immutable(context, Some(3)).await;
261 test_put_get_impl(archive).await;
262 });
263 }
264
265 async fn test_duplicate_key_impl(mut archive: impl Archive<Key = FixedBytes<64>, Value = i32>) {
266 let index = 1u64;
267 let key = test_key("duplicate");
268 let data1 = 1;
269 let data2 = 2;
270
271 archive
273 .put(index, key.clone(), data1)
274 .await
275 .expect("Failed to put data");
276
277 archive
279 .put(index, key.clone(), data2)
280 .await
281 .expect("Duplicate put should not fail");
282
283 let retrieved = archive
285 .get(Identifier::Index(index))
286 .await
287 .expect("Failed to get data")
288 .expect("Data not found");
289 assert_eq!(retrieved, data1);
290
291 let retrieved = archive
292 .get(Identifier::Key(&key))
293 .await
294 .expect("Failed to get data")
295 .expect("Data not found");
296 assert_eq!(retrieved, data1);
297
298 archive.close().await.expect("Failed to close archive");
299 }
300
301 #[test_traced]
302 fn test_duplicate_key_prunable_no_compression() {
303 let executor = deterministic::Runner::default();
304 executor.start(|context| async move {
305 let archive = create_prunable(context, None).await;
306 test_duplicate_key_impl(archive).await;
307 });
308 }
309
310 #[test_traced]
311 fn test_duplicate_key_prunable_compression() {
312 let executor = deterministic::Runner::default();
313 executor.start(|context| async move {
314 let archive = create_prunable(context, Some(3)).await;
315 test_duplicate_key_impl(archive).await;
316 });
317 }
318
319 #[test_traced]
320 fn test_duplicate_key_immutable_no_compression() {
321 let executor = deterministic::Runner::default();
322 executor.start(|context| async move {
323 let archive = create_immutable(context, None).await;
324 test_duplicate_key_impl(archive).await;
325 });
326 }
327
328 #[test_traced]
329 fn test_duplicate_key_immutable_compression() {
330 let executor = deterministic::Runner::default();
331 executor.start(|context| async move {
332 let archive = create_immutable(context, Some(3)).await;
333 test_duplicate_key_impl(archive).await;
334 });
335 }
336
337 async fn test_get_nonexistent_impl(archive: impl Archive<Key = FixedBytes<64>, Value = i32>) {
338 let index = 1u64;
340 let retrieved: Option<i32> = archive
341 .get(Identifier::Index(index))
342 .await
343 .expect("Failed to get data");
344 assert!(retrieved.is_none());
345
346 let key = test_key("nonexistent");
348 let retrieved = archive
349 .get(Identifier::Key(&key))
350 .await
351 .expect("Failed to get data");
352 assert!(retrieved.is_none());
353
354 archive.close().await.expect("Failed to close archive");
355 }
356
357 #[test_traced]
358 fn test_get_nonexistent_prunable_no_compression() {
359 let executor = deterministic::Runner::default();
360 executor.start(|context| async move {
361 let archive = create_prunable(context, None).await;
362 test_get_nonexistent_impl(archive).await;
363 });
364 }
365
366 #[test_traced]
367 fn test_get_nonexistent_prunable_compression() {
368 let executor = deterministic::Runner::default();
369 executor.start(|context| async move {
370 let archive = create_prunable(context, Some(3)).await;
371 test_get_nonexistent_impl(archive).await;
372 });
373 }
374
375 #[test_traced]
376 fn test_get_nonexistent_immutable_no_compression() {
377 let executor = deterministic::Runner::default();
378 executor.start(|context| async move {
379 let archive = create_immutable(context, None).await;
380 test_get_nonexistent_impl(archive).await;
381 });
382 }
383
384 #[test_traced]
385 fn test_get_nonexistent_immutable_compression() {
386 let executor = deterministic::Runner::default();
387 executor.start(|context| async move {
388 let archive = create_immutable(context, Some(3)).await;
389 test_get_nonexistent_impl(archive).await;
390 });
391 }
392
393 async fn test_persistence_impl<A, F, Fut>(context: Context, creator: F, compression: Option<u8>)
394 where
395 A: Archive<Key = FixedBytes<64>, Value = i32>,
396 F: Fn(Context, Option<u8>) -> Fut,
397 Fut: Future<Output = A>,
398 {
399 {
401 let mut archive = creator(context.clone(), compression).await;
402
403 let keys = vec![
405 (1u64, test_key("key1"), 1),
406 (2u64, test_key("key2"), 2),
407 (3u64, test_key("key3"), 3),
408 ];
409
410 for (index, key, data) in &keys {
411 archive
412 .put(*index, key.clone(), *data)
413 .await
414 .expect("Failed to put data");
415 }
416
417 archive.close().await.expect("Failed to close archive");
419 }
420
421 {
423 let archive = creator(context, compression).await;
424
425 let keys = vec![
427 (1u64, test_key("key1"), 1),
428 (2u64, test_key("key2"), 2),
429 (3u64, test_key("key3"), 3),
430 ];
431
432 for (index, key, expected_data) in &keys {
433 let retrieved = archive
434 .get(Identifier::Index(*index))
435 .await
436 .expect("Failed to get data")
437 .expect("Data not found");
438 assert_eq!(retrieved, *expected_data);
439
440 let retrieved = archive
441 .get(Identifier::Key(key))
442 .await
443 .expect("Failed to get data")
444 .expect("Data not found");
445 assert_eq!(retrieved, *expected_data);
446 }
447
448 archive.close().await.expect("Failed to close archive");
449 }
450 }
451
452 #[test_traced]
453 fn test_persistence_prunable_no_compression() {
454 let executor = deterministic::Runner::default();
455 executor.start(|context| async move {
456 test_persistence_impl(context, create_prunable, None).await;
457 });
458 }
459
460 #[test_traced]
461 fn test_persistence_prunable_compression() {
462 let executor = deterministic::Runner::default();
463 executor.start(|context| async move {
464 test_persistence_impl(context, create_prunable, Some(3)).await;
465 });
466 }
467
468 #[test_traced]
469 fn test_persistence_immutable_no_compression() {
470 let executor = deterministic::Runner::default();
471 executor.start(|context| async move {
472 test_persistence_impl(context, create_immutable, None).await;
473 });
474 }
475
476 #[test_traced]
477 fn test_persistence_immutable_compression() {
478 let executor = deterministic::Runner::default();
479 executor.start(|context| async move {
480 test_persistence_impl(context, create_immutable, Some(3)).await;
481 });
482 }
483
484 async fn test_ranges_impl<A, F, Fut>(mut context: Context, creator: F, compression: Option<u8>)
485 where
486 A: Archive<Key = FixedBytes<64>, Value = i32>,
487 F: Fn(Context, Option<u8>) -> Fut,
488 Fut: Future<Output = A>,
489 {
490 let mut keys = BTreeMap::new();
491 {
492 let mut archive = creator(context.clone(), compression).await;
493
494 let mut last_index = 0u64;
496 while keys.len() < 100 {
497 let gap: u64 = context.gen_range(1..=10);
498 let index = last_index + gap;
499 last_index = index;
500
501 let mut key_bytes = [0u8; 64];
502 context.fill(&mut key_bytes);
503 let key = FixedBytes::<64>::decode(key_bytes.as_ref()).unwrap();
504 let data: i32 = context.gen();
505
506 if keys.contains_key(&index) {
507 continue;
508 }
509 keys.insert(index, (key.clone(), data));
510
511 archive
512 .put(index, key, data)
513 .await
514 .expect("Failed to put data");
515 }
516
517 archive.close().await.expect("Failed to close archive");
518 }
519
520 {
521 let archive = creator(context, compression).await;
522 let sorted_indices: Vec<u64> = keys.keys().cloned().collect();
523
524 let (current_end, start_next) = archive.next_gap(0);
526 assert!(current_end.is_none());
527 assert_eq!(start_next, Some(sorted_indices[0]));
528
529 let mut i = 0;
531 while i < sorted_indices.len() {
532 let current_index = sorted_indices[i];
533
534 let mut j = i;
536 while j + 1 < sorted_indices.len() && sorted_indices[j + 1] == sorted_indices[j] + 1
537 {
538 j += 1;
539 }
540 let block_end_index = sorted_indices[j];
541 let next_actual_index = if j + 1 < sorted_indices.len() {
542 Some(sorted_indices[j + 1])
543 } else {
544 None
545 };
546
547 let (current_end, start_next) = archive.next_gap(current_index);
548 assert_eq!(current_end, Some(block_end_index));
549 assert_eq!(start_next, next_actual_index);
550
551 if let Some(next_index) = next_actual_index {
553 if next_index > block_end_index + 1 {
554 let in_gap_index = block_end_index + 1;
555 let (current_end, start_next) = archive.next_gap(in_gap_index);
556 assert!(current_end.is_none());
557 assert_eq!(start_next, Some(next_index));
558 }
559 }
560 i = j + 1;
561 }
562
563 let last_index = *sorted_indices.last().unwrap();
565 let (current_end, start_next) = archive.next_gap(last_index);
566 assert!(current_end.is_some());
567 assert!(start_next.is_none());
568
569 archive.close().await.expect("Failed to close archive");
570 }
571 }
572
573 #[test_traced]
574 fn test_ranges_prunable_no_compression() {
575 let executor = deterministic::Runner::default();
576 executor.start(|context| async move {
577 test_ranges_impl(context, create_prunable, None).await;
578 });
579 }
580
581 #[test_traced]
582 fn test_ranges_prunable_compression() {
583 let executor = deterministic::Runner::default();
584 executor.start(|context| async move {
585 test_ranges_impl(context, create_prunable, Some(3)).await;
586 });
587 }
588
589 #[test_traced]
590 fn test_ranges_immutable_no_compression() {
591 let executor = deterministic::Runner::default();
592 executor.start(|context| async move {
593 test_ranges_impl(context, create_immutable, None).await;
594 });
595 }
596
597 #[test_traced]
598 fn test_ranges_immutable_compression() {
599 let executor = deterministic::Runner::default();
600 executor.start(|context| async move {
601 test_ranges_impl(context, create_immutable, Some(3)).await;
602 });
603 }
604
605 async fn test_many_keys_impl<A, F, Fut>(
606 mut context: Context,
607 creator: F,
608 compression: Option<u8>,
609 num: usize,
610 ) where
611 A: Archive<Key = FixedBytes<64>, Value = i32>,
612 F: Fn(Context, Option<u8>) -> Fut,
613 Fut: Future<Output = A>,
614 {
615 let mut keys = BTreeMap::new();
617 {
618 let mut archive = creator(context.clone(), compression).await;
619 while keys.len() < num {
620 let index = keys.len() as u64;
621 let mut key = [0u8; 64];
622 context.fill(&mut key);
623 let key = FixedBytes::<64>::decode(key.as_ref()).unwrap();
624 let data: i32 = context.gen();
625
626 archive
627 .put(index, key.clone(), data)
628 .await
629 .expect("Failed to put data");
630 keys.insert(key, (index, data));
631
632 if context.gen_bool(0.1) {
634 archive.sync().await.expect("Failed to sync archive");
635 }
636 }
637 archive.sync().await.expect("Failed to sync archive");
638
639 for (key, (index, data)) in &keys {
641 let retrieved = archive
642 .get(Identifier::Index(*index))
643 .await
644 .expect("Failed to get data")
645 .expect("Data not found");
646 assert_eq!(&retrieved, data);
647 let retrieved = archive
648 .get(Identifier::Key(key))
649 .await
650 .expect("Failed to get data")
651 .expect("Data not found");
652 assert_eq!(&retrieved, data);
653 }
654
655 archive.close().await.expect("Failed to close archive");
656 }
657
658 {
660 let archive = creator(context.clone(), compression).await;
661
662 for (key, (index, data)) in &keys {
664 let retrieved = archive
665 .get(Identifier::Index(*index))
666 .await
667 .expect("Failed to get data")
668 .expect("Data not found");
669 assert_eq!(&retrieved, data);
670 let retrieved = archive
671 .get(Identifier::Key(key))
672 .await
673 .expect("Failed to get data")
674 .expect("Data not found");
675 assert_eq!(&retrieved, data);
676 }
677
678 archive.close().await.expect("Failed to close archive");
679 }
680 }
681
682 fn test_many_keys_determinism<F, Fut, A>(creator: F, compression: Option<u8>, num: usize)
683 where
684 A: Archive<Key = FixedBytes<64>, Value = i32>,
685 F: Fn(Context, Option<u8>) -> Fut + Copy + Send + 'static,
686 Fut: Future<Output = A> + Send,
687 {
688 let executor = deterministic::Runner::default();
689 let state1 = executor.start(|context| async move {
690 test_many_keys_impl(context.clone(), creator, compression, num).await;
691 context.auditor().state()
692 });
693 let executor = deterministic::Runner::default();
694 let state2 = executor.start(|context| async move {
695 test_many_keys_impl(context.clone(), creator, compression, num).await;
696 context.auditor().state()
697 });
698 assert_eq!(state1, state2);
699 }
700
701 #[test_traced]
702 fn test_many_keys_prunable_no_compression() {
703 test_many_keys_determinism(create_prunable, None, 1_000);
704 }
705
706 #[test_traced]
707 fn test_many_keys_prunable_compression() {
708 test_many_keys_determinism(create_prunable, Some(3), 1_000);
709 }
710
711 #[test_traced]
712 fn test_many_keys_immutable_no_compression() {
713 test_many_keys_determinism(create_immutable, None, 1_000);
714 }
715
716 #[test_traced]
717 fn test_many_keys_immutable_compression() {
718 test_many_keys_determinism(create_immutable, Some(3), 1_000);
719 }
720
721 #[test_traced]
722 #[ignore]
723 fn test_many_keys_prunable_large() {
724 test_many_keys_determinism(create_prunable, None, 50_000);
725 }
726
727 #[test_traced]
728 #[ignore]
729 fn test_many_keys_immutable_large() {
730 test_many_keys_determinism(create_immutable, None, 50_000);
731 }
732}