1use commonware_codec::Codec;
7use commonware_utils::Array;
8use std::future::Future;
9use thiserror::Error;
10
11pub mod immutable;
12pub mod prunable;
13
14#[cfg(all(test, feature = "arbitrary"))]
15mod conformance;
16
17pub enum Identifier<'a, K: Array> {
19 Index(u64),
20 Key(&'a K),
21}
22
23#[derive(Debug, Error)]
25pub enum Error {
26 #[error("journal error: {0}")]
27 Journal(#[from] crate::journal::Error),
28 #[error("ordinal error: {0}")]
29 Ordinal(#[from] crate::ordinal::Error),
30 #[error("metadata error: {0}")]
31 Metadata(#[from] crate::metadata::Error),
32 #[error("freezer error: {0}")]
33 Freezer(#[from] crate::freezer::Error),
34 #[error("record corrupted")]
35 RecordCorrupted,
36 #[error("already pruned to: {0}")]
37 AlreadyPrunedTo(u64),
38 #[error("record too large")]
39 RecordTooLarge,
40}
41
42pub trait Archive: Send {
44 type Key: Array;
46
47 type Value: Codec + Send;
49
50 fn put(
55 &mut self,
56 index: u64,
57 key: Self::Key,
58 value: Self::Value,
59 ) -> impl Future<Output = Result<(), Error>> + Send;
60
61 fn put_sync(
63 &mut self,
64 index: u64,
65 key: Self::Key,
66 value: Self::Value,
67 ) -> impl Future<Output = Result<(), Error>> + Send {
68 async move {
69 self.put(index, key, value).await?;
70 self.sync().await
71 }
72 }
73
74 fn get<'a>(
80 &'a self,
81 identifier: Identifier<'a, Self::Key>,
82 ) -> impl Future<Output = Result<Option<Self::Value>, Error>> + Send + use<'a, Self>;
83
84 fn has<'a>(
86 &'a self,
87 identifier: Identifier<'a, Self::Key>,
88 ) -> impl Future<Output = Result<bool, Error>> + Send + use<'a, Self>;
89
90 fn next_gap(&self, index: u64) -> (Option<u64>, Option<u64>);
95
96 fn missing_items(&self, index: u64, max: usize) -> Vec<u64>;
101
102 fn ranges(&self) -> impl Iterator<Item = (u64, u64)>;
104
105 fn ranges_from(&self, from: u64) -> impl Iterator<Item = (u64, u64)>;
107
108 fn first_index(&self) -> Option<u64>;
110
111 fn last_index(&self) -> Option<u64>;
113
114 fn sync(&mut self) -> impl Future<Output = Result<(), Error>> + Send;
116
117 fn destroy(self) -> impl Future<Output = Result<(), Error>> + Send;
119}
120
121pub trait MultiArchive: Archive {
128 fn get_all(
132 &self,
133 index: u64,
134 ) -> impl Future<Output = Result<Option<Vec<Self::Value>>, Error>> + Send + use<'_, Self>;
135
136 fn put_multi(
142 &mut self,
143 index: u64,
144 key: Self::Key,
145 value: Self::Value,
146 ) -> impl Future<Output = Result<(), Error>> + Send;
147
148 fn put_multi_sync(
150 &mut self,
151 index: u64,
152 key: Self::Key,
153 value: Self::Value,
154 ) -> impl Future<Output = Result<(), Error>> + Send {
155 async move {
156 self.put_multi(index, key, value).await?;
157 self.sync().await
158 }
159 }
160}
161
162#[cfg(test)]
163mod tests {
164 use super::*;
165 use crate::translator::TwoCap;
166 use commonware_codec::DecodeExt;
167 use commonware_macros::{test_group, test_traced};
168 use commonware_runtime::{
169 buffer::paged::CacheRef,
170 deterministic::{self, Context},
171 Metrics, Runner,
172 };
173 use commonware_utils::{sequence::FixedBytes, NZUsize, NZU16, NZU64};
174
175 fn test_key(key: &str) -> FixedBytes<64> {
176 let mut buf = [0u8; 64];
177 let key = key.as_bytes();
178 assert!(key.len() <= buf.len());
179 buf[..key.len()].copy_from_slice(key);
180 FixedBytes::decode(buf.as_ref()).unwrap()
181 }
182 use rand::Rng;
183 use std::{
184 collections::BTreeMap,
185 num::{NonZeroU16, NonZeroUsize},
186 };
187
188 const PAGE_SIZE: NonZeroU16 = NZU16!(1024);
189 const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);
190
191 async fn create_prunable(
192 context: Context,
193 compression: Option<u8>,
194 ) -> impl MultiArchive<Key = FixedBytes<64>, Value = i32> {
195 let cfg = prunable::Config {
196 translator: TwoCap,
197 key_partition: "test-key".into(),
198 key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
199 value_partition: "test-value".into(),
200 compression,
201 codec_config: (),
202 items_per_section: NZU64!(1024),
203 key_write_buffer: NZUsize!(1024),
204 value_write_buffer: NZUsize!(1024),
205 replay_buffer: NZUsize!(1024),
206 };
207 prunable::Archive::init(context, cfg).await.unwrap()
208 }
209
210 async fn create_immutable(
211 context: Context,
212 compression: Option<u8>,
213 ) -> impl Archive<Key = FixedBytes<64>, Value = i32> {
214 let cfg = immutable::Config {
215 metadata_partition: "test-metadata".into(),
216 freezer_table_partition: "test-table".into(),
217 freezer_table_initial_size: 64,
218 freezer_table_resize_frequency: 2,
219 freezer_table_resize_chunk_size: 32,
220 freezer_key_partition: "test-key".into(),
221 freezer_key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
222 freezer_value_partition: "test-value".into(),
223 freezer_value_target_size: 1024 * 1024,
224 freezer_value_compression: compression,
225 ordinal_partition: "test-ordinal".into(),
226 items_per_section: NZU64!(1024),
227 freezer_key_write_buffer: NZUsize!(1024 * 1024),
228 freezer_value_write_buffer: NZUsize!(1024 * 1024),
229 ordinal_write_buffer: NZUsize!(1024 * 1024),
230 replay_buffer: NZUsize!(1024 * 1024),
231 codec_config: (),
232 };
233 immutable::Archive::init(context, cfg).await.unwrap()
234 }
235
236 async fn test_put_get_impl(mut archive: impl Archive<Key = FixedBytes<64>, Value = i32>) {
237 let index = 1u64;
238 let key = test_key("testkey");
239 let data = 1;
240
241 let has = archive
243 .has(Identifier::Index(index))
244 .await
245 .expect("Failed to check key");
246 assert!(!has);
247 let has = archive
248 .has(Identifier::Key(&key))
249 .await
250 .expect("Failed to check key");
251 assert!(!has);
252
253 archive
255 .put(index, key.clone(), data)
256 .await
257 .expect("Failed to put data");
258
259 let has = archive
261 .has(Identifier::Index(index))
262 .await
263 .expect("Failed to check key");
264 assert!(has);
265 let has = archive
266 .has(Identifier::Key(&key))
267 .await
268 .expect("Failed to check key");
269 assert!(has);
270
271 let retrieved = archive
273 .get(Identifier::Key(&key))
274 .await
275 .expect("Failed to get data");
276 assert_eq!(retrieved, Some(data));
277
278 let retrieved = archive
280 .get(Identifier::Index(index))
281 .await
282 .expect("Failed to get data");
283 assert_eq!(retrieved, Some(data));
284
285 archive.sync().await.expect("Failed to sync data");
287 }
288
289 #[test_traced]
290 fn test_put_get_prunable_no_compression() {
291 let executor = deterministic::Runner::default();
292 executor.start(|context| async move {
293 let archive = create_prunable(context, None).await;
294 test_put_get_impl(archive).await;
295 });
296 }
297
298 #[test_traced]
299 fn test_put_get_prunable_compression() {
300 let executor = deterministic::Runner::default();
301 executor.start(|context| async move {
302 let archive = create_prunable(context, Some(3)).await;
303 test_put_get_impl(archive).await;
304 });
305 }
306
307 #[test_traced]
308 fn test_put_get_immutable_no_compression() {
309 let executor = deterministic::Runner::default();
310 executor.start(|context| async move {
311 let archive = create_immutable(context, None).await;
312 test_put_get_impl(archive).await;
313 });
314 }
315
316 #[test_traced]
317 fn test_put_get_immutable_compression() {
318 let executor = deterministic::Runner::default();
319 executor.start(|context| async move {
320 let archive = create_immutable(context, Some(3)).await;
321 test_put_get_impl(archive).await;
322 });
323 }
324
325 async fn test_duplicate_key_impl(mut archive: impl Archive<Key = FixedBytes<64>, Value = i32>) {
326 let index = 1u64;
327 let key = test_key("duplicate");
328 let data1 = 1;
329 let data2 = 2;
330
331 archive
333 .put(index, key.clone(), data1)
334 .await
335 .expect("Failed to put data");
336
337 archive
339 .put(index, key.clone(), data2)
340 .await
341 .expect("Duplicate put should not fail");
342
343 let retrieved = archive
345 .get(Identifier::Index(index))
346 .await
347 .expect("Failed to get data")
348 .expect("Data not found");
349 assert_eq!(retrieved, data1);
350
351 let retrieved = archive
352 .get(Identifier::Key(&key))
353 .await
354 .expect("Failed to get data")
355 .expect("Data not found");
356 assert_eq!(retrieved, data1);
357 }
358
359 #[test_traced]
360 fn test_duplicate_key_prunable_no_compression() {
361 let executor = deterministic::Runner::default();
362 executor.start(|context| async move {
363 let archive = create_prunable(context, None).await;
364 test_duplicate_key_impl(archive).await;
365 });
366 }
367
368 #[test_traced]
369 fn test_duplicate_key_prunable_compression() {
370 let executor = deterministic::Runner::default();
371 executor.start(|context| async move {
372 let archive = create_prunable(context, Some(3)).await;
373 test_duplicate_key_impl(archive).await;
374 });
375 }
376
377 #[test_traced]
378 fn test_duplicate_key_immutable_no_compression() {
379 let executor = deterministic::Runner::default();
380 executor.start(|context| async move {
381 let archive = create_immutable(context, None).await;
382 test_duplicate_key_impl(archive).await;
383 });
384 }
385
386 #[test_traced]
387 fn test_duplicate_key_immutable_compression() {
388 let executor = deterministic::Runner::default();
389 executor.start(|context| async move {
390 let archive = create_immutable(context, Some(3)).await;
391 test_duplicate_key_impl(archive).await;
392 });
393 }
394
395 async fn test_get_nonexistent_impl(archive: impl Archive<Key = FixedBytes<64>, Value = i32>) {
396 let index = 1u64;
398 let retrieved: Option<i32> = archive
399 .get(Identifier::Index(index))
400 .await
401 .expect("Failed to get data");
402 assert!(retrieved.is_none());
403
404 let key = test_key("nonexistent");
406 let retrieved = archive
407 .get(Identifier::Key(&key))
408 .await
409 .expect("Failed to get data");
410 assert!(retrieved.is_none());
411 }
412
413 #[test_traced]
414 fn test_get_nonexistent_prunable_no_compression() {
415 let executor = deterministic::Runner::default();
416 executor.start(|context| async move {
417 let archive = create_prunable(context, None).await;
418 test_get_nonexistent_impl(archive).await;
419 });
420 }
421
422 #[test_traced]
423 fn test_get_nonexistent_prunable_compression() {
424 let executor = deterministic::Runner::default();
425 executor.start(|context| async move {
426 let archive = create_prunable(context, Some(3)).await;
427 test_get_nonexistent_impl(archive).await;
428 });
429 }
430
431 #[test_traced]
432 fn test_get_nonexistent_immutable_no_compression() {
433 let executor = deterministic::Runner::default();
434 executor.start(|context| async move {
435 let archive = create_immutable(context, None).await;
436 test_get_nonexistent_impl(archive).await;
437 });
438 }
439
440 #[test_traced]
441 fn test_get_nonexistent_immutable_compression() {
442 let executor = deterministic::Runner::default();
443 executor.start(|context| async move {
444 let archive = create_immutable(context, Some(3)).await;
445 test_get_nonexistent_impl(archive).await;
446 });
447 }
448
449 async fn test_persistence_impl<A, F, Fut>(context: Context, creator: F, compression: Option<u8>)
450 where
451 A: Archive<Key = FixedBytes<64>, Value = i32>,
452 F: Fn(Context, Option<u8>) -> Fut,
453 Fut: Future<Output = A>,
454 {
455 {
457 let mut archive = creator(context.with_label("first"), compression).await;
458
459 let keys = vec![
461 (1u64, test_key("key1"), 1),
462 (2u64, test_key("key2"), 2),
463 (3u64, test_key("key3"), 3),
464 ];
465
466 for (index, key, data) in &keys {
467 archive
468 .put(*index, key.clone(), *data)
469 .await
470 .expect("Failed to put data");
471 }
472
473 archive.sync().await.expect("Failed to sync archive");
475 }
476
477 {
479 let archive = creator(context.with_label("second"), compression).await;
480
481 let keys = vec![
483 (1u64, test_key("key1"), 1),
484 (2u64, test_key("key2"), 2),
485 (3u64, test_key("key3"), 3),
486 ];
487
488 for (index, key, expected_data) in &keys {
489 let retrieved = archive
490 .get(Identifier::Index(*index))
491 .await
492 .expect("Failed to get data")
493 .expect("Data not found");
494 assert_eq!(retrieved, *expected_data);
495
496 let retrieved = archive
497 .get(Identifier::Key(key))
498 .await
499 .expect("Failed to get data")
500 .expect("Data not found");
501 assert_eq!(retrieved, *expected_data);
502 }
503 }
504 }
505
506 #[test_traced]
507 fn test_persistence_prunable_no_compression() {
508 let executor = deterministic::Runner::default();
509 executor.start(|context| async move {
510 test_persistence_impl(context, create_prunable, None).await;
511 });
512 }
513
514 #[test_traced]
515 fn test_persistence_prunable_compression() {
516 let executor = deterministic::Runner::default();
517 executor.start(|context| async move {
518 test_persistence_impl(context, create_prunable, Some(3)).await;
519 });
520 }
521
522 #[test_traced]
523 fn test_persistence_immutable_no_compression() {
524 let executor = deterministic::Runner::default();
525 executor.start(|context| async move {
526 test_persistence_impl(context, create_immutable, None).await;
527 });
528 }
529
530 #[test_traced]
531 fn test_persistence_immutable_compression() {
532 let executor = deterministic::Runner::default();
533 executor.start(|context| async move {
534 test_persistence_impl(context, create_immutable, Some(3)).await;
535 });
536 }
537
538 async fn test_ranges_impl<A, F, Fut>(mut context: Context, creator: F, compression: Option<u8>)
539 where
540 A: Archive<Key = FixedBytes<64>, Value = i32>,
541 F: Fn(Context, Option<u8>) -> Fut,
542 Fut: Future<Output = A>,
543 {
544 let mut keys = BTreeMap::new();
545 {
546 let mut archive = creator(context.with_label("first"), compression).await;
547
548 let mut last_index = 0u64;
550 while keys.len() < 100 {
551 let gap: u64 = context.gen_range(1..=10);
552 let index = last_index + gap;
553 last_index = index;
554
555 let mut key_bytes = [0u8; 64];
556 context.fill(&mut key_bytes);
557 let key = FixedBytes::<64>::decode(key_bytes.as_ref()).unwrap();
558 let data: i32 = context.gen();
559
560 if keys.contains_key(&index) {
561 continue;
562 }
563 keys.insert(index, (key.clone(), data));
564
565 archive
566 .put(index, key, data)
567 .await
568 .expect("Failed to put data");
569 }
570
571 archive.sync().await.expect("Failed to sync archive");
572 }
573
574 {
575 let archive = creator(context.with_label("second"), compression).await;
576 let sorted_indices: Vec<u64> = keys.keys().cloned().collect();
577
578 let (current_end, start_next) = archive.next_gap(0);
580 assert!(current_end.is_none());
581 assert_eq!(start_next, Some(sorted_indices[0]));
582
583 let mut i = 0;
585 while i < sorted_indices.len() {
586 let current_index = sorted_indices[i];
587
588 let mut j = i;
590 while j + 1 < sorted_indices.len() && sorted_indices[j + 1] == sorted_indices[j] + 1
591 {
592 j += 1;
593 }
594 let block_end_index = sorted_indices[j];
595 let next_actual_index = if j + 1 < sorted_indices.len() {
596 Some(sorted_indices[j + 1])
597 } else {
598 None
599 };
600
601 let (current_end, start_next) = archive.next_gap(current_index);
602 assert_eq!(current_end, Some(block_end_index));
603 assert_eq!(start_next, next_actual_index);
604
605 if let Some(next_index) = next_actual_index {
607 if next_index > block_end_index + 1 {
608 let in_gap_index = block_end_index + 1;
609 let (current_end, start_next) = archive.next_gap(in_gap_index);
610 assert!(current_end.is_none());
611 assert_eq!(start_next, Some(next_index));
612 }
613 }
614 i = j + 1;
615 }
616
617 let last_index = *sorted_indices.last().unwrap();
619 let (current_end, start_next) = archive.next_gap(last_index);
620 assert!(current_end.is_some());
621 assert!(start_next.is_none());
622 }
623 }
624
625 #[test_traced]
626 fn test_ranges_prunable_no_compression() {
627 let executor = deterministic::Runner::default();
628 executor.start(|context| async move {
629 test_ranges_impl(context, create_prunable, None).await;
630 });
631 }
632
633 #[test_traced]
634 fn test_ranges_prunable_compression() {
635 let executor = deterministic::Runner::default();
636 executor.start(|context| async move {
637 test_ranges_impl(context, create_prunable, Some(3)).await;
638 });
639 }
640
641 #[test_traced]
642 fn test_ranges_immutable_no_compression() {
643 let executor = deterministic::Runner::default();
644 executor.start(|context| async move {
645 test_ranges_impl(context, create_immutable, None).await;
646 });
647 }
648
649 #[test_traced]
650 fn test_ranges_immutable_compression() {
651 let executor = deterministic::Runner::default();
652 executor.start(|context| async move {
653 test_ranges_impl(context, create_immutable, Some(3)).await;
654 });
655 }
656
657 async fn test_many_keys_impl<A, F, Fut>(
658 mut context: Context,
659 creator: F,
660 compression: Option<u8>,
661 num: usize,
662 ) where
663 A: Archive<Key = FixedBytes<64>, Value = i32>,
664 F: Fn(Context, Option<u8>) -> Fut,
665 Fut: Future<Output = A>,
666 {
667 let mut keys = BTreeMap::new();
669 {
670 let mut archive = creator(context.with_label("first"), compression).await;
671 while keys.len() < num {
672 let index = keys.len() as u64;
673 let mut key = [0u8; 64];
674 context.fill(&mut key);
675 let key = FixedBytes::<64>::decode(key.as_ref()).unwrap();
676 let data: i32 = context.gen();
677
678 archive
679 .put(index, key.clone(), data)
680 .await
681 .expect("Failed to put data");
682 keys.insert(key, (index, data));
683
684 if context.gen_bool(0.1) {
686 archive.sync().await.expect("Failed to sync archive");
687 }
688 }
689 archive.sync().await.expect("Failed to sync archive");
690
691 for (key, (index, data)) in &keys {
693 let retrieved = archive
694 .get(Identifier::Index(*index))
695 .await
696 .expect("Failed to get data")
697 .expect("Data not found");
698 assert_eq!(&retrieved, data);
699 let retrieved = archive
700 .get(Identifier::Key(key))
701 .await
702 .expect("Failed to get data")
703 .expect("Data not found");
704 assert_eq!(&retrieved, data);
705 }
706 }
707
708 {
710 let archive = creator(context.with_label("second"), compression).await;
711
712 for (key, (index, data)) in &keys {
714 let retrieved = archive
715 .get(Identifier::Index(*index))
716 .await
717 .expect("Failed to get data")
718 .expect("Data not found");
719 assert_eq!(&retrieved, data);
720 let retrieved = archive
721 .get(Identifier::Key(key))
722 .await
723 .expect("Failed to get data")
724 .expect("Data not found");
725 assert_eq!(&retrieved, data);
726 }
727 }
728 }
729
730 fn test_many_keys_determinism<F, Fut, A>(creator: F, compression: Option<u8>, num: usize)
731 where
732 A: Archive<Key = FixedBytes<64>, Value = i32>,
733 F: Fn(Context, Option<u8>) -> Fut + Copy + Send + 'static,
734 Fut: Future<Output = A> + Send,
735 {
736 let executor = deterministic::Runner::default();
737 let state1 = executor.start(|context| async move {
738 test_many_keys_impl(context.clone(), creator, compression, num).await;
739 context.auditor().state()
740 });
741 let executor = deterministic::Runner::default();
742 let state2 = executor.start(|context| async move {
743 test_many_keys_impl(context.clone(), creator, compression, num).await;
744 context.auditor().state()
745 });
746 assert_eq!(state1, state2);
747 }
748
749 #[test_traced]
750 fn test_many_keys_prunable_no_compression() {
751 test_many_keys_determinism(create_prunable, None, 1_000);
752 }
753
754 #[test_traced]
755 fn test_many_keys_prunable_compression() {
756 test_many_keys_determinism(create_prunable, Some(3), 1_000);
757 }
758
759 #[test_traced]
760 fn test_many_keys_immutable_no_compression() {
761 test_many_keys_determinism(create_immutable, None, 1_000);
762 }
763
764 #[test_traced]
765 fn test_many_keys_immutable_compression() {
766 test_many_keys_determinism(create_immutable, Some(3), 1_000);
767 }
768
769 #[test_group("slow")]
770 #[test_traced]
771 fn test_many_keys_prunable_large() {
772 test_many_keys_determinism(create_prunable, None, 50_000);
773 }
774
775 #[test_group("slow")]
776 #[test_traced]
777 fn test_many_keys_immutable_large() {
778 test_many_keys_determinism(create_immutable, None, 50_000);
779 }
780
781 async fn test_put_multi_and_get_impl(
782 context: Context,
783 mut archive: impl MultiArchive<Key = FixedBytes<64>, Value = i32>,
784 ) {
785 let index = 5u64;
787 let key_a = test_key("aaa");
788 let key_b = test_key("bbb");
789 let key_c = test_key("ccc");
790
791 archive
792 .put_multi(index, key_a.clone(), 10)
793 .await
794 .expect("put_multi a");
795 archive
796 .put_multi(index, key_b.clone(), 20)
797 .await
798 .expect("put_multi b");
799 archive
800 .put_multi(index, key_c.clone(), 30)
801 .await
802 .expect("put_multi c");
803
804 assert_eq!(
806 archive.get(Identifier::Key(&key_a)).await.unwrap(),
807 Some(10)
808 );
809 assert_eq!(
810 archive.get(Identifier::Key(&key_b)).await.unwrap(),
811 Some(20)
812 );
813 assert_eq!(
814 archive.get(Identifier::Key(&key_c)).await.unwrap(),
815 Some(30)
816 );
817
818 let missing = test_key("zzz");
820 assert_eq!(archive.get(Identifier::Key(&missing)).await.unwrap(), None);
821
822 let buffer = context.encode();
824 assert!(buffer.contains("items_tracked 1"));
825 }
826
827 #[test_traced]
828 fn test_put_multi_and_get_prunable() {
829 let executor = deterministic::Runner::default();
830 executor.start(|context| async move {
831 let archive = create_prunable(context.clone(), None).await;
832 test_put_multi_and_get_impl(context, archive).await;
833 });
834 }
835
836 async fn test_put_multi_duplicate_key_impl(
837 context: Context,
838 mut archive: impl MultiArchive<Key = FixedBytes<64>, Value = i32>,
839 ) {
840 let key = test_key("dup");
841 archive.put_multi(5, key.clone(), 10).await.unwrap();
842 archive.put_multi(7, key.clone(), 20).await.unwrap();
843
844 assert_eq!(archive.get(Identifier::Index(5)).await.unwrap(), Some(10));
846 assert_eq!(archive.get(Identifier::Index(7)).await.unwrap(), Some(20));
847 assert_eq!(archive.get_all(5).await.unwrap(), Some(vec![10]));
848 assert_eq!(archive.get_all(7).await.unwrap(), Some(vec![20]));
849
850 assert!(matches!(
852 archive.get(Identifier::Key(&key)).await.unwrap(),
853 Some(10 | 20)
854 ));
855
856 let buffer = context.encode();
857 assert!(buffer.contains("items_tracked 2"));
858 }
859
860 #[test_traced]
861 fn test_put_multi_duplicate_key_prunable() {
862 let executor = deterministic::Runner::default();
863 executor.start(|context| async move {
864 let archive = create_prunable(context.clone(), None).await;
865 test_put_multi_duplicate_key_impl(context, archive).await;
866 });
867 }
868
869 async fn test_get_all_impl(mut archive: impl MultiArchive<Key = FixedBytes<64>, Value = i32>) {
870 archive.put_multi(5, test_key("aaa"), 10).await.unwrap();
872 archive.put_multi(5, test_key("bbb"), 20).await.unwrap();
873 archive.put_multi(5, test_key("ccc"), 30).await.unwrap();
874
875 archive.put_multi(7, test_key("ddd"), 40).await.unwrap();
877
878 let all = archive.get_all(5).await.unwrap();
880 assert_eq!(all, Some(vec![10, 20, 30]));
881
882 let all = archive.get_all(7).await.unwrap();
884 assert_eq!(all, Some(vec![40]));
885
886 let all = archive.get_all(99).await.unwrap();
888 assert_eq!(all, None);
889
890 assert_eq!(archive.get(Identifier::Index(5)).await.unwrap(), Some(10));
892 }
893
894 #[test_traced]
895 fn test_get_all_prunable() {
896 let executor = deterministic::Runner::default();
897 executor.start(|context| async move {
898 let archive = create_prunable(context, None).await;
899 test_get_all_impl(archive).await;
900 });
901 }
902
903 async fn test_put_multi_preserves_archive_put_semantics_impl(
904 mut archive: impl MultiArchive<Key = FixedBytes<64>, Value = i32>,
905 ) {
906 archive
908 .put_multi(1, test_key("aaa"), 10)
909 .await
910 .expect("put_multi");
911 archive
912 .put_multi(1, test_key("bbb"), 20)
913 .await
914 .expect("put_multi");
915
916 archive
918 .put(1, test_key("ccc"), 30)
919 .await
920 .expect("Archive::put should no-op");
921
922 assert_eq!(
924 archive
925 .get(Identifier::Key(&test_key("aaa")))
926 .await
927 .unwrap(),
928 Some(10)
929 );
930 assert_eq!(
931 archive
932 .get(Identifier::Key(&test_key("bbb")))
933 .await
934 .unwrap(),
935 Some(20)
936 );
937 assert_eq!(
938 archive
939 .get(Identifier::Key(&test_key("ccc")))
940 .await
941 .unwrap(),
942 None
943 );
944
945 let first = archive
947 .get(Identifier::Index(1))
948 .await
949 .unwrap()
950 .expect("should find first");
951 assert_eq!(first, 10);
952 }
953
954 #[test_traced]
955 fn test_put_multi_preserves_archive_put_semantics_prunable() {
956 let executor = deterministic::Runner::default();
957 executor.start(|context| async move {
958 let archive = create_prunable(context, None).await;
959 test_put_multi_preserves_archive_put_semantics_impl(archive).await;
960 });
961 }
962
963 async fn test_put_multi_restart_impl<A, F, Fut>(
964 context: Context,
965 creator: F,
966 compression: Option<u8>,
967 ) where
968 A: MultiArchive<Key = FixedBytes<64>, Value = i32>,
969 F: Fn(Context, Option<u8>) -> Fut,
970 Fut: Future<Output = A>,
971 {
972 {
974 let mut archive = creator(context.with_label("init1"), compression).await;
975 archive.put_multi(5, test_key("aaa"), 10).await.unwrap();
976 archive.put_multi(5, test_key("bbb"), 20).await.unwrap();
977 archive.put_multi(7, test_key("ccc"), 30).await.unwrap();
978 archive.sync().await.unwrap();
979 }
980
981 let archive = creator(context.with_label("init2"), compression).await;
983
984 assert_eq!(
985 archive
986 .get(Identifier::Key(&test_key("aaa")))
987 .await
988 .unwrap(),
989 Some(10)
990 );
991 assert_eq!(
992 archive
993 .get(Identifier::Key(&test_key("bbb")))
994 .await
995 .unwrap(),
996 Some(20)
997 );
998 assert_eq!(
999 archive
1000 .get(Identifier::Key(&test_key("ccc")))
1001 .await
1002 .unwrap(),
1003 Some(30)
1004 );
1005
1006 let buffer = context.encode();
1008 assert!(buffer.contains("items_tracked 2"));
1009 }
1010
1011 #[test_traced]
1012 fn test_put_multi_restart_prunable() {
1013 let executor = deterministic::Runner::default();
1014 executor.start(|context| async move {
1015 test_put_multi_restart_impl(context, create_prunable, None).await;
1016 });
1017 }
1018
1019 async fn test_put_multi_mixed_indices_impl(
1020 context: Context,
1021 mut archive: impl MultiArchive<Key = FixedBytes<64>, Value = i32>,
1022 ) {
1023 archive.put(1, test_key("single"), 100).await.unwrap();
1025 archive
1026 .put_multi(2, test_key("multi-a"), 200)
1027 .await
1028 .unwrap();
1029 archive
1030 .put_multi(2, test_key("multi-b"), 201)
1031 .await
1032 .unwrap();
1033 archive
1034 .put_multi(3, test_key("multi-c"), 300)
1035 .await
1036 .unwrap();
1037
1038 assert_eq!(
1040 archive
1041 .get(Identifier::Key(&test_key("single")))
1042 .await
1043 .unwrap(),
1044 Some(100)
1045 );
1046 assert_eq!(
1047 archive
1048 .get(Identifier::Key(&test_key("multi-a")))
1049 .await
1050 .unwrap(),
1051 Some(200)
1052 );
1053 assert_eq!(
1054 archive
1055 .get(Identifier::Key(&test_key("multi-b")))
1056 .await
1057 .unwrap(),
1058 Some(201)
1059 );
1060 assert_eq!(
1061 archive
1062 .get(Identifier::Key(&test_key("multi-c")))
1063 .await
1064 .unwrap(),
1065 Some(300)
1066 );
1067
1068 assert_eq!(archive.get(Identifier::Index(2)).await.unwrap(), Some(200));
1070
1071 let (end, next) = archive.next_gap(1);
1073 assert_eq!(end, Some(3));
1074 assert!(next.is_none());
1075
1076 let buffer = context.encode();
1077 assert!(buffer.contains("items_tracked 3"));
1078 }
1079
1080 #[test_traced]
1081 fn test_put_multi_mixed_indices_prunable() {
1082 let executor = deterministic::Runner::default();
1083 executor.start(|context| async move {
1084 let archive = create_prunable(context.clone(), None).await;
1085 test_put_multi_mixed_indices_impl(context, archive).await;
1086 });
1087 }
1088
1089 fn assert_send<T: Send>(_: T) {}
1090
1091 #[allow(dead_code)]
1092 fn assert_archive_futures_are_send<T: super::Archive>(
1093 archive: &mut T,
1094 key: T::Key,
1095 value: T::Value,
1096 ) where
1097 T::Key: Clone,
1098 T::Value: Clone,
1099 {
1100 assert_send(archive.put(1, key.clone(), value.clone()));
1101 assert_send(archive.put_sync(2, key.clone(), value));
1102 assert_send(archive.get(Identifier::Index(1)));
1103 assert_send(archive.get(Identifier::Key(&key)));
1104 assert_send(archive.has(Identifier::Index(1)));
1105 assert_send(archive.has(Identifier::Key(&key)));
1106 assert_send(archive.sync());
1107 }
1108
1109 #[allow(dead_code)]
1110 fn assert_archive_destroy_is_send<T: super::Archive>(archive: T) {
1111 assert_send(archive.destroy());
1112 }
1113
1114 #[allow(dead_code)]
1115 fn assert_multi_archive_futures_are_send<T: super::MultiArchive>(
1116 archive: &mut T,
1117 key: T::Key,
1118 value: T::Value,
1119 ) where
1120 T::Key: Clone,
1121 T::Value: Clone,
1122 {
1123 assert_archive_futures_are_send(archive, key.clone(), value.clone());
1124 assert_send(archive.get_all(1));
1125 assert_send(archive.put_multi(1, key.clone(), value.clone()));
1126 assert_send(archive.put_multi_sync(2, key, value));
1127 }
1128
1129 #[allow(dead_code)]
1130 fn assert_prunable_archive_futures_are_send(
1131 archive: &mut prunable::Archive<TwoCap, Context, FixedBytes<64>, i32>,
1132 key: FixedBytes<64>,
1133 value: i32,
1134 ) {
1135 assert_archive_futures_are_send(archive, key, value);
1136 }
1137
1138 #[allow(dead_code)]
1139 fn assert_prunable_multi_archive_futures_are_send(
1140 archive: &mut prunable::Archive<TwoCap, Context, FixedBytes<64>, i32>,
1141 key: FixedBytes<64>,
1142 value: i32,
1143 ) {
1144 assert_multi_archive_futures_are_send(archive, key, value);
1145 }
1146
1147 #[allow(dead_code)]
1148 fn assert_prunable_archive_destroy_is_send(
1149 archive: prunable::Archive<TwoCap, Context, FixedBytes<64>, i32>,
1150 ) {
1151 assert_archive_destroy_is_send(archive);
1152 }
1153
1154 #[allow(dead_code)]
1155 fn assert_immutable_archive_futures_are_send(
1156 archive: &mut immutable::Archive<Context, FixedBytes<64>, i32>,
1157 key: FixedBytes<64>,
1158 value: i32,
1159 ) {
1160 assert_archive_futures_are_send(archive, key, value);
1161 }
1162
1163 #[allow(dead_code)]
1164 fn assert_immutable_archive_destroy_is_send(
1165 archive: immutable::Archive<Context, FixedBytes<64>, i32>,
1166 ) {
1167 assert_archive_destroy_is_send(archive);
1168 }
1169}