1use commonware_codec::Codec;
10use commonware_utils::Array;
11use std::future::Future;
12use thiserror::Error;
13
14pub mod immutable;
15pub mod prunable;
16
17#[cfg(all(test, feature = "arbitrary"))]
18mod conformance;
19
/// Selects an item in an [`Archive`], either by its numeric index or by a borrowed key.
pub enum Identifier<'a, K: Array> {
    /// Address the item by its `u64` index.
    Index(u64),
    /// Address the item by key; the key is only borrowed for `'a`.
    Key(&'a K),
}
25
#[derive(Debug, Error)]
/// Errors surfaced by [`Archive`] and [`MultiArchive`] operations.
///
/// The first four variants wrap errors from sibling storage modules of this crate and are
/// produced automatically through `?` thanks to `#[from]`.
pub enum Error {
    /// An error propagated from `crate::journal`.
    #[error("journal error: {0}")]
    Journal(#[from] crate::journal::Error),
    /// An error propagated from `crate::ordinal`.
    #[error("ordinal error: {0}")]
    Ordinal(#[from] crate::ordinal::Error),
    /// An error propagated from `crate::metadata`.
    #[error("metadata error: {0}")]
    Metadata(#[from] crate::metadata::Error),
    /// An error propagated from `crate::freezer`.
    #[error("freezer error: {0}")]
    Freezer(#[from] crate::freezer::Error),
    /// A stored record was found to be corrupted.
    #[error("record corrupted")]
    RecordCorrupted,
    /// The archive has already been pruned to the contained `u64`.
    // NOTE(review): presumably the value is the index pruned to — confirm in `prunable`.
    #[error("already pruned to: {0}")]
    AlreadyPrunedTo(u64),
    /// The record exceeds the size an implementation is able to store.
    #[error("record too large")]
    RecordTooLarge,
}
44
/// Storage in which every value is addressable both by a `u64` index and by a key.
///
/// All asynchronous methods return `Send` futures, and the trait itself requires `Send`.
pub trait Archive: Send {
    /// Key type under which values are stored and looked up.
    type Key: Array;

    /// Value type held by the archive.
    type Value: Codec + Send;

    /// Stores `value` under `index` and `key`.
    ///
    /// The tests in this module exercise that a second `put` at an index that is already
    /// occupied returns `Ok(())` and leaves the first value in place.
    fn put(
        &mut self,
        index: u64,
        key: Self::Key,
        value: Self::Value,
    ) -> impl Future<Output = Result<(), Error>> + Send;

    /// Calls [`Archive::put`] and then [`Archive::sync`].
    ///
    /// If `put` fails its error is returned and `sync` is not attempted.
    fn put_sync(
        &mut self,
        index: u64,
        key: Self::Key,
        value: Self::Value,
    ) -> impl Future<Output = Result<(), Error>> + Send {
        async move {
            self.put(index, key, value).await?;
            self.sync().await
        }
    }

    /// Looks up the value selected by `identifier`.
    ///
    /// Resolves to `Ok(None)` when nothing is stored for the identifier (see the
    /// "nonexistent" tests in this module). The returned future captures only `'a` and `Self`.
    fn get<'a>(
        &'a self,
        identifier: Identifier<'a, Self::Key>,
    ) -> impl Future<Output = Result<Option<Self::Value>, Error>> + Send + use<'a, Self>;

    /// Reports whether anything is stored for `identifier`.
    fn has<'a>(
        &'a self,
        identifier: Identifier<'a, Self::Key>,
    ) -> impl Future<Output = Result<bool, Error>> + Send + use<'a, Self>;

    /// Describes the stored indices around `index` as `(current_end, start_next)`.
    ///
    /// As exercised by the range tests in this module: `current_end` is the last index of the
    /// contiguous run of stored indices containing `index` (`None` when `index` itself is not
    /// stored), and `start_next` is the first stored index after that run or gap (`None` when
    /// nothing follows).
    fn next_gap(&self, index: u64) -> (Option<u64>, Option<u64>);

    /// Returns a list of indices that are not stored.
    // NOTE(review): presumably at most `max` missing indices, searched starting at `index` —
    // confirm against the implementations; this file does not exercise the method.
    fn missing_items(&self, index: u64, max: usize) -> Vec<u64>;

    /// Iterates over `(u64, u64)` pairs describing the stored index ranges.
    // NOTE(review): presumably `(start, end)` of each contiguous run, and whether `end` is
    // inclusive is not visible here — confirm against the implementations.
    fn ranges(&self) -> impl Iterator<Item = (u64, u64)>;

    /// Like [`Archive::ranges`], restricted by `from`.
    // NOTE(review): presumably yields ranges starting at or after `from` — confirm.
    fn ranges_from(&self, from: u64) -> impl Iterator<Item = (u64, u64)>;

    /// Returns the first index of the archive, if any.
    // NOTE(review): presumably the smallest stored index — confirm.
    fn first_index(&self) -> Option<u64>;

    /// Returns the last index of the archive, if any.
    // NOTE(review): presumably the largest stored index — confirm.
    fn last_index(&self) -> Option<u64>;

    /// Synchronizes the archive.
    ///
    /// The persistence tests in this module call this before dropping an archive and then
    /// read the same data back through a freshly initialized instance.
    fn sync(&mut self) -> impl Future<Output = Result<(), Error>> + Send;

    /// Consumes the archive.
    // NOTE(review): presumably removes all underlying storage — confirm against the
    // implementations; this file only checks that the returned future is `Send`.
    fn destroy(self) -> impl Future<Output = Result<(), Error>> + Send;
}
125
/// Extension of [`Archive`] that can hold several items at the same index.
pub trait MultiArchive: Archive {
    /// Fetches every value stored at `index`.
    ///
    /// As exercised by the tests in this module, values come back in the order they were
    /// inserted, and an index with no items resolves to `Ok(None)`.
    fn get_all(
        &self,
        index: u64,
    ) -> impl Future<Output = Result<Option<Vec<Self::Value>>, Error>> + Send + use<'_, Self>;

    /// Stores `value` under `index` and `key`, allowing `index` to already hold other items.
    ///
    /// The tests in this module show that a later [`Archive::put`] at an index populated this
    /// way succeeds without storing anything.
    fn put_multi(
        &mut self,
        index: u64,
        key: Self::Key,
        value: Self::Value,
    ) -> impl Future<Output = Result<(), Error>> + Send;

    /// Calls [`MultiArchive::put_multi`] and then [`Archive::sync`].
    ///
    /// If `put_multi` fails its error is returned and `sync` is not attempted.
    fn put_multi_sync(
        &mut self,
        index: u64,
        key: Self::Key,
        value: Self::Value,
    ) -> impl Future<Output = Result<(), Error>> + Send {
        async move {
            self.put_multi(index, key, value).await?;
            self.sync().await
        }
    }
}
165
166#[cfg(test)]
167mod tests {
168 use super::*;
169 use crate::translator::TwoCap;
170 use commonware_codec::DecodeExt;
171 use commonware_macros::{test_group, test_traced};
172 use commonware_runtime::{
173 buffer::paged::CacheRef,
174 deterministic::{self, Context},
175 telemetry::metrics::has_metric_value,
176 Metrics as _, Runner, Supervisor as _,
177 };
178 use commonware_utils::{sequence::FixedBytes, NZUsize, NZU16, NZU64};
179 use rand::Rng;
180 use std::{
181 collections::BTreeMap,
182 num::{NonZeroU16, NonZeroUsize},
183 };
184
185 fn test_key(key: &str) -> FixedBytes<64> {
186 let mut buf = [0u8; 64];
187 let key = key.as_bytes();
188 assert!(key.len() <= buf.len());
189 buf[..key.len()].copy_from_slice(key);
190 FixedBytes::decode(buf.as_ref()).unwrap()
191 }
192
    // Page size handed to `CacheRef::from_pooler` by the archive factories in this module
    // (presumably in bytes — confirm against `CacheRef`).
    const PAGE_SIZE: NonZeroU16 = NZU16!(1024);
    // Cache capacity handed to `CacheRef::from_pooler` (presumably a number of pages — confirm).
    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);
195
196 async fn create_prunable(
197 context: Context,
198 compression: Option<u8>,
199 ) -> impl MultiArchive<Key = FixedBytes<64>, Value = i32> {
200 let cfg = prunable::Config {
201 translator: TwoCap,
202 key_partition: "test-key".into(),
203 key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
204 value_partition: "test-value".into(),
205 compression,
206 codec_config: (),
207 items_per_section: NZU64!(1024),
208 key_write_buffer: NZUsize!(1024),
209 value_write_buffer: NZUsize!(1024),
210 replay_buffer: NZUsize!(1024),
211 };
212 prunable::Archive::init(context, cfg).await.unwrap()
213 }
214
215 async fn create_immutable(
216 context: Context,
217 compression: Option<u8>,
218 ) -> impl Archive<Key = FixedBytes<64>, Value = i32> {
219 let cfg = immutable::Config {
220 metadata_partition: "test-metadata".into(),
221 freezer_table_partition: "test-table".into(),
222 freezer_table_initial_size: 64,
223 freezer_table_resize_frequency: 2,
224 freezer_table_resize_chunk_size: 32,
225 freezer_key_partition: "test-key".into(),
226 freezer_key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
227 freezer_value_partition: "test-value".into(),
228 freezer_value_target_size: 1024 * 1024,
229 freezer_value_compression: compression,
230 ordinal_partition: "test-ordinal".into(),
231 items_per_section: NZU64!(1024),
232 freezer_key_write_buffer: NZUsize!(1024 * 1024),
233 freezer_value_write_buffer: NZUsize!(1024 * 1024),
234 ordinal_write_buffer: NZUsize!(1024 * 1024),
235 replay_buffer: NZUsize!(1024 * 1024),
236 codec_config: (),
237 };
238 immutable::Archive::init(context, cfg).await.unwrap()
239 }
240
241 async fn test_put_get_impl(mut archive: impl Archive<Key = FixedBytes<64>, Value = i32>) {
242 let index = 1u64;
243 let key = test_key("testkey");
244 let data = 1;
245
246 let has = archive
248 .has(Identifier::Index(index))
249 .await
250 .expect("Failed to check key");
251 assert!(!has);
252 let has = archive
253 .has(Identifier::Key(&key))
254 .await
255 .expect("Failed to check key");
256 assert!(!has);
257
258 archive
260 .put(index, key.clone(), data)
261 .await
262 .expect("Failed to put data");
263
264 let has = archive
266 .has(Identifier::Index(index))
267 .await
268 .expect("Failed to check key");
269 assert!(has);
270 let has = archive
271 .has(Identifier::Key(&key))
272 .await
273 .expect("Failed to check key");
274 assert!(has);
275
276 let retrieved = archive
278 .get(Identifier::Key(&key))
279 .await
280 .expect("Failed to get data");
281 assert_eq!(retrieved, Some(data));
282
283 let retrieved = archive
285 .get(Identifier::Index(index))
286 .await
287 .expect("Failed to get data");
288 assert_eq!(retrieved, Some(data));
289
290 archive.sync().await.expect("Failed to sync data");
292 }
293
294 #[test_traced]
295 fn test_put_get_prunable_no_compression() {
296 let executor = deterministic::Runner::default();
297 executor.start(|context| async move {
298 let archive = create_prunable(context, None).await;
299 test_put_get_impl(archive).await;
300 });
301 }
302
303 #[test_traced]
304 fn test_put_get_prunable_compression() {
305 let executor = deterministic::Runner::default();
306 executor.start(|context| async move {
307 let archive = create_prunable(context, Some(3)).await;
308 test_put_get_impl(archive).await;
309 });
310 }
311
312 #[test_traced]
313 fn test_put_get_immutable_no_compression() {
314 let executor = deterministic::Runner::default();
315 executor.start(|context| async move {
316 let archive = create_immutable(context, None).await;
317 test_put_get_impl(archive).await;
318 });
319 }
320
321 #[test_traced]
322 fn test_put_get_immutable_compression() {
323 let executor = deterministic::Runner::default();
324 executor.start(|context| async move {
325 let archive = create_immutable(context, Some(3)).await;
326 test_put_get_impl(archive).await;
327 });
328 }
329
330 async fn test_duplicate_key_impl(mut archive: impl Archive<Key = FixedBytes<64>, Value = i32>) {
331 let index = 1u64;
332 let key = test_key("duplicate");
333 let data1 = 1;
334 let data2 = 2;
335
336 archive
338 .put(index, key.clone(), data1)
339 .await
340 .expect("Failed to put data");
341
342 archive
344 .put(index, key.clone(), data2)
345 .await
346 .expect("Duplicate put should not fail");
347
348 let retrieved = archive
350 .get(Identifier::Index(index))
351 .await
352 .expect("Failed to get data")
353 .expect("Data not found");
354 assert_eq!(retrieved, data1);
355
356 let retrieved = archive
357 .get(Identifier::Key(&key))
358 .await
359 .expect("Failed to get data")
360 .expect("Data not found");
361 assert_eq!(retrieved, data1);
362 }
363
364 #[test_traced]
365 fn test_duplicate_key_prunable_no_compression() {
366 let executor = deterministic::Runner::default();
367 executor.start(|context| async move {
368 let archive = create_prunable(context, None).await;
369 test_duplicate_key_impl(archive).await;
370 });
371 }
372
373 #[test_traced]
374 fn test_duplicate_key_prunable_compression() {
375 let executor = deterministic::Runner::default();
376 executor.start(|context| async move {
377 let archive = create_prunable(context, Some(3)).await;
378 test_duplicate_key_impl(archive).await;
379 });
380 }
381
382 #[test_traced]
383 fn test_duplicate_key_immutable_no_compression() {
384 let executor = deterministic::Runner::default();
385 executor.start(|context| async move {
386 let archive = create_immutable(context, None).await;
387 test_duplicate_key_impl(archive).await;
388 });
389 }
390
391 async fn test_duplicate_key_cross_index_impl(
392 mut archive: impl Archive<Key = FixedBytes<64>, Value = i32>,
393 ) {
394 let key = test_key("dupe-xindex");
397 archive.put(2, key.clone(), 20).await.expect("put(2)");
398 archive.put(5, key.clone(), 50).await.expect("put(5)");
399
400 assert_eq!(
402 archive.get(Identifier::Index(2)).await.unwrap(),
403 Some(20),
404 "Index(2) must resolve to the value stored at 2"
405 );
406 assert_eq!(
407 archive.get(Identifier::Index(5)).await.unwrap(),
408 Some(50),
409 "Index(5) must resolve to the value stored at 5"
410 );
411
412 let got = archive
415 .get(Identifier::Key(&key))
416 .await
417 .unwrap()
418 .expect("key lookup must find at least one entry");
419 assert!(got == 20 || got == 50, "unexpected value: {got}");
420 assert!(archive.has(Identifier::Key(&key)).await.unwrap());
421 }
422
423 #[test_traced]
424 fn test_duplicate_key_cross_index_prunable_no_compression() {
425 let executor = deterministic::Runner::default();
426 executor.start(|context| async move {
427 let archive = create_prunable(context, None).await;
428 test_duplicate_key_cross_index_impl(archive).await;
429 });
430 }
431
432 #[test_traced]
433 fn test_duplicate_key_cross_index_prunable_compression() {
434 let executor = deterministic::Runner::default();
435 executor.start(|context| async move {
436 let archive = create_prunable(context, Some(3)).await;
437 test_duplicate_key_cross_index_impl(archive).await;
438 });
439 }
440
441 #[test_traced]
442 fn test_duplicate_key_cross_index_immutable_no_compression() {
443 let executor = deterministic::Runner::default();
444 executor.start(|context| async move {
445 let archive = create_immutable(context, None).await;
446 test_duplicate_key_cross_index_impl(archive).await;
447 });
448 }
449
450 #[test_traced]
451 fn test_duplicate_key_cross_index_immutable_compression() {
452 let executor = deterministic::Runner::default();
453 executor.start(|context| async move {
454 let archive = create_immutable(context, Some(3)).await;
455 test_duplicate_key_cross_index_impl(archive).await;
456 });
457 }
458
459 #[test_traced]
460 fn test_duplicate_key_immutable_compression() {
461 let executor = deterministic::Runner::default();
462 executor.start(|context| async move {
463 let archive = create_immutable(context, Some(3)).await;
464 test_duplicate_key_impl(archive).await;
465 });
466 }
467
468 async fn test_get_nonexistent_impl(archive: impl Archive<Key = FixedBytes<64>, Value = i32>) {
469 let index = 1u64;
471 let retrieved: Option<i32> = archive
472 .get(Identifier::Index(index))
473 .await
474 .expect("Failed to get data");
475 assert!(retrieved.is_none());
476
477 let key = test_key("nonexistent");
479 let retrieved = archive
480 .get(Identifier::Key(&key))
481 .await
482 .expect("Failed to get data");
483 assert!(retrieved.is_none());
484 }
485
486 #[test_traced]
487 fn test_get_nonexistent_prunable_no_compression() {
488 let executor = deterministic::Runner::default();
489 executor.start(|context| async move {
490 let archive = create_prunable(context, None).await;
491 test_get_nonexistent_impl(archive).await;
492 });
493 }
494
495 #[test_traced]
496 fn test_get_nonexistent_prunable_compression() {
497 let executor = deterministic::Runner::default();
498 executor.start(|context| async move {
499 let archive = create_prunable(context, Some(3)).await;
500 test_get_nonexistent_impl(archive).await;
501 });
502 }
503
504 #[test_traced]
505 fn test_get_nonexistent_immutable_no_compression() {
506 let executor = deterministic::Runner::default();
507 executor.start(|context| async move {
508 let archive = create_immutable(context, None).await;
509 test_get_nonexistent_impl(archive).await;
510 });
511 }
512
513 #[test_traced]
514 fn test_get_nonexistent_immutable_compression() {
515 let executor = deterministic::Runner::default();
516 executor.start(|context| async move {
517 let archive = create_immutable(context, Some(3)).await;
518 test_get_nonexistent_impl(archive).await;
519 });
520 }
521
522 async fn test_persistence_impl<A, F, Fut>(context: Context, creator: F, compression: Option<u8>)
523 where
524 A: Archive<Key = FixedBytes<64>, Value = i32>,
525 F: Fn(Context, Option<u8>) -> Fut,
526 Fut: Future<Output = A>,
527 {
528 {
530 let mut archive = creator(context.child("first"), compression).await;
531
532 let keys = vec![
534 (1u64, test_key("key1"), 1),
535 (2u64, test_key("key2"), 2),
536 (3u64, test_key("key3"), 3),
537 ];
538
539 for (index, key, data) in &keys {
540 archive
541 .put(*index, key.clone(), *data)
542 .await
543 .expect("Failed to put data");
544 }
545
546 archive.sync().await.expect("Failed to sync archive");
548 }
549
550 {
552 let archive = creator(context.child("second"), compression).await;
553
554 let keys = vec![
556 (1u64, test_key("key1"), 1),
557 (2u64, test_key("key2"), 2),
558 (3u64, test_key("key3"), 3),
559 ];
560
561 for (index, key, expected_data) in &keys {
562 let retrieved = archive
563 .get(Identifier::Index(*index))
564 .await
565 .expect("Failed to get data")
566 .expect("Data not found");
567 assert_eq!(retrieved, *expected_data);
568
569 let retrieved = archive
570 .get(Identifier::Key(key))
571 .await
572 .expect("Failed to get data")
573 .expect("Data not found");
574 assert_eq!(retrieved, *expected_data);
575 }
576 }
577 }
578
579 #[test_traced]
580 fn test_persistence_prunable_no_compression() {
581 let executor = deterministic::Runner::default();
582 executor.start(|context| async move {
583 test_persistence_impl(context, create_prunable, None).await;
584 });
585 }
586
587 #[test_traced]
588 fn test_persistence_prunable_compression() {
589 let executor = deterministic::Runner::default();
590 executor.start(|context| async move {
591 test_persistence_impl(context, create_prunable, Some(3)).await;
592 });
593 }
594
595 #[test_traced]
596 fn test_persistence_immutable_no_compression() {
597 let executor = deterministic::Runner::default();
598 executor.start(|context| async move {
599 test_persistence_impl(context, create_immutable, None).await;
600 });
601 }
602
603 #[test_traced]
604 fn test_persistence_immutable_compression() {
605 let executor = deterministic::Runner::default();
606 executor.start(|context| async move {
607 test_persistence_impl(context, create_immutable, Some(3)).await;
608 });
609 }
610
611 async fn test_ranges_impl<A, F, Fut>(mut context: Context, creator: F, compression: Option<u8>)
612 where
613 A: Archive<Key = FixedBytes<64>, Value = i32>,
614 F: Fn(Context, Option<u8>) -> Fut,
615 Fut: Future<Output = A>,
616 {
617 let mut keys = BTreeMap::new();
618 {
619 let mut archive = creator(context.child("first"), compression).await;
620
621 let mut last_index = 0u64;
623 while keys.len() < 100 {
624 let gap: u64 = context.gen_range(1..=10);
625 let index = last_index + gap;
626 last_index = index;
627
628 let mut key_bytes = [0u8; 64];
629 context.fill(&mut key_bytes);
630 let key = FixedBytes::<64>::decode(key_bytes.as_ref()).unwrap();
631 let data: i32 = context.gen();
632
633 if keys.contains_key(&index) {
634 continue;
635 }
636 keys.insert(index, (key.clone(), data));
637
638 archive
639 .put(index, key, data)
640 .await
641 .expect("Failed to put data");
642 }
643
644 archive.sync().await.expect("Failed to sync archive");
645 }
646
647 {
648 let archive = creator(context.child("second"), compression).await;
649 let sorted_indices: Vec<u64> = keys.keys().cloned().collect();
650
651 let (current_end, start_next) = archive.next_gap(0);
653 assert!(current_end.is_none());
654 assert_eq!(start_next, Some(sorted_indices[0]));
655
656 let mut i = 0;
658 while i < sorted_indices.len() {
659 let current_index = sorted_indices[i];
660
661 let mut j = i;
663 while j + 1 < sorted_indices.len() && sorted_indices[j + 1] == sorted_indices[j] + 1
664 {
665 j += 1;
666 }
667 let block_end_index = sorted_indices[j];
668 let next_actual_index = if j + 1 < sorted_indices.len() {
669 Some(sorted_indices[j + 1])
670 } else {
671 None
672 };
673
674 let (current_end, start_next) = archive.next_gap(current_index);
675 assert_eq!(current_end, Some(block_end_index));
676 assert_eq!(start_next, next_actual_index);
677
678 if let Some(next_index) = next_actual_index {
680 if next_index > block_end_index + 1 {
681 let in_gap_index = block_end_index + 1;
682 let (current_end, start_next) = archive.next_gap(in_gap_index);
683 assert!(current_end.is_none());
684 assert_eq!(start_next, Some(next_index));
685 }
686 }
687 i = j + 1;
688 }
689
690 let last_index = *sorted_indices.last().unwrap();
692 let (current_end, start_next) = archive.next_gap(last_index);
693 assert!(current_end.is_some());
694 assert!(start_next.is_none());
695 }
696 }
697
698 #[test_traced]
699 fn test_ranges_prunable_no_compression() {
700 let executor = deterministic::Runner::default();
701 executor.start(|context| async move {
702 test_ranges_impl(context, create_prunable, None).await;
703 });
704 }
705
706 #[test_traced]
707 fn test_ranges_prunable_compression() {
708 let executor = deterministic::Runner::default();
709 executor.start(|context| async move {
710 test_ranges_impl(context, create_prunable, Some(3)).await;
711 });
712 }
713
714 #[test_traced]
715 fn test_ranges_immutable_no_compression() {
716 let executor = deterministic::Runner::default();
717 executor.start(|context| async move {
718 test_ranges_impl(context, create_immutable, None).await;
719 });
720 }
721
722 #[test_traced]
723 fn test_ranges_immutable_compression() {
724 let executor = deterministic::Runner::default();
725 executor.start(|context| async move {
726 test_ranges_impl(context, create_immutable, Some(3)).await;
727 });
728 }
729
730 async fn test_many_keys_impl<A, F, Fut>(
731 mut context: Context,
732 creator: F,
733 compression: Option<u8>,
734 num: usize,
735 ) where
736 A: Archive<Key = FixedBytes<64>, Value = i32>,
737 F: Fn(Context, Option<u8>) -> Fut,
738 Fut: Future<Output = A>,
739 {
740 let mut keys = BTreeMap::new();
742 {
743 let mut archive = creator(context.child("first"), compression).await;
744 while keys.len() < num {
745 let index = keys.len() as u64;
746 let mut key = [0u8; 64];
747 context.fill(&mut key);
748 let key = FixedBytes::<64>::decode(key.as_ref()).unwrap();
749 let data: i32 = context.gen();
750
751 archive
752 .put(index, key.clone(), data)
753 .await
754 .expect("Failed to put data");
755 keys.insert(key, (index, data));
756
757 if context.gen_bool(0.1) {
759 archive.sync().await.expect("Failed to sync archive");
760 }
761 }
762 archive.sync().await.expect("Failed to sync archive");
763
764 for (key, (index, data)) in &keys {
766 let retrieved = archive
767 .get(Identifier::Index(*index))
768 .await
769 .expect("Failed to get data")
770 .expect("Data not found");
771 assert_eq!(&retrieved, data);
772 let retrieved = archive
773 .get(Identifier::Key(key))
774 .await
775 .expect("Failed to get data")
776 .expect("Data not found");
777 assert_eq!(&retrieved, data);
778 }
779 }
780
781 {
783 let archive = creator(context.child("second"), compression).await;
784
785 for (key, (index, data)) in &keys {
787 let retrieved = archive
788 .get(Identifier::Index(*index))
789 .await
790 .expect("Failed to get data")
791 .expect("Data not found");
792 assert_eq!(&retrieved, data);
793 let retrieved = archive
794 .get(Identifier::Key(key))
795 .await
796 .expect("Failed to get data")
797 .expect("Data not found");
798 assert_eq!(&retrieved, data);
799 }
800 }
801 }
802
803 fn test_many_keys_determinism<F, Fut, A>(creator: F, compression: Option<u8>, num: usize)
804 where
805 A: Archive<Key = FixedBytes<64>, Value = i32>,
806 F: Fn(Context, Option<u8>) -> Fut + Copy + Send + 'static,
807 Fut: Future<Output = A> + Send,
808 {
809 let executor = deterministic::Runner::default();
810 let state1 = executor.start(|context| async move {
811 test_many_keys_impl(context.child("storage"), creator, compression, num).await;
812 context.auditor().state()
813 });
814 let executor = deterministic::Runner::default();
815 let state2 = executor.start(|context| async move {
816 test_many_keys_impl(context.child("storage"), creator, compression, num).await;
817 context.auditor().state()
818 });
819 assert_eq!(state1, state2);
820 }
821
822 #[test_traced]
823 fn test_many_keys_prunable_no_compression() {
824 test_many_keys_determinism(create_prunable, None, 1_000);
825 }
826
827 #[test_traced]
828 fn test_many_keys_prunable_compression() {
829 test_many_keys_determinism(create_prunable, Some(3), 1_000);
830 }
831
832 #[test_traced]
833 fn test_many_keys_immutable_no_compression() {
834 test_many_keys_determinism(create_immutable, None, 1_000);
835 }
836
837 #[test_traced]
838 fn test_many_keys_immutable_compression() {
839 test_many_keys_determinism(create_immutable, Some(3), 1_000);
840 }
841
842 #[test_group("slow")]
843 #[test_traced]
844 fn test_many_keys_prunable_large() {
845 test_many_keys_determinism(create_prunable, None, 50_000);
846 }
847
848 #[test_group("slow")]
849 #[test_traced]
850 fn test_many_keys_immutable_large() {
851 test_many_keys_determinism(create_immutable, None, 50_000);
852 }
853
854 async fn test_put_multi_and_get_impl(
855 context: Context,
856 mut archive: impl MultiArchive<Key = FixedBytes<64>, Value = i32>,
857 ) {
858 let index = 5u64;
860 let key_a = test_key("aaa");
861 let key_b = test_key("bbb");
862 let key_c = test_key("ccc");
863
864 archive
865 .put_multi(index, key_a.clone(), 10)
866 .await
867 .expect("put_multi a");
868 archive
869 .put_multi(index, key_b.clone(), 20)
870 .await
871 .expect("put_multi b");
872 archive
873 .put_multi(index, key_c.clone(), 30)
874 .await
875 .expect("put_multi c");
876
877 assert_eq!(
879 archive.get(Identifier::Key(&key_a)).await.unwrap(),
880 Some(10)
881 );
882 assert_eq!(
883 archive.get(Identifier::Key(&key_b)).await.unwrap(),
884 Some(20)
885 );
886 assert_eq!(
887 archive.get(Identifier::Key(&key_c)).await.unwrap(),
888 Some(30)
889 );
890
891 let missing = test_key("zzz");
893 assert_eq!(archive.get(Identifier::Key(&missing)).await.unwrap(), None);
894
895 let buffer = context.encode();
897 assert!(has_metric_value(&buffer, "items_tracked", 1));
898 }
899
900 #[test_traced]
901 fn test_put_multi_and_get_prunable() {
902 let executor = deterministic::Runner::default();
903 executor.start(|context| async move {
904 let archive = create_prunable(context.child("storage"), None).await;
905 test_put_multi_and_get_impl(context, archive).await;
906 });
907 }
908
909 async fn test_put_multi_duplicate_key_impl(
910 context: Context,
911 mut archive: impl MultiArchive<Key = FixedBytes<64>, Value = i32>,
912 ) {
913 let key = test_key("dup");
914 archive.put_multi(5, key.clone(), 10).await.unwrap();
915 archive.put_multi(7, key.clone(), 20).await.unwrap();
916
917 assert_eq!(archive.get(Identifier::Index(5)).await.unwrap(), Some(10));
919 assert_eq!(archive.get(Identifier::Index(7)).await.unwrap(), Some(20));
920 assert_eq!(archive.get_all(5).await.unwrap(), Some(vec![10]));
921 assert_eq!(archive.get_all(7).await.unwrap(), Some(vec![20]));
922
923 assert!(matches!(
925 archive.get(Identifier::Key(&key)).await.unwrap(),
926 Some(10 | 20)
927 ));
928
929 let buffer = context.encode();
930 assert!(has_metric_value(&buffer, "items_tracked", 2));
931 }
932
933 #[test_traced]
934 fn test_put_multi_duplicate_key_prunable() {
935 let executor = deterministic::Runner::default();
936 executor.start(|context| async move {
937 let archive = create_prunable(context.child("storage"), None).await;
938 test_put_multi_duplicate_key_impl(context, archive).await;
939 });
940 }
941
942 async fn test_get_all_impl(mut archive: impl MultiArchive<Key = FixedBytes<64>, Value = i32>) {
943 archive.put_multi(5, test_key("aaa"), 10).await.unwrap();
945 archive.put_multi(5, test_key("bbb"), 20).await.unwrap();
946 archive.put_multi(5, test_key("ccc"), 30).await.unwrap();
947
948 archive.put_multi(7, test_key("ddd"), 40).await.unwrap();
950
951 let all = archive.get_all(5).await.unwrap();
953 assert_eq!(all, Some(vec![10, 20, 30]));
954
955 let all = archive.get_all(7).await.unwrap();
957 assert_eq!(all, Some(vec![40]));
958
959 let all = archive.get_all(99).await.unwrap();
961 assert_eq!(all, None);
962
963 assert_eq!(archive.get(Identifier::Index(5)).await.unwrap(), Some(10));
965 }
966
967 #[test_traced]
968 fn test_get_all_prunable() {
969 let executor = deterministic::Runner::default();
970 executor.start(|context| async move {
971 let archive = create_prunable(context, None).await;
972 test_get_all_impl(archive).await;
973 });
974 }
975
976 async fn test_put_multi_preserves_archive_put_semantics_impl(
977 mut archive: impl MultiArchive<Key = FixedBytes<64>, Value = i32>,
978 ) {
979 archive
981 .put_multi(1, test_key("aaa"), 10)
982 .await
983 .expect("put_multi");
984 archive
985 .put_multi(1, test_key("bbb"), 20)
986 .await
987 .expect("put_multi");
988
989 archive
991 .put(1, test_key("ccc"), 30)
992 .await
993 .expect("Archive::put should no-op");
994
995 assert_eq!(
997 archive
998 .get(Identifier::Key(&test_key("aaa")))
999 .await
1000 .unwrap(),
1001 Some(10)
1002 );
1003 assert_eq!(
1004 archive
1005 .get(Identifier::Key(&test_key("bbb")))
1006 .await
1007 .unwrap(),
1008 Some(20)
1009 );
1010 assert_eq!(
1011 archive
1012 .get(Identifier::Key(&test_key("ccc")))
1013 .await
1014 .unwrap(),
1015 None
1016 );
1017
1018 let first = archive
1020 .get(Identifier::Index(1))
1021 .await
1022 .unwrap()
1023 .expect("should find first");
1024 assert_eq!(first, 10);
1025 }
1026
1027 #[test_traced]
1028 fn test_put_multi_preserves_archive_put_semantics_prunable() {
1029 let executor = deterministic::Runner::default();
1030 executor.start(|context| async move {
1031 let archive = create_prunable(context, None).await;
1032 test_put_multi_preserves_archive_put_semantics_impl(archive).await;
1033 });
1034 }
1035
1036 async fn test_put_multi_restart_impl<A, F, Fut>(
1037 context: Context,
1038 creator: F,
1039 compression: Option<u8>,
1040 ) where
1041 A: MultiArchive<Key = FixedBytes<64>, Value = i32>,
1042 F: Fn(Context, Option<u8>) -> Fut,
1043 Fut: Future<Output = A>,
1044 {
1045 {
1047 let mut archive = creator(
1048 context.child("init").with_attribute("index", 1),
1049 compression,
1050 )
1051 .await;
1052 archive.put_multi(5, test_key("aaa"), 10).await.unwrap();
1053 archive.put_multi(5, test_key("bbb"), 20).await.unwrap();
1054 archive.put_multi(7, test_key("ccc"), 30).await.unwrap();
1055 archive.sync().await.unwrap();
1056 }
1057
1058 let archive = creator(
1060 context.child("init").with_attribute("index", 2),
1061 compression,
1062 )
1063 .await;
1064
1065 assert_eq!(
1066 archive
1067 .get(Identifier::Key(&test_key("aaa")))
1068 .await
1069 .unwrap(),
1070 Some(10)
1071 );
1072 assert_eq!(
1073 archive
1074 .get(Identifier::Key(&test_key("bbb")))
1075 .await
1076 .unwrap(),
1077 Some(20)
1078 );
1079 assert_eq!(
1080 archive
1081 .get(Identifier::Key(&test_key("ccc")))
1082 .await
1083 .unwrap(),
1084 Some(30)
1085 );
1086
1087 let buffer = context.encode();
1089 assert!(has_metric_value(&buffer, "items_tracked", 2));
1090 }
1091
1092 #[test_traced]
1093 fn test_put_multi_restart_prunable() {
1094 let executor = deterministic::Runner::default();
1095 executor.start(|context| async move {
1096 test_put_multi_restart_impl(context, create_prunable, None).await;
1097 });
1098 }
1099
1100 async fn test_put_multi_mixed_indices_impl(
1101 context: Context,
1102 mut archive: impl MultiArchive<Key = FixedBytes<64>, Value = i32>,
1103 ) {
1104 archive.put(1, test_key("single"), 100).await.unwrap();
1106 archive
1107 .put_multi(2, test_key("multi-a"), 200)
1108 .await
1109 .unwrap();
1110 archive
1111 .put_multi(2, test_key("multi-b"), 201)
1112 .await
1113 .unwrap();
1114 archive
1115 .put_multi(3, test_key("multi-c"), 300)
1116 .await
1117 .unwrap();
1118
1119 assert_eq!(
1121 archive
1122 .get(Identifier::Key(&test_key("single")))
1123 .await
1124 .unwrap(),
1125 Some(100)
1126 );
1127 assert_eq!(
1128 archive
1129 .get(Identifier::Key(&test_key("multi-a")))
1130 .await
1131 .unwrap(),
1132 Some(200)
1133 );
1134 assert_eq!(
1135 archive
1136 .get(Identifier::Key(&test_key("multi-b")))
1137 .await
1138 .unwrap(),
1139 Some(201)
1140 );
1141 assert_eq!(
1142 archive
1143 .get(Identifier::Key(&test_key("multi-c")))
1144 .await
1145 .unwrap(),
1146 Some(300)
1147 );
1148
1149 assert_eq!(archive.get(Identifier::Index(2)).await.unwrap(), Some(200));
1151
1152 let (end, next) = archive.next_gap(1);
1154 assert_eq!(end, Some(3));
1155 assert!(next.is_none());
1156
1157 let buffer = context.encode();
1158 assert!(has_metric_value(&buffer, "items_tracked", 3));
1159 }
1160
1161 #[test_traced]
1162 fn test_put_multi_mixed_indices_prunable() {
1163 let executor = deterministic::Runner::default();
1164 executor.start(|context| async move {
1165 let archive = create_prunable(context.child("storage"), None).await;
1166 test_put_multi_mixed_indices_impl(context, archive).await;
1167 });
1168 }
1169
1170 fn assert_send<T: Send>(_: T) {}
1171
1172 #[allow(dead_code)]
1173 fn assert_archive_futures_are_send<T: super::Archive>(
1174 archive: &mut T,
1175 key: T::Key,
1176 value: T::Value,
1177 ) where
1178 T::Key: Clone,
1179 T::Value: Clone,
1180 {
1181 assert_send(archive.put(1, key.clone(), value.clone()));
1182 assert_send(archive.put_sync(2, key.clone(), value));
1183 assert_send(archive.get(Identifier::Index(1)));
1184 assert_send(archive.get(Identifier::Key(&key)));
1185 assert_send(archive.has(Identifier::Index(1)));
1186 assert_send(archive.has(Identifier::Key(&key)));
1187 assert_send(archive.sync());
1188 }
1189
1190 #[allow(dead_code)]
1191 fn assert_archive_destroy_is_send<T: super::Archive>(archive: T) {
1192 assert_send(archive.destroy());
1193 }
1194
1195 #[allow(dead_code)]
1196 fn assert_multi_archive_futures_are_send<T: super::MultiArchive>(
1197 archive: &mut T,
1198 key: T::Key,
1199 value: T::Value,
1200 ) where
1201 T::Key: Clone,
1202 T::Value: Clone,
1203 {
1204 assert_archive_futures_are_send(archive, key.clone(), value.clone());
1205 assert_send(archive.get_all(1));
1206 assert_send(archive.put_multi(1, key.clone(), value.clone()));
1207 assert_send(archive.put_multi_sync(2, key, value));
1208 }
1209
    // Compile-time check: instantiates the generic `Archive` future assertions for the
    // concrete prunable archive type used in these tests. Never called.
    #[allow(dead_code)]
    fn assert_prunable_archive_futures_are_send(
        archive: &mut prunable::Archive<TwoCap, Context, FixedBytes<64>, i32>,
        key: FixedBytes<64>,
        value: i32,
    ) {
        assert_archive_futures_are_send(archive, key, value);
    }
1218
    // Compile-time check: instantiates the generic `MultiArchive` future assertions for the
    // concrete prunable archive type used in these tests. Never called.
    #[allow(dead_code)]
    fn assert_prunable_multi_archive_futures_are_send(
        archive: &mut prunable::Archive<TwoCap, Context, FixedBytes<64>, i32>,
        key: FixedBytes<64>,
        value: i32,
    ) {
        assert_multi_archive_futures_are_send(archive, key, value);
    }
1227
    // Compile-time check: the `destroy` future of the concrete prunable archive type is
    // `Send`. Never called.
    #[allow(dead_code)]
    fn assert_prunable_archive_destroy_is_send(
        archive: prunable::Archive<TwoCap, Context, FixedBytes<64>, i32>,
    ) {
        assert_archive_destroy_is_send(archive);
    }
1234
    // Compile-time check: instantiates the generic `Archive` future assertions for the
    // concrete immutable archive type used in these tests. Never called.
    #[allow(dead_code)]
    fn assert_immutable_archive_futures_are_send(
        archive: &mut immutable::Archive<Context, FixedBytes<64>, i32>,
        key: FixedBytes<64>,
        value: i32,
    ) {
        assert_archive_futures_are_send(archive, key, value);
    }
1243
    // Compile-time check: the `destroy` future of the concrete immutable archive type is
    // `Send`. Never called.
    #[allow(dead_code)]
    fn assert_immutable_archive_destroy_is_send(
        archive: immutable::Archive<Context, FixedBytes<64>, i32>,
    ) {
        assert_archive_destroy_is_send(archive);
    }
1250}