Skip to main content

commonware_storage/archive/
mod.rs

1//! A write-once key-value store for ordered data.
2//!
3//! [Archive] is a key-value store designed for workloads where data is written only once and each
4//! item is addressed by both an `index` and a `key`. Workloads with unique indices should use [Archive]
5//! and workloads with overlapping indices should use [MultiArchive] (allows all items with the same index
6//! to be retrieved). The same key may be stored at multiple indices in either case, and a key lookup may
7//! return any of the associated values.
8
9use commonware_codec::Codec;
10use commonware_utils::Array;
11use std::future::Future;
12use thiserror::Error;
13
14pub mod immutable;
15pub mod prunable;
16
17#[cfg(all(test, feature = "arbitrary"))]
18mod conformance;
19
/// Subject of a `get` or `has` operation.
///
/// An item can be addressed either by the index it was stored at or by its key.
pub enum Identifier<'a, K: Array> {
    /// Address an item by its `u64` index.
    Index(u64),
    /// Address an item by a borrowed key.
    Key(&'a K),
}
25
/// Errors that can occur when interacting with the archive.
///
/// Variants carrying a `#[from]` field are converted automatically from the wrapped error
/// type, so they can be propagated with `?`.
#[derive(Debug, Error)]
pub enum Error {
    /// Wraps a `crate::journal::Error`.
    #[error("journal error: {0}")]
    Journal(#[from] crate::journal::Error),
    /// Wraps a `crate::ordinal::Error`.
    #[error("ordinal error: {0}")]
    Ordinal(#[from] crate::ordinal::Error),
    /// Wraps a `crate::metadata::Error`.
    #[error("metadata error: {0}")]
    Metadata(#[from] crate::metadata::Error),
    /// Wraps a `crate::freezer::Error`.
    #[error("freezer error: {0}")]
    Freezer(#[from] crate::freezer::Error),
    /// A stored record was found to be corrupted.
    #[error("record corrupted")]
    RecordCorrupted,
    /// The archive has already been pruned to the contained value.
    // NOTE(review): presumably the value is the index pruning has reached; confirm against
    // the implementations that construct this variant.
    #[error("already pruned to: {0}")]
    AlreadyPrunedTo(u64),
    /// A record is too large.
    #[error("record too large")]
    RecordTooLarge,
}
44
45/// A write-once key-value store addressed by both an index and a key.
46pub trait Archive: Send {
47    /// The type of the key.
48    type Key: Array;
49
50    /// The type of the value.
51    type Value: Codec + Send;
52
53    /// Store an item in [Archive].
54    ///
55    /// Indices are unique: if the index already exists, put does nothing and returns. Duplicate
56    /// indices can be stored via [MultiArchive::put_multi]. Keys need not be unique: the same key
57    /// may be stored at multiple indices, and a subsequent [Archive::get] or [Archive::has] call
58    /// with an [Identifier::Key] identifier may return any of the values associated with that key.
59    fn put(
60        &mut self,
61        index: u64,
62        key: Self::Key,
63        value: Self::Value,
64    ) -> impl Future<Output = Result<(), Error>> + Send;
65
66    /// Perform a [Archive::put] and [Archive::sync] in a single operation.
67    fn put_sync(
68        &mut self,
69        index: u64,
70        key: Self::Key,
71        value: Self::Value,
72    ) -> impl Future<Output = Result<(), Error>> + Send {
73        async move {
74            self.put(index, key, value).await?;
75            self.sync().await
76        }
77    }
78
79    /// Retrieve an item from [Archive].
80    ///
81    /// Note that if the [Archive] is a [MultiArchive], there may be multiple values associated with the
82    /// same [Identifier::Index]. If there are multiple values, the first stored will be returned. Use
83    /// [MultiArchive::get_all] to retrieve all values at an index.
84    fn get<'a>(
85        &'a self,
86        identifier: Identifier<'a, Self::Key>,
87    ) -> impl Future<Output = Result<Option<Self::Value>, Error>> + Send + use<'a, Self>;
88
89    /// Check if an item exists in [Archive].
90    fn has<'a>(
91        &'a self,
92        identifier: Identifier<'a, Self::Key>,
93    ) -> impl Future<Output = Result<bool, Error>> + Send + use<'a, Self>;
94
95    /// Retrieve the end of the current range including `index` (inclusive) and
96    /// the start of the next range after `index` (if it exists).
97    ///
98    /// This is useful for driving backfill operations over the archive.
99    fn next_gap(&self, index: u64) -> (Option<u64>, Option<u64>);
100
101    /// Returns up to `max` missing items starting from `start`.
102    ///
103    /// This method iterates through gaps between existing ranges, collecting missing indices
104    /// until either `max` items are found or there are no more gaps to fill.
105    fn missing_items(&self, index: u64, max: usize) -> Vec<u64>;
106
107    /// Retrieve an iterator over all populated ranges (inclusive) within the [Archive].
108    fn ranges(&self) -> impl Iterator<Item = (u64, u64)>;
109
110    /// Retrieve an iterator over ranges that overlap or follow `from`.
111    fn ranges_from(&self, from: u64) -> impl Iterator<Item = (u64, u64)>;
112
113    /// Retrieve the first index in the [Archive].
114    fn first_index(&self) -> Option<u64>;
115
116    /// Retrieve the last index in the [Archive].
117    fn last_index(&self) -> Option<u64>;
118
119    /// Sync all pending writes.
120    fn sync(&mut self) -> impl Future<Output = Result<(), Error>> + Send;
121
122    /// Remove all persistent data created by this [Archive].
123    fn destroy(self) -> impl Future<Output = Result<(), Error>> + Send;
124}
125
/// Extension of [Archive] that supports multiple items at the same index.
///
/// Unlike [Archive::put], which is a no-op when the index already exists,
/// [MultiArchive::put_multi] allows storing additional `(key, value)` pairs
/// at an existing index.
pub trait MultiArchive: Archive {
    /// Retrieve all values stored at the given index.
    ///
    /// Returns `None` if the index does not exist or has been pruned.
    fn get_all(
        &self,
        index: u64,
    ) -> impl Future<Output = Result<Option<Vec<Self::Value>>, Error>> + Send + use<'_, Self>;

    /// Store an item, allowing multiple items at the same index.
    ///
    /// Multiple items may share the same `index`. If the same key is stored at
    /// multiple indices, any associated value may be returned when queried with
    /// [Identifier::Key].
    fn put_multi(
        &mut self,
        index: u64,
        key: Self::Key,
        value: Self::Value,
    ) -> impl Future<Output = Result<(), Error>> + Send;

    /// Perform a [MultiArchive::put_multi] and [Archive::sync] in a single operation.
    ///
    /// If the put fails, its error is returned and the sync is not attempted.
    fn put_multi_sync(
        &mut self,
        index: u64,
        key: Self::Key,
        value: Self::Value,
    ) -> impl Future<Output = Result<(), Error>> + Send {
        async move {
            // `?` short-circuits on a failed put so a sync is only issued after a successful write.
            self.put_multi(index, key, value).await?;
            self.sync().await
        }
    }
}
165
166#[cfg(test)]
167mod tests {
168    use super::*;
169    use crate::translator::TwoCap;
170    use commonware_codec::DecodeExt;
171    use commonware_macros::{test_group, test_traced};
172    use commonware_runtime::{
173        buffer::paged::CacheRef,
174        deterministic::{self, Context},
175        telemetry::metrics::has_metric_value,
176        Metrics as _, Runner, Supervisor as _,
177    };
178    use commonware_utils::{sequence::FixedBytes, NZUsize, NZU16, NZU64};
179    use rand::Rng;
180    use std::{
181        collections::BTreeMap,
182        num::{NonZeroU16, NonZeroUsize},
183    };
184
185    fn test_key(key: &str) -> FixedBytes<64> {
186        let mut buf = [0u8; 64];
187        let key = key.as_bytes();
188        assert!(key.len() <= buf.len());
189        buf[..key.len()].copy_from_slice(key);
190        FixedBytes::decode(buf.as_ref()).unwrap()
191    }
192
    /// Page size passed to `CacheRef::from_pooler` when building the test page caches.
    const PAGE_SIZE: NonZeroU16 = NZU16!(1024);
    /// Cache capacity passed to `CacheRef::from_pooler` (presumably a page count; confirm
    /// against the `CacheRef` documentation).
    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);
195
196    async fn create_prunable(
197        context: Context,
198        compression: Option<u8>,
199    ) -> impl MultiArchive<Key = FixedBytes<64>, Value = i32> {
200        let cfg = prunable::Config {
201            translator: TwoCap,
202            key_partition: "test-key".into(),
203            key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
204            value_partition: "test-value".into(),
205            compression,
206            codec_config: (),
207            items_per_section: NZU64!(1024),
208            key_write_buffer: NZUsize!(1024),
209            value_write_buffer: NZUsize!(1024),
210            replay_buffer: NZUsize!(1024),
211        };
212        prunable::Archive::init(context, cfg).await.unwrap()
213    }
214
215    async fn create_immutable(
216        context: Context,
217        compression: Option<u8>,
218    ) -> impl Archive<Key = FixedBytes<64>, Value = i32> {
219        let cfg = immutable::Config {
220            metadata_partition: "test-metadata".into(),
221            freezer_table_partition: "test-table".into(),
222            freezer_table_initial_size: 64,
223            freezer_table_resize_frequency: 2,
224            freezer_table_resize_chunk_size: 32,
225            freezer_key_partition: "test-key".into(),
226            freezer_key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
227            freezer_value_partition: "test-value".into(),
228            freezer_value_target_size: 1024 * 1024,
229            freezer_value_compression: compression,
230            ordinal_partition: "test-ordinal".into(),
231            items_per_section: NZU64!(1024),
232            freezer_key_write_buffer: NZUsize!(1024 * 1024),
233            freezer_value_write_buffer: NZUsize!(1024 * 1024),
234            ordinal_write_buffer: NZUsize!(1024 * 1024),
235            replay_buffer: NZUsize!(1024 * 1024),
236            codec_config: (),
237        };
238        immutable::Archive::init(context, cfg).await.unwrap()
239    }
240
241    async fn test_put_get_impl(mut archive: impl Archive<Key = FixedBytes<64>, Value = i32>) {
242        let index = 1u64;
243        let key = test_key("testkey");
244        let data = 1;
245
246        // Has the key before put
247        let has = archive
248            .has(Identifier::Index(index))
249            .await
250            .expect("Failed to check key");
251        assert!(!has);
252        let has = archive
253            .has(Identifier::Key(&key))
254            .await
255            .expect("Failed to check key");
256        assert!(!has);
257
258        // Put the key-data pair
259        archive
260            .put(index, key.clone(), data)
261            .await
262            .expect("Failed to put data");
263
264        // Has the key after put
265        let has = archive
266            .has(Identifier::Index(index))
267            .await
268            .expect("Failed to check key");
269        assert!(has);
270        let has = archive
271            .has(Identifier::Key(&key))
272            .await
273            .expect("Failed to check key");
274        assert!(has);
275
276        // Get the data by key
277        let retrieved = archive
278            .get(Identifier::Key(&key))
279            .await
280            .expect("Failed to get data");
281        assert_eq!(retrieved, Some(data));
282
283        // Get the data by index
284        let retrieved = archive
285            .get(Identifier::Index(index))
286            .await
287            .expect("Failed to get data");
288        assert_eq!(retrieved, Some(data));
289
290        // Force a sync
291        archive.sync().await.expect("Failed to sync data");
292    }
293
294    #[test_traced]
295    fn test_put_get_prunable_no_compression() {
296        let executor = deterministic::Runner::default();
297        executor.start(|context| async move {
298            let archive = create_prunable(context, None).await;
299            test_put_get_impl(archive).await;
300        });
301    }
302
303    #[test_traced]
304    fn test_put_get_prunable_compression() {
305        let executor = deterministic::Runner::default();
306        executor.start(|context| async move {
307            let archive = create_prunable(context, Some(3)).await;
308            test_put_get_impl(archive).await;
309        });
310    }
311
312    #[test_traced]
313    fn test_put_get_immutable_no_compression() {
314        let executor = deterministic::Runner::default();
315        executor.start(|context| async move {
316            let archive = create_immutable(context, None).await;
317            test_put_get_impl(archive).await;
318        });
319    }
320
321    #[test_traced]
322    fn test_put_get_immutable_compression() {
323        let executor = deterministic::Runner::default();
324        executor.start(|context| async move {
325            let archive = create_immutable(context, Some(3)).await;
326            test_put_get_impl(archive).await;
327        });
328    }
329
330    async fn test_duplicate_key_impl(mut archive: impl Archive<Key = FixedBytes<64>, Value = i32>) {
331        let index = 1u64;
332        let key = test_key("duplicate");
333        let data1 = 1;
334        let data2 = 2;
335
336        // Put the key-data pair
337        archive
338            .put(index, key.clone(), data1)
339            .await
340            .expect("Failed to put data");
341
342        // Put the key-data pair again (should be idempotent)
343        archive
344            .put(index, key.clone(), data2)
345            .await
346            .expect("Duplicate put should not fail");
347
348        // Get the data back - should still be the first value
349        let retrieved = archive
350            .get(Identifier::Index(index))
351            .await
352            .expect("Failed to get data")
353            .expect("Data not found");
354        assert_eq!(retrieved, data1);
355
356        let retrieved = archive
357            .get(Identifier::Key(&key))
358            .await
359            .expect("Failed to get data")
360            .expect("Data not found");
361        assert_eq!(retrieved, data1);
362    }
363
364    #[test_traced]
365    fn test_duplicate_key_prunable_no_compression() {
366        let executor = deterministic::Runner::default();
367        executor.start(|context| async move {
368            let archive = create_prunable(context, None).await;
369            test_duplicate_key_impl(archive).await;
370        });
371    }
372
373    #[test_traced]
374    fn test_duplicate_key_prunable_compression() {
375        let executor = deterministic::Runner::default();
376        executor.start(|context| async move {
377            let archive = create_prunable(context, Some(3)).await;
378            test_duplicate_key_impl(archive).await;
379        });
380    }
381
382    #[test_traced]
383    fn test_duplicate_key_immutable_no_compression() {
384        let executor = deterministic::Runner::default();
385        executor.start(|context| async move {
386            let archive = create_immutable(context, None).await;
387            test_duplicate_key_impl(archive).await;
388        });
389    }
390
391    async fn test_duplicate_key_cross_index_impl(
392        mut archive: impl Archive<Key = FixedBytes<64>, Value = i32>,
393    ) {
394        // Store the same key at two different indices; distinct values only so
395        // the test can observe which entry wins a key lookup.
396        let key = test_key("dupe-xindex");
397        archive.put(2, key.clone(), 20).await.expect("put(2)");
398        archive.put(5, key.clone(), 50).await.expect("put(5)");
399
400        // Both indices must resolve individually.
401        assert_eq!(
402            archive.get(Identifier::Index(2)).await.unwrap(),
403            Some(20),
404            "Index(2) must resolve to the value stored at 2"
405        );
406        assert_eq!(
407            archive.get(Identifier::Index(5)).await.unwrap(),
408            Some(50),
409            "Index(5) must resolve to the value stored at 5"
410        );
411
412        // Key lookup may return either value per the contract; just assert it
413        // returns one of them and that `has` reports presence.
414        let got = archive
415            .get(Identifier::Key(&key))
416            .await
417            .unwrap()
418            .expect("key lookup must find at least one entry");
419        assert!(got == 20 || got == 50, "unexpected value: {got}");
420        assert!(archive.has(Identifier::Key(&key)).await.unwrap());
421    }
422
423    #[test_traced]
424    fn test_duplicate_key_cross_index_prunable_no_compression() {
425        let executor = deterministic::Runner::default();
426        executor.start(|context| async move {
427            let archive = create_prunable(context, None).await;
428            test_duplicate_key_cross_index_impl(archive).await;
429        });
430    }
431
432    #[test_traced]
433    fn test_duplicate_key_cross_index_prunable_compression() {
434        let executor = deterministic::Runner::default();
435        executor.start(|context| async move {
436            let archive = create_prunable(context, Some(3)).await;
437            test_duplicate_key_cross_index_impl(archive).await;
438        });
439    }
440
441    #[test_traced]
442    fn test_duplicate_key_cross_index_immutable_no_compression() {
443        let executor = deterministic::Runner::default();
444        executor.start(|context| async move {
445            let archive = create_immutable(context, None).await;
446            test_duplicate_key_cross_index_impl(archive).await;
447        });
448    }
449
450    #[test_traced]
451    fn test_duplicate_key_cross_index_immutable_compression() {
452        let executor = deterministic::Runner::default();
453        executor.start(|context| async move {
454            let archive = create_immutable(context, Some(3)).await;
455            test_duplicate_key_cross_index_impl(archive).await;
456        });
457    }
458
459    #[test_traced]
460    fn test_duplicate_key_immutable_compression() {
461        let executor = deterministic::Runner::default();
462        executor.start(|context| async move {
463            let archive = create_immutable(context, Some(3)).await;
464            test_duplicate_key_impl(archive).await;
465        });
466    }
467
468    async fn test_get_nonexistent_impl(archive: impl Archive<Key = FixedBytes<64>, Value = i32>) {
469        // Attempt to get an index that doesn't exist
470        let index = 1u64;
471        let retrieved: Option<i32> = archive
472            .get(Identifier::Index(index))
473            .await
474            .expect("Failed to get data");
475        assert!(retrieved.is_none());
476
477        // Attempt to get a key that doesn't exist
478        let key = test_key("nonexistent");
479        let retrieved = archive
480            .get(Identifier::Key(&key))
481            .await
482            .expect("Failed to get data");
483        assert!(retrieved.is_none());
484    }
485
486    #[test_traced]
487    fn test_get_nonexistent_prunable_no_compression() {
488        let executor = deterministic::Runner::default();
489        executor.start(|context| async move {
490            let archive = create_prunable(context, None).await;
491            test_get_nonexistent_impl(archive).await;
492        });
493    }
494
495    #[test_traced]
496    fn test_get_nonexistent_prunable_compression() {
497        let executor = deterministic::Runner::default();
498        executor.start(|context| async move {
499            let archive = create_prunable(context, Some(3)).await;
500            test_get_nonexistent_impl(archive).await;
501        });
502    }
503
504    #[test_traced]
505    fn test_get_nonexistent_immutable_no_compression() {
506        let executor = deterministic::Runner::default();
507        executor.start(|context| async move {
508            let archive = create_immutable(context, None).await;
509            test_get_nonexistent_impl(archive).await;
510        });
511    }
512
513    #[test_traced]
514    fn test_get_nonexistent_immutable_compression() {
515        let executor = deterministic::Runner::default();
516        executor.start(|context| async move {
517            let archive = create_immutable(context, Some(3)).await;
518            test_get_nonexistent_impl(archive).await;
519        });
520    }
521
522    async fn test_persistence_impl<A, F, Fut>(context: Context, creator: F, compression: Option<u8>)
523    where
524        A: Archive<Key = FixedBytes<64>, Value = i32>,
525        F: Fn(Context, Option<u8>) -> Fut,
526        Fut: Future<Output = A>,
527    {
528        // Create and populate archive
529        {
530            let mut archive = creator(context.child("first"), compression).await;
531
532            // Insert multiple keys
533            let keys = vec![
534                (1u64, test_key("key1"), 1),
535                (2u64, test_key("key2"), 2),
536                (3u64, test_key("key3"), 3),
537            ];
538
539            for (index, key, data) in &keys {
540                archive
541                    .put(*index, key.clone(), *data)
542                    .await
543                    .expect("Failed to put data");
544            }
545
546            // Sync and drop the archive
547            archive.sync().await.expect("Failed to sync archive");
548        }
549
550        // Reopen and verify data
551        {
552            let archive = creator(context.child("second"), compression).await;
553
554            // Verify all keys are still present
555            let keys = vec![
556                (1u64, test_key("key1"), 1),
557                (2u64, test_key("key2"), 2),
558                (3u64, test_key("key3"), 3),
559            ];
560
561            for (index, key, expected_data) in &keys {
562                let retrieved = archive
563                    .get(Identifier::Index(*index))
564                    .await
565                    .expect("Failed to get data")
566                    .expect("Data not found");
567                assert_eq!(retrieved, *expected_data);
568
569                let retrieved = archive
570                    .get(Identifier::Key(key))
571                    .await
572                    .expect("Failed to get data")
573                    .expect("Data not found");
574                assert_eq!(retrieved, *expected_data);
575            }
576        }
577    }
578
579    #[test_traced]
580    fn test_persistence_prunable_no_compression() {
581        let executor = deterministic::Runner::default();
582        executor.start(|context| async move {
583            test_persistence_impl(context, create_prunable, None).await;
584        });
585    }
586
587    #[test_traced]
588    fn test_persistence_prunable_compression() {
589        let executor = deterministic::Runner::default();
590        executor.start(|context| async move {
591            test_persistence_impl(context, create_prunable, Some(3)).await;
592        });
593    }
594
595    #[test_traced]
596    fn test_persistence_immutable_no_compression() {
597        let executor = deterministic::Runner::default();
598        executor.start(|context| async move {
599            test_persistence_impl(context, create_immutable, None).await;
600        });
601    }
602
603    #[test_traced]
604    fn test_persistence_immutable_compression() {
605        let executor = deterministic::Runner::default();
606        executor.start(|context| async move {
607            test_persistence_impl(context, create_immutable, Some(3)).await;
608        });
609    }
610
611    async fn test_ranges_impl<A, F, Fut>(mut context: Context, creator: F, compression: Option<u8>)
612    where
613        A: Archive<Key = FixedBytes<64>, Value = i32>,
614        F: Fn(Context, Option<u8>) -> Fut,
615        Fut: Future<Output = A>,
616    {
617        let mut keys = BTreeMap::new();
618        {
619            let mut archive = creator(context.child("first"), compression).await;
620
621            // Insert 100 keys with gaps
622            let mut last_index = 0u64;
623            while keys.len() < 100 {
624                let gap: u64 = context.gen_range(1..=10);
625                let index = last_index + gap;
626                last_index = index;
627
628                let mut key_bytes = [0u8; 64];
629                context.fill(&mut key_bytes);
630                let key = FixedBytes::<64>::decode(key_bytes.as_ref()).unwrap();
631                let data: i32 = context.gen();
632
633                if keys.contains_key(&index) {
634                    continue;
635                }
636                keys.insert(index, (key.clone(), data));
637
638                archive
639                    .put(index, key, data)
640                    .await
641                    .expect("Failed to put data");
642            }
643
644            archive.sync().await.expect("Failed to sync archive");
645        }
646
647        {
648            let archive = creator(context.child("second"), compression).await;
649            let sorted_indices: Vec<u64> = keys.keys().cloned().collect();
650
651            // Check gap before the first element
652            let (current_end, start_next) = archive.next_gap(0);
653            assert!(current_end.is_none());
654            assert_eq!(start_next, Some(sorted_indices[0]));
655
656            // Check gaps between elements
657            let mut i = 0;
658            while i < sorted_indices.len() {
659                let current_index = sorted_indices[i];
660
661                // Find the end of the current contiguous block
662                let mut j = i;
663                while j + 1 < sorted_indices.len() && sorted_indices[j + 1] == sorted_indices[j] + 1
664                {
665                    j += 1;
666                }
667                let block_end_index = sorted_indices[j];
668                let next_actual_index = if j + 1 < sorted_indices.len() {
669                    Some(sorted_indices[j + 1])
670                } else {
671                    None
672                };
673
674                let (current_end, start_next) = archive.next_gap(current_index);
675                assert_eq!(current_end, Some(block_end_index));
676                assert_eq!(start_next, next_actual_index);
677
678                // If there's a gap, check an index within the gap
679                if let Some(next_index) = next_actual_index {
680                    if next_index > block_end_index + 1 {
681                        let in_gap_index = block_end_index + 1;
682                        let (current_end, start_next) = archive.next_gap(in_gap_index);
683                        assert!(current_end.is_none());
684                        assert_eq!(start_next, Some(next_index));
685                    }
686                }
687                i = j + 1;
688            }
689
690            // Check the last element
691            let last_index = *sorted_indices.last().unwrap();
692            let (current_end, start_next) = archive.next_gap(last_index);
693            assert!(current_end.is_some());
694            assert!(start_next.is_none());
695        }
696    }
697
698    #[test_traced]
699    fn test_ranges_prunable_no_compression() {
700        let executor = deterministic::Runner::default();
701        executor.start(|context| async move {
702            test_ranges_impl(context, create_prunable, None).await;
703        });
704    }
705
706    #[test_traced]
707    fn test_ranges_prunable_compression() {
708        let executor = deterministic::Runner::default();
709        executor.start(|context| async move {
710            test_ranges_impl(context, create_prunable, Some(3)).await;
711        });
712    }
713
714    #[test_traced]
715    fn test_ranges_immutable_no_compression() {
716        let executor = deterministic::Runner::default();
717        executor.start(|context| async move {
718            test_ranges_impl(context, create_immutable, None).await;
719        });
720    }
721
722    #[test_traced]
723    fn test_ranges_immutable_compression() {
724        let executor = deterministic::Runner::default();
725        executor.start(|context| async move {
726            test_ranges_impl(context, create_immutable, Some(3)).await;
727        });
728    }
729
730    async fn test_many_keys_impl<A, F, Fut>(
731        mut context: Context,
732        creator: F,
733        compression: Option<u8>,
734        num: usize,
735    ) where
736        A: Archive<Key = FixedBytes<64>, Value = i32>,
737        F: Fn(Context, Option<u8>) -> Fut,
738        Fut: Future<Output = A>,
739    {
740        // Insert many keys
741        let mut keys = BTreeMap::new();
742        {
743            let mut archive = creator(context.child("first"), compression).await;
744            while keys.len() < num {
745                let index = keys.len() as u64;
746                let mut key = [0u8; 64];
747                context.fill(&mut key);
748                let key = FixedBytes::<64>::decode(key.as_ref()).unwrap();
749                let data: i32 = context.gen();
750
751                archive
752                    .put(index, key.clone(), data)
753                    .await
754                    .expect("Failed to put data");
755                keys.insert(key, (index, data));
756
757                // Randomly sync the archive
758                if context.gen_bool(0.1) {
759                    archive.sync().await.expect("Failed to sync archive");
760                }
761            }
762            archive.sync().await.expect("Failed to sync archive");
763
764            // Ensure all keys can be retrieved
765            for (key, (index, data)) in &keys {
766                let retrieved = archive
767                    .get(Identifier::Index(*index))
768                    .await
769                    .expect("Failed to get data")
770                    .expect("Data not found");
771                assert_eq!(&retrieved, data);
772                let retrieved = archive
773                    .get(Identifier::Key(key))
774                    .await
775                    .expect("Failed to get data")
776                    .expect("Data not found");
777                assert_eq!(&retrieved, data);
778            }
779        }
780
781        // Reinitialize and verify
782        {
783            let archive = creator(context.child("second"), compression).await;
784
785            // Ensure all keys can be retrieved
786            for (key, (index, data)) in &keys {
787                let retrieved = archive
788                    .get(Identifier::Index(*index))
789                    .await
790                    .expect("Failed to get data")
791                    .expect("Data not found");
792                assert_eq!(&retrieved, data);
793                let retrieved = archive
794                    .get(Identifier::Key(key))
795                    .await
796                    .expect("Failed to get data")
797                    .expect("Data not found");
798                assert_eq!(&retrieved, data);
799            }
800        }
801    }
802
803    fn test_many_keys_determinism<F, Fut, A>(creator: F, compression: Option<u8>, num: usize)
804    where
805        A: Archive<Key = FixedBytes<64>, Value = i32>,
806        F: Fn(Context, Option<u8>) -> Fut + Copy + Send + 'static,
807        Fut: Future<Output = A> + Send,
808    {
809        let executor = deterministic::Runner::default();
810        let state1 = executor.start(|context| async move {
811            test_many_keys_impl(context.child("storage"), creator, compression, num).await;
812            context.auditor().state()
813        });
814        let executor = deterministic::Runner::default();
815        let state2 = executor.start(|context| async move {
816            test_many_keys_impl(context.child("storage"), creator, compression, num).await;
817            context.auditor().state()
818        });
819        assert_eq!(state1, state2);
820    }
821
822    #[test_traced]
823    fn test_many_keys_prunable_no_compression() {
824        test_many_keys_determinism(create_prunable, None, 1_000);
825    }
826
827    #[test_traced]
828    fn test_many_keys_prunable_compression() {
829        test_many_keys_determinism(create_prunable, Some(3), 1_000);
830    }
831
832    #[test_traced]
833    fn test_many_keys_immutable_no_compression() {
834        test_many_keys_determinism(create_immutable, None, 1_000);
835    }
836
837    #[test_traced]
838    fn test_many_keys_immutable_compression() {
839        test_many_keys_determinism(create_immutable, Some(3), 1_000);
840    }
841
842    #[test_group("slow")]
843    #[test_traced]
844    fn test_many_keys_prunable_large() {
845        test_many_keys_determinism(create_prunable, None, 50_000);
846    }
847
848    #[test_group("slow")]
849    #[test_traced]
850    fn test_many_keys_immutable_large() {
851        test_many_keys_determinism(create_immutable, None, 50_000);
852    }
853
854    async fn test_put_multi_and_get_impl(
855        context: Context,
856        mut archive: impl MultiArchive<Key = FixedBytes<64>, Value = i32>,
857    ) {
858        // Put three items at the same index with different keys
859        let index = 5u64;
860        let key_a = test_key("aaa");
861        let key_b = test_key("bbb");
862        let key_c = test_key("ccc");
863
864        archive
865            .put_multi(index, key_a.clone(), 10)
866            .await
867            .expect("put_multi a");
868        archive
869            .put_multi(index, key_b.clone(), 20)
870            .await
871            .expect("put_multi b");
872        archive
873            .put_multi(index, key_c.clone(), 30)
874            .await
875            .expect("put_multi c");
876
877        // Retrieve each by key
878        assert_eq!(
879            archive.get(Identifier::Key(&key_a)).await.unwrap(),
880            Some(10)
881        );
882        assert_eq!(
883            archive.get(Identifier::Key(&key_b)).await.unwrap(),
884            Some(20)
885        );
886        assert_eq!(
887            archive.get(Identifier::Key(&key_c)).await.unwrap(),
888            Some(30)
889        );
890
891        // Missing key returns None
892        let missing = test_key("zzz");
893        assert_eq!(archive.get(Identifier::Key(&missing)).await.unwrap(), None);
894
895        // items_tracked reflects unique indices, not total items
896        let buffer = context.encode();
897        assert!(has_metric_value(&buffer, "items_tracked", 1));
898    }
899
900    #[test_traced]
901    fn test_put_multi_and_get_prunable() {
902        let executor = deterministic::Runner::default();
903        executor.start(|context| async move {
904            let archive = create_prunable(context.child("storage"), None).await;
905            test_put_multi_and_get_impl(context, archive).await;
906        });
907    }
908
909    async fn test_put_multi_duplicate_key_impl(
910        context: Context,
911        mut archive: impl MultiArchive<Key = FixedBytes<64>, Value = i32>,
912    ) {
913        let key = test_key("dup");
914        archive.put_multi(5, key.clone(), 10).await.unwrap();
915        archive.put_multi(7, key.clone(), 20).await.unwrap();
916
917        // Duplicate key is allowed across indices.
918        assert_eq!(archive.get(Identifier::Index(5)).await.unwrap(), Some(10));
919        assert_eq!(archive.get(Identifier::Index(7)).await.unwrap(), Some(20));
920        assert_eq!(archive.get_all(5).await.unwrap(), Some(vec![10]));
921        assert_eq!(archive.get_all(7).await.unwrap(), Some(vec![20]));
922
923        // Like Archive::put, duplicate keys may return any associated value.
924        assert!(matches!(
925            archive.get(Identifier::Key(&key)).await.unwrap(),
926            Some(10 | 20)
927        ));
928
929        let buffer = context.encode();
930        assert!(has_metric_value(&buffer, "items_tracked", 2));
931    }
932
933    #[test_traced]
934    fn test_put_multi_duplicate_key_prunable() {
935        let executor = deterministic::Runner::default();
936        executor.start(|context| async move {
937            let archive = create_prunable(context.child("storage"), None).await;
938            test_put_multi_duplicate_key_impl(context, archive).await;
939        });
940    }
941
942    async fn test_get_all_impl(mut archive: impl MultiArchive<Key = FixedBytes<64>, Value = i32>) {
943        // Three items at the same index
944        archive.put_multi(5, test_key("aaa"), 10).await.unwrap();
945        archive.put_multi(5, test_key("bbb"), 20).await.unwrap();
946        archive.put_multi(5, test_key("ccc"), 30).await.unwrap();
947
948        // One item at a different index
949        archive.put_multi(7, test_key("ddd"), 40).await.unwrap();
950
951        // get_all returns all values at the index in insertion order
952        let all = archive.get_all(5).await.unwrap();
953        assert_eq!(all, Some(vec![10, 20, 30]));
954
955        // Single-item index returns one element
956        let all = archive.get_all(7).await.unwrap();
957        assert_eq!(all, Some(vec![40]));
958
959        // Missing index returns None
960        let all = archive.get_all(99).await.unwrap();
961        assert_eq!(all, None);
962
963        // Archive::get(Index) still returns only the first
964        assert_eq!(archive.get(Identifier::Index(5)).await.unwrap(), Some(10));
965    }
966
967    #[test_traced]
968    fn test_get_all_prunable() {
969        let executor = deterministic::Runner::default();
970        executor.start(|context| async move {
971            let archive = create_prunable(context, None).await;
972            test_get_all_impl(archive).await;
973        });
974    }
975
976    async fn test_put_multi_preserves_archive_put_semantics_impl(
977        mut archive: impl MultiArchive<Key = FixedBytes<64>, Value = i32>,
978    ) {
979        // put_multi two items at the same index
980        archive
981            .put_multi(1, test_key("aaa"), 10)
982            .await
983            .expect("put_multi");
984        archive
985            .put_multi(1, test_key("bbb"), 20)
986            .await
987            .expect("put_multi");
988
989        // Archive::put is a no-op when index already exists
990        archive
991            .put(1, test_key("ccc"), 30)
992            .await
993            .expect("Archive::put should no-op");
994
995        // Only two items exist (Archive::put did not add a third)
996        assert_eq!(
997            archive
998                .get(Identifier::Key(&test_key("aaa")))
999                .await
1000                .unwrap(),
1001            Some(10)
1002        );
1003        assert_eq!(
1004            archive
1005                .get(Identifier::Key(&test_key("bbb")))
1006                .await
1007                .unwrap(),
1008            Some(20)
1009        );
1010        assert_eq!(
1011            archive
1012                .get(Identifier::Key(&test_key("ccc")))
1013                .await
1014                .unwrap(),
1015            None
1016        );
1017
1018        // Archive::get(Index) returns the first item inserted
1019        let first = archive
1020            .get(Identifier::Index(1))
1021            .await
1022            .unwrap()
1023            .expect("should find first");
1024        assert_eq!(first, 10);
1025    }
1026
1027    #[test_traced]
1028    fn test_put_multi_preserves_archive_put_semantics_prunable() {
1029        let executor = deterministic::Runner::default();
1030        executor.start(|context| async move {
1031            let archive = create_prunable(context, None).await;
1032            test_put_multi_preserves_archive_put_semantics_impl(archive).await;
1033        });
1034    }
1035
1036    async fn test_put_multi_restart_impl<A, F, Fut>(
1037        context: Context,
1038        creator: F,
1039        compression: Option<u8>,
1040    ) where
1041        A: MultiArchive<Key = FixedBytes<64>, Value = i32>,
1042        F: Fn(Context, Option<u8>) -> Fut,
1043        Fut: Future<Output = A>,
1044    {
1045        // Write multi-items, sync, and drop
1046        {
1047            let mut archive = creator(
1048                context.child("init").with_attribute("index", 1),
1049                compression,
1050            )
1051            .await;
1052            archive.put_multi(5, test_key("aaa"), 10).await.unwrap();
1053            archive.put_multi(5, test_key("bbb"), 20).await.unwrap();
1054            archive.put_multi(7, test_key("ccc"), 30).await.unwrap();
1055            archive.sync().await.unwrap();
1056        }
1057
1058        // Reinitialize and verify
1059        let archive = creator(
1060            context.child("init").with_attribute("index", 2),
1061            compression,
1062        )
1063        .await;
1064
1065        assert_eq!(
1066            archive
1067                .get(Identifier::Key(&test_key("aaa")))
1068                .await
1069                .unwrap(),
1070            Some(10)
1071        );
1072        assert_eq!(
1073            archive
1074                .get(Identifier::Key(&test_key("bbb")))
1075                .await
1076                .unwrap(),
1077            Some(20)
1078        );
1079        assert_eq!(
1080            archive
1081                .get(Identifier::Key(&test_key("ccc")))
1082                .await
1083                .unwrap(),
1084            Some(30)
1085        );
1086
1087        // items_tracked reflects two unique indices after restart
1088        let buffer = context.encode();
1089        assert!(has_metric_value(&buffer, "items_tracked", 2));
1090    }
1091
1092    #[test_traced]
1093    fn test_put_multi_restart_prunable() {
1094        let executor = deterministic::Runner::default();
1095        executor.start(|context| async move {
1096            test_put_multi_restart_impl(context, create_prunable, None).await;
1097        });
1098    }
1099
1100    async fn test_put_multi_mixed_indices_impl(
1101        context: Context,
1102        mut archive: impl MultiArchive<Key = FixedBytes<64>, Value = i32>,
1103    ) {
1104        // Mix Archive::put (single-item) and MultiArchive::put_multi
1105        archive.put(1, test_key("single"), 100).await.unwrap();
1106        archive
1107            .put_multi(2, test_key("multi-a"), 200)
1108            .await
1109            .unwrap();
1110        archive
1111            .put_multi(2, test_key("multi-b"), 201)
1112            .await
1113            .unwrap();
1114        archive
1115            .put_multi(3, test_key("multi-c"), 300)
1116            .await
1117            .unwrap();
1118
1119        // All retrievable by key
1120        assert_eq!(
1121            archive
1122                .get(Identifier::Key(&test_key("single")))
1123                .await
1124                .unwrap(),
1125            Some(100)
1126        );
1127        assert_eq!(
1128            archive
1129                .get(Identifier::Key(&test_key("multi-a")))
1130                .await
1131                .unwrap(),
1132            Some(200)
1133        );
1134        assert_eq!(
1135            archive
1136                .get(Identifier::Key(&test_key("multi-b")))
1137                .await
1138                .unwrap(),
1139            Some(201)
1140        );
1141        assert_eq!(
1142            archive
1143                .get(Identifier::Key(&test_key("multi-c")))
1144                .await
1145                .unwrap(),
1146            Some(300)
1147        );
1148
1149        // Archive::get(Index) returns first item at that index
1150        assert_eq!(archive.get(Identifier::Index(2)).await.unwrap(), Some(200));
1151
1152        // Gap tracking works across mixed usage
1153        let (end, next) = archive.next_gap(1);
1154        assert_eq!(end, Some(3));
1155        assert!(next.is_none());
1156
1157        let buffer = context.encode();
1158        assert!(has_metric_value(&buffer, "items_tracked", 3));
1159    }
1160
1161    #[test_traced]
1162    fn test_put_multi_mixed_indices_prunable() {
1163        let executor = deterministic::Runner::default();
1164        executor.start(|context| async move {
1165            let archive = create_prunable(context.child("storage"), None).await;
1166            test_put_multi_mixed_indices_impl(context, archive).await;
1167        });
1168    }
1169
/// Accepts any value whose type is `Send` and does nothing with it.
///
/// The body is empty on purpose: the `Send` bound is the entire check, enforced at compile time.
fn assert_send<T>(_value: T)
where
    T: Send,
{
}
1171
1172    #[allow(dead_code)]
1173    fn assert_archive_futures_are_send<T: super::Archive>(
1174        archive: &mut T,
1175        key: T::Key,
1176        value: T::Value,
1177    ) where
1178        T::Key: Clone,
1179        T::Value: Clone,
1180    {
1181        assert_send(archive.put(1, key.clone(), value.clone()));
1182        assert_send(archive.put_sync(2, key.clone(), value));
1183        assert_send(archive.get(Identifier::Index(1)));
1184        assert_send(archive.get(Identifier::Key(&key)));
1185        assert_send(archive.has(Identifier::Index(1)));
1186        assert_send(archive.has(Identifier::Key(&key)));
1187        assert_send(archive.sync());
1188    }
1189
1190    #[allow(dead_code)]
1191    fn assert_archive_destroy_is_send<T: super::Archive>(archive: T) {
1192        assert_send(archive.destroy());
1193    }
1194
1195    #[allow(dead_code)]
1196    fn assert_multi_archive_futures_are_send<T: super::MultiArchive>(
1197        archive: &mut T,
1198        key: T::Key,
1199        value: T::Value,
1200    ) where
1201        T::Key: Clone,
1202        T::Value: Clone,
1203    {
1204        assert_archive_futures_are_send(archive, key.clone(), value.clone());
1205        assert_send(archive.get_all(1));
1206        assert_send(archive.put_multi(1, key.clone(), value.clone()));
1207        assert_send(archive.put_multi_sync(2, key, value));
1208    }
1209
1210    #[allow(dead_code)]
1211    fn assert_prunable_archive_futures_are_send(
1212        archive: &mut prunable::Archive<TwoCap, Context, FixedBytes<64>, i32>,
1213        key: FixedBytes<64>,
1214        value: i32,
1215    ) {
1216        assert_archive_futures_are_send(archive, key, value);
1217    }
1218
1219    #[allow(dead_code)]
1220    fn assert_prunable_multi_archive_futures_are_send(
1221        archive: &mut prunable::Archive<TwoCap, Context, FixedBytes<64>, i32>,
1222        key: FixedBytes<64>,
1223        value: i32,
1224    ) {
1225        assert_multi_archive_futures_are_send(archive, key, value);
1226    }
1227
1228    #[allow(dead_code)]
1229    fn assert_prunable_archive_destroy_is_send(
1230        archive: prunable::Archive<TwoCap, Context, FixedBytes<64>, i32>,
1231    ) {
1232        assert_archive_destroy_is_send(archive);
1233    }
1234
1235    #[allow(dead_code)]
1236    fn assert_immutable_archive_futures_are_send(
1237        archive: &mut immutable::Archive<Context, FixedBytes<64>, i32>,
1238        key: FixedBytes<64>,
1239        value: i32,
1240    ) {
1241        assert_archive_futures_are_send(archive, key, value);
1242    }
1243
1244    #[allow(dead_code)]
1245    fn assert_immutable_archive_destroy_is_send(
1246        archive: immutable::Archive<Context, FixedBytes<64>, i32>,
1247    ) {
1248        assert_archive_destroy_is_send(archive);
1249    }
1250}