Skip to main content

commonware_storage/archive/
mod.rs

1//! A write-once key-value store for ordered data.
2//!
3//! [Archive] is a key-value store designed for workloads where all data is written only once and is
4//! uniquely associated with both an `index` and a `key`.
5
6use commonware_codec::Codec;
7use commonware_utils::Array;
8use std::future::Future;
9use thiserror::Error;
10
11pub mod immutable;
12pub mod prunable;
13
14#[cfg(all(test, feature = "arbitrary"))]
15mod conformance;
16
17/// Subject of a `get` or `has` operation.
18pub enum Identifier<'a, K: Array> {
19    Index(u64),
20    Key(&'a K),
21}
22
23/// Errors that can occur when interacting with the archive.
24#[derive(Debug, Error)]
25pub enum Error {
26    #[error("journal error: {0}")]
27    Journal(#[from] crate::journal::Error),
28    #[error("ordinal error: {0}")]
29    Ordinal(#[from] crate::ordinal::Error),
30    #[error("metadata error: {0}")]
31    Metadata(#[from] crate::metadata::Error),
32    #[error("freezer error: {0}")]
33    Freezer(#[from] crate::freezer::Error),
34    #[error("record corrupted")]
35    RecordCorrupted,
36    #[error("already pruned to: {0}")]
37    AlreadyPrunedTo(u64),
38    #[error("record too large")]
39    RecordTooLarge,
40}
41
42/// A write-once key-value store where each key is associated with a unique index.
43pub trait Archive: Send {
44    /// The type of the key.
45    type Key: Array;
46
47    /// The type of the value.
48    type Value: Codec + Send;
49
50    /// Store an item in [Archive]. Both indices and keys are assumed to both be globally unique.
51    ///
52    /// If the index already exists, put does nothing and returns. If the same key is stored multiple times
53    /// at different indices (not recommended), any value associated with the key may be returned.
54    fn put(
55        &mut self,
56        index: u64,
57        key: Self::Key,
58        value: Self::Value,
59    ) -> impl Future<Output = Result<(), Error>> + Send;
60
61    /// Perform a [Archive::put] and [Archive::sync] in a single operation.
62    fn put_sync(
63        &mut self,
64        index: u64,
65        key: Self::Key,
66        value: Self::Value,
67    ) -> impl Future<Output = Result<(), Error>> + Send {
68        async move {
69            self.put(index, key, value).await?;
70            self.sync().await
71        }
72    }
73
74    /// Retrieve an item from [Archive].
75    ///
76    /// Note that if the [Archive] is a [MultiArchive], there may be multiple values associated with the
77    /// same [Identifier::Index]. If there are multiple values, the first stored will be returned. Use
78    /// [MultiArchive::get_all] to retrieve all values at an index.
79    fn get<'a>(
80        &'a self,
81        identifier: Identifier<'a, Self::Key>,
82    ) -> impl Future<Output = Result<Option<Self::Value>, Error>> + Send + use<'a, Self>;
83
84    /// Check if an item exists in [Archive].
85    fn has<'a>(
86        &'a self,
87        identifier: Identifier<'a, Self::Key>,
88    ) -> impl Future<Output = Result<bool, Error>> + Send + use<'a, Self>;
89
90    /// Retrieve the end of the current range including `index` (inclusive) and
91    /// the start of the next range after `index` (if it exists).
92    ///
93    /// This is useful for driving backfill operations over the archive.
94    fn next_gap(&self, index: u64) -> (Option<u64>, Option<u64>);
95
96    /// Returns up to `max` missing items starting from `start`.
97    ///
98    /// This method iterates through gaps between existing ranges, collecting missing indices
99    /// until either `max` items are found or there are no more gaps to fill.
100    fn missing_items(&self, index: u64, max: usize) -> Vec<u64>;
101
102    /// Retrieve an iterator over all populated ranges (inclusive) within the [Archive].
103    fn ranges(&self) -> impl Iterator<Item = (u64, u64)>;
104
105    /// Retrieve an iterator over ranges that overlap or follow `from`.
106    fn ranges_from(&self, from: u64) -> impl Iterator<Item = (u64, u64)>;
107
108    /// Retrieve the first index in the [Archive].
109    fn first_index(&self) -> Option<u64>;
110
111    /// Retrieve the last index in the [Archive].
112    fn last_index(&self) -> Option<u64>;
113
114    /// Sync all pending writes.
115    fn sync(&mut self) -> impl Future<Output = Result<(), Error>> + Send;
116
117    /// Remove all persistent data created by this [Archive].
118    fn destroy(self) -> impl Future<Output = Result<(), Error>> + Send;
119}
120
121/// Extension of [Archive] that supports multiple items at the same index.
122///
123/// Unlike [Archive::put], which is a no-op when the index already exists,
124/// [MultiArchive::put_multi] allows storing additional `(key, value)` pairs
125/// at an existing index. As with [Archive::put], keys are assumed to be globally
126/// unique, but duplicate keys are not rejected.
127pub trait MultiArchive: Archive {
128    /// Retrieve all values stored at the given index.
129    ///
130    /// Returns `None` if the index does not exist or has been pruned.
131    fn get_all(
132        &self,
133        index: u64,
134    ) -> impl Future<Output = Result<Option<Vec<Self::Value>>, Error>> + Send + use<'_, Self>;
135
136    /// Store an item, allowing multiple items at the same index.
137    ///
138    /// Multiple items may share the same `index`. If the same key is stored at
139    /// multiple indices, any associated value may be returned when queried with
140    /// [Identifier::Key].
141    fn put_multi(
142        &mut self,
143        index: u64,
144        key: Self::Key,
145        value: Self::Value,
146    ) -> impl Future<Output = Result<(), Error>> + Send;
147
148    /// Perform a [MultiArchive::put_multi] and [Archive::sync] in a single operation.
149    fn put_multi_sync(
150        &mut self,
151        index: u64,
152        key: Self::Key,
153        value: Self::Value,
154    ) -> impl Future<Output = Result<(), Error>> + Send {
155        async move {
156            self.put_multi(index, key, value).await?;
157            self.sync().await
158        }
159    }
160}
161
162#[cfg(test)]
163mod tests {
164    use super::*;
165    use crate::translator::TwoCap;
166    use commonware_codec::DecodeExt;
167    use commonware_macros::{test_group, test_traced};
168    use commonware_runtime::{
169        buffer::paged::CacheRef,
170        deterministic::{self, Context},
171        Metrics, Runner,
172    };
173    use commonware_utils::{sequence::FixedBytes, NZUsize, NZU16, NZU64};
174
175    fn test_key(key: &str) -> FixedBytes<64> {
176        let mut buf = [0u8; 64];
177        let key = key.as_bytes();
178        assert!(key.len() <= buf.len());
179        buf[..key.len()].copy_from_slice(key);
180        FixedBytes::decode(buf.as_ref()).unwrap()
181    }
182    use rand::Rng;
183    use std::{
184        collections::BTreeMap,
185        num::{NonZeroU16, NonZeroUsize},
186    };
187
188    const PAGE_SIZE: NonZeroU16 = NZU16!(1024);
189    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);
190
191    async fn create_prunable(
192        context: Context,
193        compression: Option<u8>,
194    ) -> impl MultiArchive<Key = FixedBytes<64>, Value = i32> {
195        let cfg = prunable::Config {
196            translator: TwoCap,
197            key_partition: "test-key".into(),
198            key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
199            value_partition: "test-value".into(),
200            compression,
201            codec_config: (),
202            items_per_section: NZU64!(1024),
203            key_write_buffer: NZUsize!(1024),
204            value_write_buffer: NZUsize!(1024),
205            replay_buffer: NZUsize!(1024),
206        };
207        prunable::Archive::init(context, cfg).await.unwrap()
208    }
209
210    async fn create_immutable(
211        context: Context,
212        compression: Option<u8>,
213    ) -> impl Archive<Key = FixedBytes<64>, Value = i32> {
214        let cfg = immutable::Config {
215            metadata_partition: "test-metadata".into(),
216            freezer_table_partition: "test-table".into(),
217            freezer_table_initial_size: 64,
218            freezer_table_resize_frequency: 2,
219            freezer_table_resize_chunk_size: 32,
220            freezer_key_partition: "test-key".into(),
221            freezer_key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
222            freezer_value_partition: "test-value".into(),
223            freezer_value_target_size: 1024 * 1024,
224            freezer_value_compression: compression,
225            ordinal_partition: "test-ordinal".into(),
226            items_per_section: NZU64!(1024),
227            freezer_key_write_buffer: NZUsize!(1024 * 1024),
228            freezer_value_write_buffer: NZUsize!(1024 * 1024),
229            ordinal_write_buffer: NZUsize!(1024 * 1024),
230            replay_buffer: NZUsize!(1024 * 1024),
231            codec_config: (),
232        };
233        immutable::Archive::init(context, cfg).await.unwrap()
234    }
235
236    async fn test_put_get_impl(mut archive: impl Archive<Key = FixedBytes<64>, Value = i32>) {
237        let index = 1u64;
238        let key = test_key("testkey");
239        let data = 1;
240
241        // Has the key before put
242        let has = archive
243            .has(Identifier::Index(index))
244            .await
245            .expect("Failed to check key");
246        assert!(!has);
247        let has = archive
248            .has(Identifier::Key(&key))
249            .await
250            .expect("Failed to check key");
251        assert!(!has);
252
253        // Put the key-data pair
254        archive
255            .put(index, key.clone(), data)
256            .await
257            .expect("Failed to put data");
258
259        // Has the key after put
260        let has = archive
261            .has(Identifier::Index(index))
262            .await
263            .expect("Failed to check key");
264        assert!(has);
265        let has = archive
266            .has(Identifier::Key(&key))
267            .await
268            .expect("Failed to check key");
269        assert!(has);
270
271        // Get the data by key
272        let retrieved = archive
273            .get(Identifier::Key(&key))
274            .await
275            .expect("Failed to get data");
276        assert_eq!(retrieved, Some(data));
277
278        // Get the data by index
279        let retrieved = archive
280            .get(Identifier::Index(index))
281            .await
282            .expect("Failed to get data");
283        assert_eq!(retrieved, Some(data));
284
285        // Force a sync
286        archive.sync().await.expect("Failed to sync data");
287    }
288
289    #[test_traced]
290    fn test_put_get_prunable_no_compression() {
291        let executor = deterministic::Runner::default();
292        executor.start(|context| async move {
293            let archive = create_prunable(context, None).await;
294            test_put_get_impl(archive).await;
295        });
296    }
297
298    #[test_traced]
299    fn test_put_get_prunable_compression() {
300        let executor = deterministic::Runner::default();
301        executor.start(|context| async move {
302            let archive = create_prunable(context, Some(3)).await;
303            test_put_get_impl(archive).await;
304        });
305    }
306
307    #[test_traced]
308    fn test_put_get_immutable_no_compression() {
309        let executor = deterministic::Runner::default();
310        executor.start(|context| async move {
311            let archive = create_immutable(context, None).await;
312            test_put_get_impl(archive).await;
313        });
314    }
315
316    #[test_traced]
317    fn test_put_get_immutable_compression() {
318        let executor = deterministic::Runner::default();
319        executor.start(|context| async move {
320            let archive = create_immutable(context, Some(3)).await;
321            test_put_get_impl(archive).await;
322        });
323    }
324
325    async fn test_duplicate_key_impl(mut archive: impl Archive<Key = FixedBytes<64>, Value = i32>) {
326        let index = 1u64;
327        let key = test_key("duplicate");
328        let data1 = 1;
329        let data2 = 2;
330
331        // Put the key-data pair
332        archive
333            .put(index, key.clone(), data1)
334            .await
335            .expect("Failed to put data");
336
337        // Put the key-data pair again (should be idempotent)
338        archive
339            .put(index, key.clone(), data2)
340            .await
341            .expect("Duplicate put should not fail");
342
343        // Get the data back - should still be the first value
344        let retrieved = archive
345            .get(Identifier::Index(index))
346            .await
347            .expect("Failed to get data")
348            .expect("Data not found");
349        assert_eq!(retrieved, data1);
350
351        let retrieved = archive
352            .get(Identifier::Key(&key))
353            .await
354            .expect("Failed to get data")
355            .expect("Data not found");
356        assert_eq!(retrieved, data1);
357    }
358
359    #[test_traced]
360    fn test_duplicate_key_prunable_no_compression() {
361        let executor = deterministic::Runner::default();
362        executor.start(|context| async move {
363            let archive = create_prunable(context, None).await;
364            test_duplicate_key_impl(archive).await;
365        });
366    }
367
368    #[test_traced]
369    fn test_duplicate_key_prunable_compression() {
370        let executor = deterministic::Runner::default();
371        executor.start(|context| async move {
372            let archive = create_prunable(context, Some(3)).await;
373            test_duplicate_key_impl(archive).await;
374        });
375    }
376
377    #[test_traced]
378    fn test_duplicate_key_immutable_no_compression() {
379        let executor = deterministic::Runner::default();
380        executor.start(|context| async move {
381            let archive = create_immutable(context, None).await;
382            test_duplicate_key_impl(archive).await;
383        });
384    }
385
386    #[test_traced]
387    fn test_duplicate_key_immutable_compression() {
388        let executor = deterministic::Runner::default();
389        executor.start(|context| async move {
390            let archive = create_immutable(context, Some(3)).await;
391            test_duplicate_key_impl(archive).await;
392        });
393    }
394
395    async fn test_get_nonexistent_impl(archive: impl Archive<Key = FixedBytes<64>, Value = i32>) {
396        // Attempt to get an index that doesn't exist
397        let index = 1u64;
398        let retrieved: Option<i32> = archive
399            .get(Identifier::Index(index))
400            .await
401            .expect("Failed to get data");
402        assert!(retrieved.is_none());
403
404        // Attempt to get a key that doesn't exist
405        let key = test_key("nonexistent");
406        let retrieved = archive
407            .get(Identifier::Key(&key))
408            .await
409            .expect("Failed to get data");
410        assert!(retrieved.is_none());
411    }
412
413    #[test_traced]
414    fn test_get_nonexistent_prunable_no_compression() {
415        let executor = deterministic::Runner::default();
416        executor.start(|context| async move {
417            let archive = create_prunable(context, None).await;
418            test_get_nonexistent_impl(archive).await;
419        });
420    }
421
422    #[test_traced]
423    fn test_get_nonexistent_prunable_compression() {
424        let executor = deterministic::Runner::default();
425        executor.start(|context| async move {
426            let archive = create_prunable(context, Some(3)).await;
427            test_get_nonexistent_impl(archive).await;
428        });
429    }
430
431    #[test_traced]
432    fn test_get_nonexistent_immutable_no_compression() {
433        let executor = deterministic::Runner::default();
434        executor.start(|context| async move {
435            let archive = create_immutable(context, None).await;
436            test_get_nonexistent_impl(archive).await;
437        });
438    }
439
440    #[test_traced]
441    fn test_get_nonexistent_immutable_compression() {
442        let executor = deterministic::Runner::default();
443        executor.start(|context| async move {
444            let archive = create_immutable(context, Some(3)).await;
445            test_get_nonexistent_impl(archive).await;
446        });
447    }
448
449    async fn test_persistence_impl<A, F, Fut>(context: Context, creator: F, compression: Option<u8>)
450    where
451        A: Archive<Key = FixedBytes<64>, Value = i32>,
452        F: Fn(Context, Option<u8>) -> Fut,
453        Fut: Future<Output = A>,
454    {
455        // Create and populate archive
456        {
457            let mut archive = creator(context.with_label("first"), compression).await;
458
459            // Insert multiple keys
460            let keys = vec![
461                (1u64, test_key("key1"), 1),
462                (2u64, test_key("key2"), 2),
463                (3u64, test_key("key3"), 3),
464            ];
465
466            for (index, key, data) in &keys {
467                archive
468                    .put(*index, key.clone(), *data)
469                    .await
470                    .expect("Failed to put data");
471            }
472
473            // Sync and drop the archive
474            archive.sync().await.expect("Failed to sync archive");
475        }
476
477        // Reopen and verify data
478        {
479            let archive = creator(context.with_label("second"), compression).await;
480
481            // Verify all keys are still present
482            let keys = vec![
483                (1u64, test_key("key1"), 1),
484                (2u64, test_key("key2"), 2),
485                (3u64, test_key("key3"), 3),
486            ];
487
488            for (index, key, expected_data) in &keys {
489                let retrieved = archive
490                    .get(Identifier::Index(*index))
491                    .await
492                    .expect("Failed to get data")
493                    .expect("Data not found");
494                assert_eq!(retrieved, *expected_data);
495
496                let retrieved = archive
497                    .get(Identifier::Key(key))
498                    .await
499                    .expect("Failed to get data")
500                    .expect("Data not found");
501                assert_eq!(retrieved, *expected_data);
502            }
503        }
504    }
505
506    #[test_traced]
507    fn test_persistence_prunable_no_compression() {
508        let executor = deterministic::Runner::default();
509        executor.start(|context| async move {
510            test_persistence_impl(context, create_prunable, None).await;
511        });
512    }
513
514    #[test_traced]
515    fn test_persistence_prunable_compression() {
516        let executor = deterministic::Runner::default();
517        executor.start(|context| async move {
518            test_persistence_impl(context, create_prunable, Some(3)).await;
519        });
520    }
521
522    #[test_traced]
523    fn test_persistence_immutable_no_compression() {
524        let executor = deterministic::Runner::default();
525        executor.start(|context| async move {
526            test_persistence_impl(context, create_immutable, None).await;
527        });
528    }
529
530    #[test_traced]
531    fn test_persistence_immutable_compression() {
532        let executor = deterministic::Runner::default();
533        executor.start(|context| async move {
534            test_persistence_impl(context, create_immutable, Some(3)).await;
535        });
536    }
537
538    async fn test_ranges_impl<A, F, Fut>(mut context: Context, creator: F, compression: Option<u8>)
539    where
540        A: Archive<Key = FixedBytes<64>, Value = i32>,
541        F: Fn(Context, Option<u8>) -> Fut,
542        Fut: Future<Output = A>,
543    {
544        let mut keys = BTreeMap::new();
545        {
546            let mut archive = creator(context.with_label("first"), compression).await;
547
548            // Insert 100 keys with gaps
549            let mut last_index = 0u64;
550            while keys.len() < 100 {
551                let gap: u64 = context.gen_range(1..=10);
552                let index = last_index + gap;
553                last_index = index;
554
555                let mut key_bytes = [0u8; 64];
556                context.fill(&mut key_bytes);
557                let key = FixedBytes::<64>::decode(key_bytes.as_ref()).unwrap();
558                let data: i32 = context.gen();
559
560                if keys.contains_key(&index) {
561                    continue;
562                }
563                keys.insert(index, (key.clone(), data));
564
565                archive
566                    .put(index, key, data)
567                    .await
568                    .expect("Failed to put data");
569            }
570
571            archive.sync().await.expect("Failed to sync archive");
572        }
573
574        {
575            let archive = creator(context.with_label("second"), compression).await;
576            let sorted_indices: Vec<u64> = keys.keys().cloned().collect();
577
578            // Check gap before the first element
579            let (current_end, start_next) = archive.next_gap(0);
580            assert!(current_end.is_none());
581            assert_eq!(start_next, Some(sorted_indices[0]));
582
583            // Check gaps between elements
584            let mut i = 0;
585            while i < sorted_indices.len() {
586                let current_index = sorted_indices[i];
587
588                // Find the end of the current contiguous block
589                let mut j = i;
590                while j + 1 < sorted_indices.len() && sorted_indices[j + 1] == sorted_indices[j] + 1
591                {
592                    j += 1;
593                }
594                let block_end_index = sorted_indices[j];
595                let next_actual_index = if j + 1 < sorted_indices.len() {
596                    Some(sorted_indices[j + 1])
597                } else {
598                    None
599                };
600
601                let (current_end, start_next) = archive.next_gap(current_index);
602                assert_eq!(current_end, Some(block_end_index));
603                assert_eq!(start_next, next_actual_index);
604
605                // If there's a gap, check an index within the gap
606                if let Some(next_index) = next_actual_index {
607                    if next_index > block_end_index + 1 {
608                        let in_gap_index = block_end_index + 1;
609                        let (current_end, start_next) = archive.next_gap(in_gap_index);
610                        assert!(current_end.is_none());
611                        assert_eq!(start_next, Some(next_index));
612                    }
613                }
614                i = j + 1;
615            }
616
617            // Check the last element
618            let last_index = *sorted_indices.last().unwrap();
619            let (current_end, start_next) = archive.next_gap(last_index);
620            assert!(current_end.is_some());
621            assert!(start_next.is_none());
622        }
623    }
624
625    #[test_traced]
626    fn test_ranges_prunable_no_compression() {
627        let executor = deterministic::Runner::default();
628        executor.start(|context| async move {
629            test_ranges_impl(context, create_prunable, None).await;
630        });
631    }
632
633    #[test_traced]
634    fn test_ranges_prunable_compression() {
635        let executor = deterministic::Runner::default();
636        executor.start(|context| async move {
637            test_ranges_impl(context, create_prunable, Some(3)).await;
638        });
639    }
640
641    #[test_traced]
642    fn test_ranges_immutable_no_compression() {
643        let executor = deterministic::Runner::default();
644        executor.start(|context| async move {
645            test_ranges_impl(context, create_immutable, None).await;
646        });
647    }
648
649    #[test_traced]
650    fn test_ranges_immutable_compression() {
651        let executor = deterministic::Runner::default();
652        executor.start(|context| async move {
653            test_ranges_impl(context, create_immutable, Some(3)).await;
654        });
655    }
656
657    async fn test_many_keys_impl<A, F, Fut>(
658        mut context: Context,
659        creator: F,
660        compression: Option<u8>,
661        num: usize,
662    ) where
663        A: Archive<Key = FixedBytes<64>, Value = i32>,
664        F: Fn(Context, Option<u8>) -> Fut,
665        Fut: Future<Output = A>,
666    {
667        // Insert many keys
668        let mut keys = BTreeMap::new();
669        {
670            let mut archive = creator(context.with_label("first"), compression).await;
671            while keys.len() < num {
672                let index = keys.len() as u64;
673                let mut key = [0u8; 64];
674                context.fill(&mut key);
675                let key = FixedBytes::<64>::decode(key.as_ref()).unwrap();
676                let data: i32 = context.gen();
677
678                archive
679                    .put(index, key.clone(), data)
680                    .await
681                    .expect("Failed to put data");
682                keys.insert(key, (index, data));
683
684                // Randomly sync the archive
685                if context.gen_bool(0.1) {
686                    archive.sync().await.expect("Failed to sync archive");
687                }
688            }
689            archive.sync().await.expect("Failed to sync archive");
690
691            // Ensure all keys can be retrieved
692            for (key, (index, data)) in &keys {
693                let retrieved = archive
694                    .get(Identifier::Index(*index))
695                    .await
696                    .expect("Failed to get data")
697                    .expect("Data not found");
698                assert_eq!(&retrieved, data);
699                let retrieved = archive
700                    .get(Identifier::Key(key))
701                    .await
702                    .expect("Failed to get data")
703                    .expect("Data not found");
704                assert_eq!(&retrieved, data);
705            }
706        }
707
708        // Reinitialize and verify
709        {
710            let archive = creator(context.with_label("second"), compression).await;
711
712            // Ensure all keys can be retrieved
713            for (key, (index, data)) in &keys {
714                let retrieved = archive
715                    .get(Identifier::Index(*index))
716                    .await
717                    .expect("Failed to get data")
718                    .expect("Data not found");
719                assert_eq!(&retrieved, data);
720                let retrieved = archive
721                    .get(Identifier::Key(key))
722                    .await
723                    .expect("Failed to get data")
724                    .expect("Data not found");
725                assert_eq!(&retrieved, data);
726            }
727        }
728    }
729
730    fn test_many_keys_determinism<F, Fut, A>(creator: F, compression: Option<u8>, num: usize)
731    where
732        A: Archive<Key = FixedBytes<64>, Value = i32>,
733        F: Fn(Context, Option<u8>) -> Fut + Copy + Send + 'static,
734        Fut: Future<Output = A> + Send,
735    {
736        let executor = deterministic::Runner::default();
737        let state1 = executor.start(|context| async move {
738            test_many_keys_impl(context.clone(), creator, compression, num).await;
739            context.auditor().state()
740        });
741        let executor = deterministic::Runner::default();
742        let state2 = executor.start(|context| async move {
743            test_many_keys_impl(context.clone(), creator, compression, num).await;
744            context.auditor().state()
745        });
746        assert_eq!(state1, state2);
747    }
748
749    #[test_traced]
750    fn test_many_keys_prunable_no_compression() {
751        test_many_keys_determinism(create_prunable, None, 1_000);
752    }
753
754    #[test_traced]
755    fn test_many_keys_prunable_compression() {
756        test_many_keys_determinism(create_prunable, Some(3), 1_000);
757    }
758
759    #[test_traced]
760    fn test_many_keys_immutable_no_compression() {
761        test_many_keys_determinism(create_immutable, None, 1_000);
762    }
763
764    #[test_traced]
765    fn test_many_keys_immutable_compression() {
766        test_many_keys_determinism(create_immutable, Some(3), 1_000);
767    }
768
769    #[test_group("slow")]
770    #[test_traced]
771    fn test_many_keys_prunable_large() {
772        test_many_keys_determinism(create_prunable, None, 50_000);
773    }
774
775    #[test_group("slow")]
776    #[test_traced]
777    fn test_many_keys_immutable_large() {
778        test_many_keys_determinism(create_immutable, None, 50_000);
779    }
780
781    async fn test_put_multi_and_get_impl(
782        context: Context,
783        mut archive: impl MultiArchive<Key = FixedBytes<64>, Value = i32>,
784    ) {
785        // Put three items at the same index with different keys
786        let index = 5u64;
787        let key_a = test_key("aaa");
788        let key_b = test_key("bbb");
789        let key_c = test_key("ccc");
790
791        archive
792            .put_multi(index, key_a.clone(), 10)
793            .await
794            .expect("put_multi a");
795        archive
796            .put_multi(index, key_b.clone(), 20)
797            .await
798            .expect("put_multi b");
799        archive
800            .put_multi(index, key_c.clone(), 30)
801            .await
802            .expect("put_multi c");
803
804        // Retrieve each by key
805        assert_eq!(
806            archive.get(Identifier::Key(&key_a)).await.unwrap(),
807            Some(10)
808        );
809        assert_eq!(
810            archive.get(Identifier::Key(&key_b)).await.unwrap(),
811            Some(20)
812        );
813        assert_eq!(
814            archive.get(Identifier::Key(&key_c)).await.unwrap(),
815            Some(30)
816        );
817
818        // Missing key returns None
819        let missing = test_key("zzz");
820        assert_eq!(archive.get(Identifier::Key(&missing)).await.unwrap(), None);
821
822        // items_tracked reflects unique indices, not total items
823        let buffer = context.encode();
824        assert!(buffer.contains("items_tracked 1"));
825    }
826
827    #[test_traced]
828    fn test_put_multi_and_get_prunable() {
829        let executor = deterministic::Runner::default();
830        executor.start(|context| async move {
831            let archive = create_prunable(context.clone(), None).await;
832            test_put_multi_and_get_impl(context, archive).await;
833        });
834    }
835
836    async fn test_put_multi_duplicate_key_impl(
837        context: Context,
838        mut archive: impl MultiArchive<Key = FixedBytes<64>, Value = i32>,
839    ) {
840        let key = test_key("dup");
841        archive.put_multi(5, key.clone(), 10).await.unwrap();
842        archive.put_multi(7, key.clone(), 20).await.unwrap();
843
844        // Duplicate key is allowed across indices.
845        assert_eq!(archive.get(Identifier::Index(5)).await.unwrap(), Some(10));
846        assert_eq!(archive.get(Identifier::Index(7)).await.unwrap(), Some(20));
847        assert_eq!(archive.get_all(5).await.unwrap(), Some(vec![10]));
848        assert_eq!(archive.get_all(7).await.unwrap(), Some(vec![20]));
849
850        // Like Archive::put, duplicate keys may return any associated value.
851        assert!(matches!(
852            archive.get(Identifier::Key(&key)).await.unwrap(),
853            Some(10 | 20)
854        ));
855
856        let buffer = context.encode();
857        assert!(buffer.contains("items_tracked 2"));
858    }
859
860    #[test_traced]
861    fn test_put_multi_duplicate_key_prunable() {
862        let executor = deterministic::Runner::default();
863        executor.start(|context| async move {
864            let archive = create_prunable(context.clone(), None).await;
865            test_put_multi_duplicate_key_impl(context, archive).await;
866        });
867    }
868
869    async fn test_get_all_impl(mut archive: impl MultiArchive<Key = FixedBytes<64>, Value = i32>) {
870        // Three items at the same index
871        archive.put_multi(5, test_key("aaa"), 10).await.unwrap();
872        archive.put_multi(5, test_key("bbb"), 20).await.unwrap();
873        archive.put_multi(5, test_key("ccc"), 30).await.unwrap();
874
875        // One item at a different index
876        archive.put_multi(7, test_key("ddd"), 40).await.unwrap();
877
878        // get_all returns all values at the index in insertion order
879        let all = archive.get_all(5).await.unwrap();
880        assert_eq!(all, Some(vec![10, 20, 30]));
881
882        // Single-item index returns one element
883        let all = archive.get_all(7).await.unwrap();
884        assert_eq!(all, Some(vec![40]));
885
886        // Missing index returns None
887        let all = archive.get_all(99).await.unwrap();
888        assert_eq!(all, None);
889
890        // Archive::get(Index) still returns only the first
891        assert_eq!(archive.get(Identifier::Index(5)).await.unwrap(), Some(10));
892    }
893
894    #[test_traced]
895    fn test_get_all_prunable() {
896        let executor = deterministic::Runner::default();
897        executor.start(|context| async move {
898            let archive = create_prunable(context, None).await;
899            test_get_all_impl(archive).await;
900        });
901    }
902
903    async fn test_put_multi_preserves_archive_put_semantics_impl(
904        mut archive: impl MultiArchive<Key = FixedBytes<64>, Value = i32>,
905    ) {
906        // put_multi two items at the same index
907        archive
908            .put_multi(1, test_key("aaa"), 10)
909            .await
910            .expect("put_multi");
911        archive
912            .put_multi(1, test_key("bbb"), 20)
913            .await
914            .expect("put_multi");
915
916        // Archive::put is a no-op when index already exists
917        archive
918            .put(1, test_key("ccc"), 30)
919            .await
920            .expect("Archive::put should no-op");
921
922        // Only two items exist (Archive::put did not add a third)
923        assert_eq!(
924            archive
925                .get(Identifier::Key(&test_key("aaa")))
926                .await
927                .unwrap(),
928            Some(10)
929        );
930        assert_eq!(
931            archive
932                .get(Identifier::Key(&test_key("bbb")))
933                .await
934                .unwrap(),
935            Some(20)
936        );
937        assert_eq!(
938            archive
939                .get(Identifier::Key(&test_key("ccc")))
940                .await
941                .unwrap(),
942            None
943        );
944
945        // Archive::get(Index) returns the first item inserted
946        let first = archive
947            .get(Identifier::Index(1))
948            .await
949            .unwrap()
950            .expect("should find first");
951        assert_eq!(first, 10);
952    }
953
954    #[test_traced]
955    fn test_put_multi_preserves_archive_put_semantics_prunable() {
956        let executor = deterministic::Runner::default();
957        executor.start(|context| async move {
958            let archive = create_prunable(context, None).await;
959            test_put_multi_preserves_archive_put_semantics_impl(archive).await;
960        });
961    }
962
963    async fn test_put_multi_restart_impl<A, F, Fut>(
964        context: Context,
965        creator: F,
966        compression: Option<u8>,
967    ) where
968        A: MultiArchive<Key = FixedBytes<64>, Value = i32>,
969        F: Fn(Context, Option<u8>) -> Fut,
970        Fut: Future<Output = A>,
971    {
972        // Write multi-items, sync, and drop
973        {
974            let mut archive = creator(context.with_label("init1"), compression).await;
975            archive.put_multi(5, test_key("aaa"), 10).await.unwrap();
976            archive.put_multi(5, test_key("bbb"), 20).await.unwrap();
977            archive.put_multi(7, test_key("ccc"), 30).await.unwrap();
978            archive.sync().await.unwrap();
979        }
980
981        // Reinitialize and verify
982        let archive = creator(context.with_label("init2"), compression).await;
983
984        assert_eq!(
985            archive
986                .get(Identifier::Key(&test_key("aaa")))
987                .await
988                .unwrap(),
989            Some(10)
990        );
991        assert_eq!(
992            archive
993                .get(Identifier::Key(&test_key("bbb")))
994                .await
995                .unwrap(),
996            Some(20)
997        );
998        assert_eq!(
999            archive
1000                .get(Identifier::Key(&test_key("ccc")))
1001                .await
1002                .unwrap(),
1003            Some(30)
1004        );
1005
1006        // items_tracked reflects two unique indices after restart
1007        let buffer = context.encode();
1008        assert!(buffer.contains("items_tracked 2"));
1009    }
1010
1011    #[test_traced]
1012    fn test_put_multi_restart_prunable() {
1013        let executor = deterministic::Runner::default();
1014        executor.start(|context| async move {
1015            test_put_multi_restart_impl(context, create_prunable, None).await;
1016        });
1017    }
1018
1019    async fn test_put_multi_mixed_indices_impl(
1020        context: Context,
1021        mut archive: impl MultiArchive<Key = FixedBytes<64>, Value = i32>,
1022    ) {
1023        // Mix Archive::put (single-item) and MultiArchive::put_multi
1024        archive.put(1, test_key("single"), 100).await.unwrap();
1025        archive
1026            .put_multi(2, test_key("multi-a"), 200)
1027            .await
1028            .unwrap();
1029        archive
1030            .put_multi(2, test_key("multi-b"), 201)
1031            .await
1032            .unwrap();
1033        archive
1034            .put_multi(3, test_key("multi-c"), 300)
1035            .await
1036            .unwrap();
1037
1038        // All retrievable by key
1039        assert_eq!(
1040            archive
1041                .get(Identifier::Key(&test_key("single")))
1042                .await
1043                .unwrap(),
1044            Some(100)
1045        );
1046        assert_eq!(
1047            archive
1048                .get(Identifier::Key(&test_key("multi-a")))
1049                .await
1050                .unwrap(),
1051            Some(200)
1052        );
1053        assert_eq!(
1054            archive
1055                .get(Identifier::Key(&test_key("multi-b")))
1056                .await
1057                .unwrap(),
1058            Some(201)
1059        );
1060        assert_eq!(
1061            archive
1062                .get(Identifier::Key(&test_key("multi-c")))
1063                .await
1064                .unwrap(),
1065            Some(300)
1066        );
1067
1068        // Archive::get(Index) returns first item at that index
1069        assert_eq!(archive.get(Identifier::Index(2)).await.unwrap(), Some(200));
1070
1071        // Gap tracking works across mixed usage
1072        let (end, next) = archive.next_gap(1);
1073        assert_eq!(end, Some(3));
1074        assert!(next.is_none());
1075
1076        let buffer = context.encode();
1077        assert!(buffer.contains("items_tracked 3"));
1078    }
1079
1080    #[test_traced]
1081    fn test_put_multi_mixed_indices_prunable() {
1082        let executor = deterministic::Runner::default();
1083        executor.start(|context| async move {
1084            let archive = create_prunable(context.clone(), None).await;
1085            test_put_multi_mixed_indices_impl(context, archive).await;
1086        });
1087    }
1088
1089    fn assert_send<T: Send>(_: T) {}
1090
1091    #[allow(dead_code)]
1092    fn assert_archive_futures_are_send<T: super::Archive>(
1093        archive: &mut T,
1094        key: T::Key,
1095        value: T::Value,
1096    ) where
1097        T::Key: Clone,
1098        T::Value: Clone,
1099    {
1100        assert_send(archive.put(1, key.clone(), value.clone()));
1101        assert_send(archive.put_sync(2, key.clone(), value));
1102        assert_send(archive.get(Identifier::Index(1)));
1103        assert_send(archive.get(Identifier::Key(&key)));
1104        assert_send(archive.has(Identifier::Index(1)));
1105        assert_send(archive.has(Identifier::Key(&key)));
1106        assert_send(archive.sync());
1107    }
1108
1109    #[allow(dead_code)]
1110    fn assert_archive_destroy_is_send<T: super::Archive>(archive: T) {
1111        assert_send(archive.destroy());
1112    }
1113
1114    #[allow(dead_code)]
1115    fn assert_multi_archive_futures_are_send<T: super::MultiArchive>(
1116        archive: &mut T,
1117        key: T::Key,
1118        value: T::Value,
1119    ) where
1120        T::Key: Clone,
1121        T::Value: Clone,
1122    {
1123        assert_archive_futures_are_send(archive, key.clone(), value.clone());
1124        assert_send(archive.get_all(1));
1125        assert_send(archive.put_multi(1, key.clone(), value.clone()));
1126        assert_send(archive.put_multi_sync(2, key, value));
1127    }
1128
1129    #[allow(dead_code)]
1130    fn assert_prunable_archive_futures_are_send(
1131        archive: &mut prunable::Archive<TwoCap, Context, FixedBytes<64>, i32>,
1132        key: FixedBytes<64>,
1133        value: i32,
1134    ) {
1135        assert_archive_futures_are_send(archive, key, value);
1136    }
1137
1138    #[allow(dead_code)]
1139    fn assert_prunable_multi_archive_futures_are_send(
1140        archive: &mut prunable::Archive<TwoCap, Context, FixedBytes<64>, i32>,
1141        key: FixedBytes<64>,
1142        value: i32,
1143    ) {
1144        assert_multi_archive_futures_are_send(archive, key, value);
1145    }
1146
1147    #[allow(dead_code)]
1148    fn assert_prunable_archive_destroy_is_send(
1149        archive: prunable::Archive<TwoCap, Context, FixedBytes<64>, i32>,
1150    ) {
1151        assert_archive_destroy_is_send(archive);
1152    }
1153
1154    #[allow(dead_code)]
1155    fn assert_immutable_archive_futures_are_send(
1156        archive: &mut immutable::Archive<Context, FixedBytes<64>, i32>,
1157        key: FixedBytes<64>,
1158        value: i32,
1159    ) {
1160        assert_archive_futures_are_send(archive, key, value);
1161    }
1162
1163    #[allow(dead_code)]
1164    fn assert_immutable_archive_destroy_is_send(
1165        archive: immutable::Archive<Context, FixedBytes<64>, i32>,
1166    ) {
1167        assert_archive_destroy_is_send(archive);
1168    }
1169}