1use commonware_codec::Codec;
7use commonware_utils::Array;
8use std::future::Future;
9use thiserror::Error;
10
11pub mod immutable;
12pub mod prunable;
13
14#[cfg(all(test, feature = "arbitrary"))]
15mod conformance;
16
17pub enum Identifier<'a, K: Array> {
19 Index(u64),
20 Key(&'a K),
21}
22
23#[derive(Debug, Error)]
25pub enum Error {
26 #[error("journal error: {0}")]
27 Journal(#[from] crate::journal::Error),
28 #[error("ordinal error: {0}")]
29 Ordinal(#[from] crate::ordinal::Error),
30 #[error("metadata error: {0}")]
31 Metadata(#[from] crate::metadata::Error),
32 #[error("freezer error: {0}")]
33 Freezer(#[from] crate::freezer::Error),
34 #[error("record corrupted")]
35 RecordCorrupted,
36 #[error("already pruned to: {0}")]
37 AlreadyPrunedTo(u64),
38 #[error("record too large")]
39 RecordTooLarge,
40}
41
42pub trait Archive: Send {
44 type Key: Array;
46
47 type Value: Codec + Send;
49
50 fn put(
55 &mut self,
56 index: u64,
57 key: Self::Key,
58 value: Self::Value,
59 ) -> impl Future<Output = Result<(), Error>> + Send;
60
61 fn put_sync(
63 &mut self,
64 index: u64,
65 key: Self::Key,
66 value: Self::Value,
67 ) -> impl Future<Output = Result<(), Error>> + Send {
68 async move {
69 self.put(index, key, value).await?;
70 self.sync().await
71 }
72 }
73
74 fn get<'a>(
80 &'a self,
81 identifier: Identifier<'a, Self::Key>,
82 ) -> impl Future<Output = Result<Option<Self::Value>, Error>> + Send + use<'a, Self>;
83
84 fn has<'a>(
86 &'a self,
87 identifier: Identifier<'a, Self::Key>,
88 ) -> impl Future<Output = Result<bool, Error>> + Send + use<'a, Self>;
89
90 fn next_gap(&self, index: u64) -> (Option<u64>, Option<u64>);
95
96 fn missing_items(&self, index: u64, max: usize) -> Vec<u64>;
101
102 fn ranges(&self) -> impl Iterator<Item = (u64, u64)>;
104
105 fn first_index(&self) -> Option<u64>;
107
108 fn last_index(&self) -> Option<u64>;
110
111 fn sync(&mut self) -> impl Future<Output = Result<(), Error>> + Send;
113
114 fn destroy(self) -> impl Future<Output = Result<(), Error>> + Send;
116}
117
118pub trait MultiArchive: Archive {
125 fn get_all(
129 &self,
130 index: u64,
131 ) -> impl Future<Output = Result<Option<Vec<Self::Value>>, Error>> + Send + use<'_, Self>;
132
133 fn put_multi(
139 &mut self,
140 index: u64,
141 key: Self::Key,
142 value: Self::Value,
143 ) -> impl Future<Output = Result<(), Error>> + Send;
144
145 fn put_multi_sync(
147 &mut self,
148 index: u64,
149 key: Self::Key,
150 value: Self::Value,
151 ) -> impl Future<Output = Result<(), Error>> + Send {
152 async move {
153 self.put_multi(index, key, value).await?;
154 self.sync().await
155 }
156 }
157}
158
159#[cfg(test)]
160mod tests {
161 use super::*;
162 use crate::{kv::tests::test_key, translator::TwoCap};
163 use commonware_codec::DecodeExt;
164 use commonware_macros::{test_group, test_traced};
165 use commonware_runtime::{
166 buffer::paged::CacheRef,
167 deterministic::{self, Context},
168 Metrics, Runner,
169 };
170 use commonware_utils::{sequence::FixedBytes, NZUsize, NZU16, NZU64};
171 use rand::Rng;
172 use std::{
173 collections::BTreeMap,
174 num::{NonZeroU16, NonZeroUsize},
175 };
176
177 const PAGE_SIZE: NonZeroU16 = NZU16!(1024);
178 const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);
179
180 async fn create_prunable(
181 context: Context,
182 compression: Option<u8>,
183 ) -> impl MultiArchive<Key = FixedBytes<64>, Value = i32> {
184 let cfg = prunable::Config {
185 translator: TwoCap,
186 key_partition: "test-key".into(),
187 key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
188 value_partition: "test-value".into(),
189 compression,
190 codec_config: (),
191 items_per_section: NZU64!(1024),
192 key_write_buffer: NZUsize!(1024),
193 value_write_buffer: NZUsize!(1024),
194 replay_buffer: NZUsize!(1024),
195 };
196 prunable::Archive::init(context, cfg).await.unwrap()
197 }
198
199 async fn create_immutable(
200 context: Context,
201 compression: Option<u8>,
202 ) -> impl Archive<Key = FixedBytes<64>, Value = i32> {
203 let cfg = immutable::Config {
204 metadata_partition: "test-metadata".into(),
205 freezer_table_partition: "test-table".into(),
206 freezer_table_initial_size: 64,
207 freezer_table_resize_frequency: 2,
208 freezer_table_resize_chunk_size: 32,
209 freezer_key_partition: "test-key".into(),
210 freezer_key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
211 freezer_value_partition: "test-value".into(),
212 freezer_value_target_size: 1024 * 1024,
213 freezer_value_compression: compression,
214 ordinal_partition: "test-ordinal".into(),
215 items_per_section: NZU64!(1024),
216 freezer_key_write_buffer: NZUsize!(1024 * 1024),
217 freezer_value_write_buffer: NZUsize!(1024 * 1024),
218 ordinal_write_buffer: NZUsize!(1024 * 1024),
219 replay_buffer: NZUsize!(1024 * 1024),
220 codec_config: (),
221 };
222 immutable::Archive::init(context, cfg).await.unwrap()
223 }
224
225 async fn test_put_get_impl(mut archive: impl Archive<Key = FixedBytes<64>, Value = i32>) {
226 let index = 1u64;
227 let key = test_key("testkey");
228 let data = 1;
229
230 let has = archive
232 .has(Identifier::Index(index))
233 .await
234 .expect("Failed to check key");
235 assert!(!has);
236 let has = archive
237 .has(Identifier::Key(&key))
238 .await
239 .expect("Failed to check key");
240 assert!(!has);
241
242 archive
244 .put(index, key.clone(), data)
245 .await
246 .expect("Failed to put data");
247
248 let has = archive
250 .has(Identifier::Index(index))
251 .await
252 .expect("Failed to check key");
253 assert!(has);
254 let has = archive
255 .has(Identifier::Key(&key))
256 .await
257 .expect("Failed to check key");
258 assert!(has);
259
260 let retrieved = archive
262 .get(Identifier::Key(&key))
263 .await
264 .expect("Failed to get data");
265 assert_eq!(retrieved, Some(data));
266
267 let retrieved = archive
269 .get(Identifier::Index(index))
270 .await
271 .expect("Failed to get data");
272 assert_eq!(retrieved, Some(data));
273
274 archive.sync().await.expect("Failed to sync data");
276 }
277
278 #[test_traced]
279 fn test_put_get_prunable_no_compression() {
280 let executor = deterministic::Runner::default();
281 executor.start(|context| async move {
282 let archive = create_prunable(context, None).await;
283 test_put_get_impl(archive).await;
284 });
285 }
286
287 #[test_traced]
288 fn test_put_get_prunable_compression() {
289 let executor = deterministic::Runner::default();
290 executor.start(|context| async move {
291 let archive = create_prunable(context, Some(3)).await;
292 test_put_get_impl(archive).await;
293 });
294 }
295
296 #[test_traced]
297 fn test_put_get_immutable_no_compression() {
298 let executor = deterministic::Runner::default();
299 executor.start(|context| async move {
300 let archive = create_immutable(context, None).await;
301 test_put_get_impl(archive).await;
302 });
303 }
304
305 #[test_traced]
306 fn test_put_get_immutable_compression() {
307 let executor = deterministic::Runner::default();
308 executor.start(|context| async move {
309 let archive = create_immutable(context, Some(3)).await;
310 test_put_get_impl(archive).await;
311 });
312 }
313
314 async fn test_duplicate_key_impl(mut archive: impl Archive<Key = FixedBytes<64>, Value = i32>) {
315 let index = 1u64;
316 let key = test_key("duplicate");
317 let data1 = 1;
318 let data2 = 2;
319
320 archive
322 .put(index, key.clone(), data1)
323 .await
324 .expect("Failed to put data");
325
326 archive
328 .put(index, key.clone(), data2)
329 .await
330 .expect("Duplicate put should not fail");
331
332 let retrieved = archive
334 .get(Identifier::Index(index))
335 .await
336 .expect("Failed to get data")
337 .expect("Data not found");
338 assert_eq!(retrieved, data1);
339
340 let retrieved = archive
341 .get(Identifier::Key(&key))
342 .await
343 .expect("Failed to get data")
344 .expect("Data not found");
345 assert_eq!(retrieved, data1);
346 }
347
348 #[test_traced]
349 fn test_duplicate_key_prunable_no_compression() {
350 let executor = deterministic::Runner::default();
351 executor.start(|context| async move {
352 let archive = create_prunable(context, None).await;
353 test_duplicate_key_impl(archive).await;
354 });
355 }
356
357 #[test_traced]
358 fn test_duplicate_key_prunable_compression() {
359 let executor = deterministic::Runner::default();
360 executor.start(|context| async move {
361 let archive = create_prunable(context, Some(3)).await;
362 test_duplicate_key_impl(archive).await;
363 });
364 }
365
366 #[test_traced]
367 fn test_duplicate_key_immutable_no_compression() {
368 let executor = deterministic::Runner::default();
369 executor.start(|context| async move {
370 let archive = create_immutable(context, None).await;
371 test_duplicate_key_impl(archive).await;
372 });
373 }
374
375 #[test_traced]
376 fn test_duplicate_key_immutable_compression() {
377 let executor = deterministic::Runner::default();
378 executor.start(|context| async move {
379 let archive = create_immutable(context, Some(3)).await;
380 test_duplicate_key_impl(archive).await;
381 });
382 }
383
384 async fn test_get_nonexistent_impl(archive: impl Archive<Key = FixedBytes<64>, Value = i32>) {
385 let index = 1u64;
387 let retrieved: Option<i32> = archive
388 .get(Identifier::Index(index))
389 .await
390 .expect("Failed to get data");
391 assert!(retrieved.is_none());
392
393 let key = test_key("nonexistent");
395 let retrieved = archive
396 .get(Identifier::Key(&key))
397 .await
398 .expect("Failed to get data");
399 assert!(retrieved.is_none());
400 }
401
402 #[test_traced]
403 fn test_get_nonexistent_prunable_no_compression() {
404 let executor = deterministic::Runner::default();
405 executor.start(|context| async move {
406 let archive = create_prunable(context, None).await;
407 test_get_nonexistent_impl(archive).await;
408 });
409 }
410
411 #[test_traced]
412 fn test_get_nonexistent_prunable_compression() {
413 let executor = deterministic::Runner::default();
414 executor.start(|context| async move {
415 let archive = create_prunable(context, Some(3)).await;
416 test_get_nonexistent_impl(archive).await;
417 });
418 }
419
420 #[test_traced]
421 fn test_get_nonexistent_immutable_no_compression() {
422 let executor = deterministic::Runner::default();
423 executor.start(|context| async move {
424 let archive = create_immutable(context, None).await;
425 test_get_nonexistent_impl(archive).await;
426 });
427 }
428
429 #[test_traced]
430 fn test_get_nonexistent_immutable_compression() {
431 let executor = deterministic::Runner::default();
432 executor.start(|context| async move {
433 let archive = create_immutable(context, Some(3)).await;
434 test_get_nonexistent_impl(archive).await;
435 });
436 }
437
438 async fn test_persistence_impl<A, F, Fut>(context: Context, creator: F, compression: Option<u8>)
439 where
440 A: Archive<Key = FixedBytes<64>, Value = i32>,
441 F: Fn(Context, Option<u8>) -> Fut,
442 Fut: Future<Output = A>,
443 {
444 {
446 let mut archive = creator(context.with_label("first"), compression).await;
447
448 let keys = vec![
450 (1u64, test_key("key1"), 1),
451 (2u64, test_key("key2"), 2),
452 (3u64, test_key("key3"), 3),
453 ];
454
455 for (index, key, data) in &keys {
456 archive
457 .put(*index, key.clone(), *data)
458 .await
459 .expect("Failed to put data");
460 }
461
462 archive.sync().await.expect("Failed to sync archive");
464 }
465
466 {
468 let archive = creator(context.with_label("second"), compression).await;
469
470 let keys = vec![
472 (1u64, test_key("key1"), 1),
473 (2u64, test_key("key2"), 2),
474 (3u64, test_key("key3"), 3),
475 ];
476
477 for (index, key, expected_data) in &keys {
478 let retrieved = archive
479 .get(Identifier::Index(*index))
480 .await
481 .expect("Failed to get data")
482 .expect("Data not found");
483 assert_eq!(retrieved, *expected_data);
484
485 let retrieved = archive
486 .get(Identifier::Key(key))
487 .await
488 .expect("Failed to get data")
489 .expect("Data not found");
490 assert_eq!(retrieved, *expected_data);
491 }
492 }
493 }
494
495 #[test_traced]
496 fn test_persistence_prunable_no_compression() {
497 let executor = deterministic::Runner::default();
498 executor.start(|context| async move {
499 test_persistence_impl(context, create_prunable, None).await;
500 });
501 }
502
503 #[test_traced]
504 fn test_persistence_prunable_compression() {
505 let executor = deterministic::Runner::default();
506 executor.start(|context| async move {
507 test_persistence_impl(context, create_prunable, Some(3)).await;
508 });
509 }
510
511 #[test_traced]
512 fn test_persistence_immutable_no_compression() {
513 let executor = deterministic::Runner::default();
514 executor.start(|context| async move {
515 test_persistence_impl(context, create_immutable, None).await;
516 });
517 }
518
519 #[test_traced]
520 fn test_persistence_immutable_compression() {
521 let executor = deterministic::Runner::default();
522 executor.start(|context| async move {
523 test_persistence_impl(context, create_immutable, Some(3)).await;
524 });
525 }
526
527 async fn test_ranges_impl<A, F, Fut>(mut context: Context, creator: F, compression: Option<u8>)
528 where
529 A: Archive<Key = FixedBytes<64>, Value = i32>,
530 F: Fn(Context, Option<u8>) -> Fut,
531 Fut: Future<Output = A>,
532 {
533 let mut keys = BTreeMap::new();
534 {
535 let mut archive = creator(context.with_label("first"), compression).await;
536
537 let mut last_index = 0u64;
539 while keys.len() < 100 {
540 let gap: u64 = context.gen_range(1..=10);
541 let index = last_index + gap;
542 last_index = index;
543
544 let mut key_bytes = [0u8; 64];
545 context.fill(&mut key_bytes);
546 let key = FixedBytes::<64>::decode(key_bytes.as_ref()).unwrap();
547 let data: i32 = context.gen();
548
549 if keys.contains_key(&index) {
550 continue;
551 }
552 keys.insert(index, (key.clone(), data));
553
554 archive
555 .put(index, key, data)
556 .await
557 .expect("Failed to put data");
558 }
559
560 archive.sync().await.expect("Failed to sync archive");
561 }
562
563 {
564 let archive = creator(context.with_label("second"), compression).await;
565 let sorted_indices: Vec<u64> = keys.keys().cloned().collect();
566
567 let (current_end, start_next) = archive.next_gap(0);
569 assert!(current_end.is_none());
570 assert_eq!(start_next, Some(sorted_indices[0]));
571
572 let mut i = 0;
574 while i < sorted_indices.len() {
575 let current_index = sorted_indices[i];
576
577 let mut j = i;
579 while j + 1 < sorted_indices.len() && sorted_indices[j + 1] == sorted_indices[j] + 1
580 {
581 j += 1;
582 }
583 let block_end_index = sorted_indices[j];
584 let next_actual_index = if j + 1 < sorted_indices.len() {
585 Some(sorted_indices[j + 1])
586 } else {
587 None
588 };
589
590 let (current_end, start_next) = archive.next_gap(current_index);
591 assert_eq!(current_end, Some(block_end_index));
592 assert_eq!(start_next, next_actual_index);
593
594 if let Some(next_index) = next_actual_index {
596 if next_index > block_end_index + 1 {
597 let in_gap_index = block_end_index + 1;
598 let (current_end, start_next) = archive.next_gap(in_gap_index);
599 assert!(current_end.is_none());
600 assert_eq!(start_next, Some(next_index));
601 }
602 }
603 i = j + 1;
604 }
605
606 let last_index = *sorted_indices.last().unwrap();
608 let (current_end, start_next) = archive.next_gap(last_index);
609 assert!(current_end.is_some());
610 assert!(start_next.is_none());
611 }
612 }
613
614 #[test_traced]
615 fn test_ranges_prunable_no_compression() {
616 let executor = deterministic::Runner::default();
617 executor.start(|context| async move {
618 test_ranges_impl(context, create_prunable, None).await;
619 });
620 }
621
622 #[test_traced]
623 fn test_ranges_prunable_compression() {
624 let executor = deterministic::Runner::default();
625 executor.start(|context| async move {
626 test_ranges_impl(context, create_prunable, Some(3)).await;
627 });
628 }
629
630 #[test_traced]
631 fn test_ranges_immutable_no_compression() {
632 let executor = deterministic::Runner::default();
633 executor.start(|context| async move {
634 test_ranges_impl(context, create_immutable, None).await;
635 });
636 }
637
638 #[test_traced]
639 fn test_ranges_immutable_compression() {
640 let executor = deterministic::Runner::default();
641 executor.start(|context| async move {
642 test_ranges_impl(context, create_immutable, Some(3)).await;
643 });
644 }
645
646 async fn test_many_keys_impl<A, F, Fut>(
647 mut context: Context,
648 creator: F,
649 compression: Option<u8>,
650 num: usize,
651 ) where
652 A: Archive<Key = FixedBytes<64>, Value = i32>,
653 F: Fn(Context, Option<u8>) -> Fut,
654 Fut: Future<Output = A>,
655 {
656 let mut keys = BTreeMap::new();
658 {
659 let mut archive = creator(context.with_label("first"), compression).await;
660 while keys.len() < num {
661 let index = keys.len() as u64;
662 let mut key = [0u8; 64];
663 context.fill(&mut key);
664 let key = FixedBytes::<64>::decode(key.as_ref()).unwrap();
665 let data: i32 = context.gen();
666
667 archive
668 .put(index, key.clone(), data)
669 .await
670 .expect("Failed to put data");
671 keys.insert(key, (index, data));
672
673 if context.gen_bool(0.1) {
675 archive.sync().await.expect("Failed to sync archive");
676 }
677 }
678 archive.sync().await.expect("Failed to sync archive");
679
680 for (key, (index, data)) in &keys {
682 let retrieved = archive
683 .get(Identifier::Index(*index))
684 .await
685 .expect("Failed to get data")
686 .expect("Data not found");
687 assert_eq!(&retrieved, data);
688 let retrieved = archive
689 .get(Identifier::Key(key))
690 .await
691 .expect("Failed to get data")
692 .expect("Data not found");
693 assert_eq!(&retrieved, data);
694 }
695 }
696
697 {
699 let archive = creator(context.with_label("second"), compression).await;
700
701 for (key, (index, data)) in &keys {
703 let retrieved = archive
704 .get(Identifier::Index(*index))
705 .await
706 .expect("Failed to get data")
707 .expect("Data not found");
708 assert_eq!(&retrieved, data);
709 let retrieved = archive
710 .get(Identifier::Key(key))
711 .await
712 .expect("Failed to get data")
713 .expect("Data not found");
714 assert_eq!(&retrieved, data);
715 }
716 }
717 }
718
719 fn test_many_keys_determinism<F, Fut, A>(creator: F, compression: Option<u8>, num: usize)
720 where
721 A: Archive<Key = FixedBytes<64>, Value = i32>,
722 F: Fn(Context, Option<u8>) -> Fut + Copy + Send + 'static,
723 Fut: Future<Output = A> + Send,
724 {
725 let executor = deterministic::Runner::default();
726 let state1 = executor.start(|context| async move {
727 test_many_keys_impl(context.clone(), creator, compression, num).await;
728 context.auditor().state()
729 });
730 let executor = deterministic::Runner::default();
731 let state2 = executor.start(|context| async move {
732 test_many_keys_impl(context.clone(), creator, compression, num).await;
733 context.auditor().state()
734 });
735 assert_eq!(state1, state2);
736 }
737
738 #[test_traced]
739 fn test_many_keys_prunable_no_compression() {
740 test_many_keys_determinism(create_prunable, None, 1_000);
741 }
742
743 #[test_traced]
744 fn test_many_keys_prunable_compression() {
745 test_many_keys_determinism(create_prunable, Some(3), 1_000);
746 }
747
748 #[test_traced]
749 fn test_many_keys_immutable_no_compression() {
750 test_many_keys_determinism(create_immutable, None, 1_000);
751 }
752
753 #[test_traced]
754 fn test_many_keys_immutable_compression() {
755 test_many_keys_determinism(create_immutable, Some(3), 1_000);
756 }
757
758 #[test_group("slow")]
759 #[test_traced]
760 fn test_many_keys_prunable_large() {
761 test_many_keys_determinism(create_prunable, None, 50_000);
762 }
763
764 #[test_group("slow")]
765 #[test_traced]
766 fn test_many_keys_immutable_large() {
767 test_many_keys_determinism(create_immutable, None, 50_000);
768 }
769
770 async fn test_put_multi_and_get_impl(
771 context: Context,
772 mut archive: impl MultiArchive<Key = FixedBytes<64>, Value = i32>,
773 ) {
774 let index = 5u64;
776 let key_a = test_key("aaa");
777 let key_b = test_key("bbb");
778 let key_c = test_key("ccc");
779
780 archive
781 .put_multi(index, key_a.clone(), 10)
782 .await
783 .expect("put_multi a");
784 archive
785 .put_multi(index, key_b.clone(), 20)
786 .await
787 .expect("put_multi b");
788 archive
789 .put_multi(index, key_c.clone(), 30)
790 .await
791 .expect("put_multi c");
792
793 assert_eq!(
795 archive.get(Identifier::Key(&key_a)).await.unwrap(),
796 Some(10)
797 );
798 assert_eq!(
799 archive.get(Identifier::Key(&key_b)).await.unwrap(),
800 Some(20)
801 );
802 assert_eq!(
803 archive.get(Identifier::Key(&key_c)).await.unwrap(),
804 Some(30)
805 );
806
807 let missing = test_key("zzz");
809 assert_eq!(archive.get(Identifier::Key(&missing)).await.unwrap(), None);
810
811 let buffer = context.encode();
813 assert!(buffer.contains("items_tracked 1"));
814 }
815
816 #[test_traced]
817 fn test_put_multi_and_get_prunable() {
818 let executor = deterministic::Runner::default();
819 executor.start(|context| async move {
820 let archive = create_prunable(context.clone(), None).await;
821 test_put_multi_and_get_impl(context, archive).await;
822 });
823 }
824
825 async fn test_put_multi_duplicate_key_impl(
826 context: Context,
827 mut archive: impl MultiArchive<Key = FixedBytes<64>, Value = i32>,
828 ) {
829 let key = test_key("dup");
830 archive.put_multi(5, key.clone(), 10).await.unwrap();
831 archive.put_multi(7, key.clone(), 20).await.unwrap();
832
833 assert_eq!(archive.get(Identifier::Index(5)).await.unwrap(), Some(10));
835 assert_eq!(archive.get(Identifier::Index(7)).await.unwrap(), Some(20));
836 assert_eq!(archive.get_all(5).await.unwrap(), Some(vec![10]));
837 assert_eq!(archive.get_all(7).await.unwrap(), Some(vec![20]));
838
839 assert!(matches!(
841 archive.get(Identifier::Key(&key)).await.unwrap(),
842 Some(10 | 20)
843 ));
844
845 let buffer = context.encode();
846 assert!(buffer.contains("items_tracked 2"));
847 }
848
849 #[test_traced]
850 fn test_put_multi_duplicate_key_prunable() {
851 let executor = deterministic::Runner::default();
852 executor.start(|context| async move {
853 let archive = create_prunable(context.clone(), None).await;
854 test_put_multi_duplicate_key_impl(context, archive).await;
855 });
856 }
857
858 async fn test_get_all_impl(mut archive: impl MultiArchive<Key = FixedBytes<64>, Value = i32>) {
859 archive.put_multi(5, test_key("aaa"), 10).await.unwrap();
861 archive.put_multi(5, test_key("bbb"), 20).await.unwrap();
862 archive.put_multi(5, test_key("ccc"), 30).await.unwrap();
863
864 archive.put_multi(7, test_key("ddd"), 40).await.unwrap();
866
867 let all = archive.get_all(5).await.unwrap();
869 assert_eq!(all, Some(vec![10, 20, 30]));
870
871 let all = archive.get_all(7).await.unwrap();
873 assert_eq!(all, Some(vec![40]));
874
875 let all = archive.get_all(99).await.unwrap();
877 assert_eq!(all, None);
878
879 assert_eq!(archive.get(Identifier::Index(5)).await.unwrap(), Some(10));
881 }
882
883 #[test_traced]
884 fn test_get_all_prunable() {
885 let executor = deterministic::Runner::default();
886 executor.start(|context| async move {
887 let archive = create_prunable(context, None).await;
888 test_get_all_impl(archive).await;
889 });
890 }
891
892 async fn test_put_multi_preserves_archive_put_semantics_impl(
893 mut archive: impl MultiArchive<Key = FixedBytes<64>, Value = i32>,
894 ) {
895 archive
897 .put_multi(1, test_key("aaa"), 10)
898 .await
899 .expect("put_multi");
900 archive
901 .put_multi(1, test_key("bbb"), 20)
902 .await
903 .expect("put_multi");
904
905 archive
907 .put(1, test_key("ccc"), 30)
908 .await
909 .expect("Archive::put should no-op");
910
911 assert_eq!(
913 archive
914 .get(Identifier::Key(&test_key("aaa")))
915 .await
916 .unwrap(),
917 Some(10)
918 );
919 assert_eq!(
920 archive
921 .get(Identifier::Key(&test_key("bbb")))
922 .await
923 .unwrap(),
924 Some(20)
925 );
926 assert_eq!(
927 archive
928 .get(Identifier::Key(&test_key("ccc")))
929 .await
930 .unwrap(),
931 None
932 );
933
934 let first = archive
936 .get(Identifier::Index(1))
937 .await
938 .unwrap()
939 .expect("should find first");
940 assert_eq!(first, 10);
941 }
942
943 #[test_traced]
944 fn test_put_multi_preserves_archive_put_semantics_prunable() {
945 let executor = deterministic::Runner::default();
946 executor.start(|context| async move {
947 let archive = create_prunable(context, None).await;
948 test_put_multi_preserves_archive_put_semantics_impl(archive).await;
949 });
950 }
951
952 async fn test_put_multi_restart_impl<A, F, Fut>(
953 context: Context,
954 creator: F,
955 compression: Option<u8>,
956 ) where
957 A: MultiArchive<Key = FixedBytes<64>, Value = i32>,
958 F: Fn(Context, Option<u8>) -> Fut,
959 Fut: Future<Output = A>,
960 {
961 {
963 let mut archive = creator(context.with_label("init1"), compression).await;
964 archive.put_multi(5, test_key("aaa"), 10).await.unwrap();
965 archive.put_multi(5, test_key("bbb"), 20).await.unwrap();
966 archive.put_multi(7, test_key("ccc"), 30).await.unwrap();
967 archive.sync().await.unwrap();
968 }
969
970 let archive = creator(context.with_label("init2"), compression).await;
972
973 assert_eq!(
974 archive
975 .get(Identifier::Key(&test_key("aaa")))
976 .await
977 .unwrap(),
978 Some(10)
979 );
980 assert_eq!(
981 archive
982 .get(Identifier::Key(&test_key("bbb")))
983 .await
984 .unwrap(),
985 Some(20)
986 );
987 assert_eq!(
988 archive
989 .get(Identifier::Key(&test_key("ccc")))
990 .await
991 .unwrap(),
992 Some(30)
993 );
994
995 let buffer = context.encode();
997 assert!(buffer.contains("items_tracked 2"));
998 }
999
1000 #[test_traced]
1001 fn test_put_multi_restart_prunable() {
1002 let executor = deterministic::Runner::default();
1003 executor.start(|context| async move {
1004 test_put_multi_restart_impl(context, create_prunable, None).await;
1005 });
1006 }
1007
1008 async fn test_put_multi_mixed_indices_impl(
1009 context: Context,
1010 mut archive: impl MultiArchive<Key = FixedBytes<64>, Value = i32>,
1011 ) {
1012 archive.put(1, test_key("single"), 100).await.unwrap();
1014 archive
1015 .put_multi(2, test_key("multi-a"), 200)
1016 .await
1017 .unwrap();
1018 archive
1019 .put_multi(2, test_key("multi-b"), 201)
1020 .await
1021 .unwrap();
1022 archive
1023 .put_multi(3, test_key("multi-c"), 300)
1024 .await
1025 .unwrap();
1026
1027 assert_eq!(
1029 archive
1030 .get(Identifier::Key(&test_key("single")))
1031 .await
1032 .unwrap(),
1033 Some(100)
1034 );
1035 assert_eq!(
1036 archive
1037 .get(Identifier::Key(&test_key("multi-a")))
1038 .await
1039 .unwrap(),
1040 Some(200)
1041 );
1042 assert_eq!(
1043 archive
1044 .get(Identifier::Key(&test_key("multi-b")))
1045 .await
1046 .unwrap(),
1047 Some(201)
1048 );
1049 assert_eq!(
1050 archive
1051 .get(Identifier::Key(&test_key("multi-c")))
1052 .await
1053 .unwrap(),
1054 Some(300)
1055 );
1056
1057 assert_eq!(archive.get(Identifier::Index(2)).await.unwrap(), Some(200));
1059
1060 let (end, next) = archive.next_gap(1);
1062 assert_eq!(end, Some(3));
1063 assert!(next.is_none());
1064
1065 let buffer = context.encode();
1066 assert!(buffer.contains("items_tracked 3"));
1067 }
1068
1069 #[test_traced]
1070 fn test_put_multi_mixed_indices_prunable() {
1071 let executor = deterministic::Runner::default();
1072 executor.start(|context| async move {
1073 let archive = create_prunable(context.clone(), None).await;
1074 test_put_multi_mixed_indices_impl(context, archive).await;
1075 });
1076 }
1077
/// Compile-time probe: accepts (and drops) any value whose type is `Send`.
fn assert_send<T>(_value: T)
where
    T: Send,
{
}
1079
1080 #[allow(dead_code)]
1081 fn assert_archive_futures_are_send<T: super::Archive>(
1082 archive: &mut T,
1083 key: T::Key,
1084 value: T::Value,
1085 ) where
1086 T::Key: Clone,
1087 T::Value: Clone,
1088 {
1089 assert_send(archive.put(1, key.clone(), value.clone()));
1090 assert_send(archive.put_sync(2, key.clone(), value));
1091 assert_send(archive.get(Identifier::Index(1)));
1092 assert_send(archive.get(Identifier::Key(&key)));
1093 assert_send(archive.has(Identifier::Index(1)));
1094 assert_send(archive.has(Identifier::Key(&key)));
1095 assert_send(archive.sync());
1096 }
1097
1098 #[allow(dead_code)]
1099 fn assert_archive_destroy_is_send<T: super::Archive>(archive: T) {
1100 assert_send(archive.destroy());
1101 }
1102
1103 #[allow(dead_code)]
1104 fn assert_multi_archive_futures_are_send<T: super::MultiArchive>(
1105 archive: &mut T,
1106 key: T::Key,
1107 value: T::Value,
1108 ) where
1109 T::Key: Clone,
1110 T::Value: Clone,
1111 {
1112 assert_archive_futures_are_send(archive, key.clone(), value.clone());
1113 assert_send(archive.get_all(1));
1114 assert_send(archive.put_multi(1, key.clone(), value.clone()));
1115 assert_send(archive.put_multi_sync(2, key, value));
1116 }
1117
1118 #[allow(dead_code)]
1119 fn assert_prunable_archive_futures_are_send(
1120 archive: &mut prunable::Archive<TwoCap, Context, FixedBytes<64>, i32>,
1121 key: FixedBytes<64>,
1122 value: i32,
1123 ) {
1124 assert_archive_futures_are_send(archive, key, value);
1125 }
1126
1127 #[allow(dead_code)]
1128 fn assert_prunable_multi_archive_futures_are_send(
1129 archive: &mut prunable::Archive<TwoCap, Context, FixedBytes<64>, i32>,
1130 key: FixedBytes<64>,
1131 value: i32,
1132 ) {
1133 assert_multi_archive_futures_are_send(archive, key, value);
1134 }
1135
1136 #[allow(dead_code)]
1137 fn assert_prunable_archive_destroy_is_send(
1138 archive: prunable::Archive<TwoCap, Context, FixedBytes<64>, i32>,
1139 ) {
1140 assert_archive_destroy_is_send(archive);
1141 }
1142
1143 #[allow(dead_code)]
1144 fn assert_immutable_archive_futures_are_send(
1145 archive: &mut immutable::Archive<Context, FixedBytes<64>, i32>,
1146 key: FixedBytes<64>,
1147 value: i32,
1148 ) {
1149 assert_archive_futures_are_send(archive, key, value);
1150 }
1151
1152 #[allow(dead_code)]
1153 fn assert_immutable_archive_destroy_is_send(
1154 archive: immutable::Archive<Context, FixedBytes<64>, i32>,
1155 ) {
1156 assert_archive_destroy_is_send(archive);
1157 }
1158}