Skip to main content

commonware_storage/archive/
mod.rs

1//! A write-once key-value store for ordered data.
2//!
3//! [Archive] is a key-value store designed for workloads where all data is written only once and is
4//! uniquely associated with both an `index` and a `key`.
5
6use commonware_codec::Codec;
7use commonware_utils::Array;
8use std::future::Future;
9use thiserror::Error;
10
11pub mod immutable;
12pub mod prunable;
13
14#[cfg(all(test, feature = "arbitrary"))]
15mod conformance;
16
17/// Subject of a `get` or `has` operation.
18pub enum Identifier<'a, K: Array> {
19    Index(u64),
20    Key(&'a K),
21}
22
23/// Errors that can occur when interacting with the archive.
24#[derive(Debug, Error)]
25pub enum Error {
26    #[error("journal error: {0}")]
27    Journal(#[from] crate::journal::Error),
28    #[error("ordinal error: {0}")]
29    Ordinal(#[from] crate::ordinal::Error),
30    #[error("metadata error: {0}")]
31    Metadata(#[from] crate::metadata::Error),
32    #[error("freezer error: {0}")]
33    Freezer(#[from] crate::freezer::Error),
34    #[error("record corrupted")]
35    RecordCorrupted,
36    #[error("already pruned to: {0}")]
37    AlreadyPrunedTo(u64),
38    #[error("record too large")]
39    RecordTooLarge,
40}
41
42/// A write-once key-value store where each key is associated with a unique index.
43pub trait Archive: Send {
44    /// The type of the key.
45    type Key: Array;
46
47    /// The type of the value.
48    type Value: Codec + Send;
49
50    /// Store an item in [Archive]. Both indices and keys are assumed to both be globally unique.
51    ///
52    /// If the index already exists, put does nothing and returns. If the same key is stored multiple times
53    /// at different indices (not recommended), any value associated with the key may be returned.
54    fn put(
55        &mut self,
56        index: u64,
57        key: Self::Key,
58        value: Self::Value,
59    ) -> impl Future<Output = Result<(), Error>> + Send;
60
61    /// Perform a [Archive::put] and [Archive::sync] in a single operation.
62    fn put_sync(
63        &mut self,
64        index: u64,
65        key: Self::Key,
66        value: Self::Value,
67    ) -> impl Future<Output = Result<(), Error>> + Send {
68        async move {
69            self.put(index, key, value).await?;
70            self.sync().await
71        }
72    }
73
74    /// Retrieve an item from [Archive].
75    ///
76    /// Note that if the [Archive] is a [MultiArchive], there may be multiple values associated with the
77    /// same [Identifier::Index]. If there are multiple values, the first stored will be returned. Use
78    /// [MultiArchive::get_all] to retrieve all values at an index.
79    fn get<'a>(
80        &'a self,
81        identifier: Identifier<'a, Self::Key>,
82    ) -> impl Future<Output = Result<Option<Self::Value>, Error>> + Send + use<'a, Self>;
83
84    /// Check if an item exists in [Archive].
85    fn has<'a>(
86        &'a self,
87        identifier: Identifier<'a, Self::Key>,
88    ) -> impl Future<Output = Result<bool, Error>> + Send + use<'a, Self>;
89
90    /// Retrieve the end of the current range including `index` (inclusive) and
91    /// the start of the next range after `index` (if it exists).
92    ///
93    /// This is useful for driving backfill operations over the archive.
94    fn next_gap(&self, index: u64) -> (Option<u64>, Option<u64>);
95
96    /// Returns up to `max` missing items starting from `start`.
97    ///
98    /// This method iterates through gaps between existing ranges, collecting missing indices
99    /// until either `max` items are found or there are no more gaps to fill.
100    fn missing_items(&self, index: u64, max: usize) -> Vec<u64>;
101
102    /// Retrieve an iterator over all populated ranges (inclusive) within the [Archive].
103    fn ranges(&self) -> impl Iterator<Item = (u64, u64)>;
104
105    /// Retrieve the first index in the [Archive].
106    fn first_index(&self) -> Option<u64>;
107
108    /// Retrieve the last index in the [Archive].
109    fn last_index(&self) -> Option<u64>;
110
111    /// Sync all pending writes.
112    fn sync(&mut self) -> impl Future<Output = Result<(), Error>> + Send;
113
114    /// Remove all persistent data created by this [Archive].
115    fn destroy(self) -> impl Future<Output = Result<(), Error>> + Send;
116}
117
118/// Extension of [Archive] that supports multiple items at the same index.
119///
120/// Unlike [Archive::put], which is a no-op when the index already exists,
121/// [MultiArchive::put_multi] allows storing additional `(key, value)` pairs
122/// at an existing index. As with [Archive::put], keys are assumed to be globally
123/// unique, but duplicate keys are not rejected.
124pub trait MultiArchive: Archive {
125    /// Retrieve all values stored at the given index.
126    ///
127    /// Returns `None` if the index does not exist or has been pruned.
128    fn get_all(
129        &self,
130        index: u64,
131    ) -> impl Future<Output = Result<Option<Vec<Self::Value>>, Error>> + Send + use<'_, Self>;
132
133    /// Store an item, allowing multiple items at the same index.
134    ///
135    /// Multiple items may share the same `index`. If the same key is stored at
136    /// multiple indices, any associated value may be returned when queried with
137    /// [Identifier::Key].
138    fn put_multi(
139        &mut self,
140        index: u64,
141        key: Self::Key,
142        value: Self::Value,
143    ) -> impl Future<Output = Result<(), Error>> + Send;
144
145    /// Perform a [MultiArchive::put_multi] and [Archive::sync] in a single operation.
146    fn put_multi_sync(
147        &mut self,
148        index: u64,
149        key: Self::Key,
150        value: Self::Value,
151    ) -> impl Future<Output = Result<(), Error>> + Send {
152        async move {
153            self.put_multi(index, key, value).await?;
154            self.sync().await
155        }
156    }
157}
158
159#[cfg(test)]
160mod tests {
161    use super::*;
162    use crate::{kv::tests::test_key, translator::TwoCap};
163    use commonware_codec::DecodeExt;
164    use commonware_macros::{test_group, test_traced};
165    use commonware_runtime::{
166        buffer::paged::CacheRef,
167        deterministic::{self, Context},
168        Metrics, Runner,
169    };
170    use commonware_utils::{sequence::FixedBytes, NZUsize, NZU16, NZU64};
171    use rand::Rng;
172    use std::{
173        collections::BTreeMap,
174        num::{NonZeroU16, NonZeroUsize},
175    };
176
177    const PAGE_SIZE: NonZeroU16 = NZU16!(1024);
178    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);
179
180    async fn create_prunable(
181        context: Context,
182        compression: Option<u8>,
183    ) -> impl MultiArchive<Key = FixedBytes<64>, Value = i32> {
184        let cfg = prunable::Config {
185            translator: TwoCap,
186            key_partition: "test-key".into(),
187            key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
188            value_partition: "test-value".into(),
189            compression,
190            codec_config: (),
191            items_per_section: NZU64!(1024),
192            key_write_buffer: NZUsize!(1024),
193            value_write_buffer: NZUsize!(1024),
194            replay_buffer: NZUsize!(1024),
195        };
196        prunable::Archive::init(context, cfg).await.unwrap()
197    }
198
199    async fn create_immutable(
200        context: Context,
201        compression: Option<u8>,
202    ) -> impl Archive<Key = FixedBytes<64>, Value = i32> {
203        let cfg = immutable::Config {
204            metadata_partition: "test-metadata".into(),
205            freezer_table_partition: "test-table".into(),
206            freezer_table_initial_size: 64,
207            freezer_table_resize_frequency: 2,
208            freezer_table_resize_chunk_size: 32,
209            freezer_key_partition: "test-key".into(),
210            freezer_key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
211            freezer_value_partition: "test-value".into(),
212            freezer_value_target_size: 1024 * 1024,
213            freezer_value_compression: compression,
214            ordinal_partition: "test-ordinal".into(),
215            items_per_section: NZU64!(1024),
216            freezer_key_write_buffer: NZUsize!(1024 * 1024),
217            freezer_value_write_buffer: NZUsize!(1024 * 1024),
218            ordinal_write_buffer: NZUsize!(1024 * 1024),
219            replay_buffer: NZUsize!(1024 * 1024),
220            codec_config: (),
221        };
222        immutable::Archive::init(context, cfg).await.unwrap()
223    }
224
225    async fn test_put_get_impl(mut archive: impl Archive<Key = FixedBytes<64>, Value = i32>) {
226        let index = 1u64;
227        let key = test_key("testkey");
228        let data = 1;
229
230        // Has the key before put
231        let has = archive
232            .has(Identifier::Index(index))
233            .await
234            .expect("Failed to check key");
235        assert!(!has);
236        let has = archive
237            .has(Identifier::Key(&key))
238            .await
239            .expect("Failed to check key");
240        assert!(!has);
241
242        // Put the key-data pair
243        archive
244            .put(index, key.clone(), data)
245            .await
246            .expect("Failed to put data");
247
248        // Has the key after put
249        let has = archive
250            .has(Identifier::Index(index))
251            .await
252            .expect("Failed to check key");
253        assert!(has);
254        let has = archive
255            .has(Identifier::Key(&key))
256            .await
257            .expect("Failed to check key");
258        assert!(has);
259
260        // Get the data by key
261        let retrieved = archive
262            .get(Identifier::Key(&key))
263            .await
264            .expect("Failed to get data");
265        assert_eq!(retrieved, Some(data));
266
267        // Get the data by index
268        let retrieved = archive
269            .get(Identifier::Index(index))
270            .await
271            .expect("Failed to get data");
272        assert_eq!(retrieved, Some(data));
273
274        // Force a sync
275        archive.sync().await.expect("Failed to sync data");
276    }
277
278    #[test_traced]
279    fn test_put_get_prunable_no_compression() {
280        let executor = deterministic::Runner::default();
281        executor.start(|context| async move {
282            let archive = create_prunable(context, None).await;
283            test_put_get_impl(archive).await;
284        });
285    }
286
287    #[test_traced]
288    fn test_put_get_prunable_compression() {
289        let executor = deterministic::Runner::default();
290        executor.start(|context| async move {
291            let archive = create_prunable(context, Some(3)).await;
292            test_put_get_impl(archive).await;
293        });
294    }
295
296    #[test_traced]
297    fn test_put_get_immutable_no_compression() {
298        let executor = deterministic::Runner::default();
299        executor.start(|context| async move {
300            let archive = create_immutable(context, None).await;
301            test_put_get_impl(archive).await;
302        });
303    }
304
305    #[test_traced]
306    fn test_put_get_immutable_compression() {
307        let executor = deterministic::Runner::default();
308        executor.start(|context| async move {
309            let archive = create_immutable(context, Some(3)).await;
310            test_put_get_impl(archive).await;
311        });
312    }
313
314    async fn test_duplicate_key_impl(mut archive: impl Archive<Key = FixedBytes<64>, Value = i32>) {
315        let index = 1u64;
316        let key = test_key("duplicate");
317        let data1 = 1;
318        let data2 = 2;
319
320        // Put the key-data pair
321        archive
322            .put(index, key.clone(), data1)
323            .await
324            .expect("Failed to put data");
325
326        // Put the key-data pair again (should be idempotent)
327        archive
328            .put(index, key.clone(), data2)
329            .await
330            .expect("Duplicate put should not fail");
331
332        // Get the data back - should still be the first value
333        let retrieved = archive
334            .get(Identifier::Index(index))
335            .await
336            .expect("Failed to get data")
337            .expect("Data not found");
338        assert_eq!(retrieved, data1);
339
340        let retrieved = archive
341            .get(Identifier::Key(&key))
342            .await
343            .expect("Failed to get data")
344            .expect("Data not found");
345        assert_eq!(retrieved, data1);
346    }
347
348    #[test_traced]
349    fn test_duplicate_key_prunable_no_compression() {
350        let executor = deterministic::Runner::default();
351        executor.start(|context| async move {
352            let archive = create_prunable(context, None).await;
353            test_duplicate_key_impl(archive).await;
354        });
355    }
356
357    #[test_traced]
358    fn test_duplicate_key_prunable_compression() {
359        let executor = deterministic::Runner::default();
360        executor.start(|context| async move {
361            let archive = create_prunable(context, Some(3)).await;
362            test_duplicate_key_impl(archive).await;
363        });
364    }
365
366    #[test_traced]
367    fn test_duplicate_key_immutable_no_compression() {
368        let executor = deterministic::Runner::default();
369        executor.start(|context| async move {
370            let archive = create_immutable(context, None).await;
371            test_duplicate_key_impl(archive).await;
372        });
373    }
374
375    #[test_traced]
376    fn test_duplicate_key_immutable_compression() {
377        let executor = deterministic::Runner::default();
378        executor.start(|context| async move {
379            let archive = create_immutable(context, Some(3)).await;
380            test_duplicate_key_impl(archive).await;
381        });
382    }
383
384    async fn test_get_nonexistent_impl(archive: impl Archive<Key = FixedBytes<64>, Value = i32>) {
385        // Attempt to get an index that doesn't exist
386        let index = 1u64;
387        let retrieved: Option<i32> = archive
388            .get(Identifier::Index(index))
389            .await
390            .expect("Failed to get data");
391        assert!(retrieved.is_none());
392
393        // Attempt to get a key that doesn't exist
394        let key = test_key("nonexistent");
395        let retrieved = archive
396            .get(Identifier::Key(&key))
397            .await
398            .expect("Failed to get data");
399        assert!(retrieved.is_none());
400    }
401
402    #[test_traced]
403    fn test_get_nonexistent_prunable_no_compression() {
404        let executor = deterministic::Runner::default();
405        executor.start(|context| async move {
406            let archive = create_prunable(context, None).await;
407            test_get_nonexistent_impl(archive).await;
408        });
409    }
410
411    #[test_traced]
412    fn test_get_nonexistent_prunable_compression() {
413        let executor = deterministic::Runner::default();
414        executor.start(|context| async move {
415            let archive = create_prunable(context, Some(3)).await;
416            test_get_nonexistent_impl(archive).await;
417        });
418    }
419
420    #[test_traced]
421    fn test_get_nonexistent_immutable_no_compression() {
422        let executor = deterministic::Runner::default();
423        executor.start(|context| async move {
424            let archive = create_immutable(context, None).await;
425            test_get_nonexistent_impl(archive).await;
426        });
427    }
428
429    #[test_traced]
430    fn test_get_nonexistent_immutable_compression() {
431        let executor = deterministic::Runner::default();
432        executor.start(|context| async move {
433            let archive = create_immutable(context, Some(3)).await;
434            test_get_nonexistent_impl(archive).await;
435        });
436    }
437
438    async fn test_persistence_impl<A, F, Fut>(context: Context, creator: F, compression: Option<u8>)
439    where
440        A: Archive<Key = FixedBytes<64>, Value = i32>,
441        F: Fn(Context, Option<u8>) -> Fut,
442        Fut: Future<Output = A>,
443    {
444        // Create and populate archive
445        {
446            let mut archive = creator(context.with_label("first"), compression).await;
447
448            // Insert multiple keys
449            let keys = vec![
450                (1u64, test_key("key1"), 1),
451                (2u64, test_key("key2"), 2),
452                (3u64, test_key("key3"), 3),
453            ];
454
455            for (index, key, data) in &keys {
456                archive
457                    .put(*index, key.clone(), *data)
458                    .await
459                    .expect("Failed to put data");
460            }
461
462            // Sync and drop the archive
463            archive.sync().await.expect("Failed to sync archive");
464        }
465
466        // Reopen and verify data
467        {
468            let archive = creator(context.with_label("second"), compression).await;
469
470            // Verify all keys are still present
471            let keys = vec![
472                (1u64, test_key("key1"), 1),
473                (2u64, test_key("key2"), 2),
474                (3u64, test_key("key3"), 3),
475            ];
476
477            for (index, key, expected_data) in &keys {
478                let retrieved = archive
479                    .get(Identifier::Index(*index))
480                    .await
481                    .expect("Failed to get data")
482                    .expect("Data not found");
483                assert_eq!(retrieved, *expected_data);
484
485                let retrieved = archive
486                    .get(Identifier::Key(key))
487                    .await
488                    .expect("Failed to get data")
489                    .expect("Data not found");
490                assert_eq!(retrieved, *expected_data);
491            }
492        }
493    }
494
495    #[test_traced]
496    fn test_persistence_prunable_no_compression() {
497        let executor = deterministic::Runner::default();
498        executor.start(|context| async move {
499            test_persistence_impl(context, create_prunable, None).await;
500        });
501    }
502
503    #[test_traced]
504    fn test_persistence_prunable_compression() {
505        let executor = deterministic::Runner::default();
506        executor.start(|context| async move {
507            test_persistence_impl(context, create_prunable, Some(3)).await;
508        });
509    }
510
511    #[test_traced]
512    fn test_persistence_immutable_no_compression() {
513        let executor = deterministic::Runner::default();
514        executor.start(|context| async move {
515            test_persistence_impl(context, create_immutable, None).await;
516        });
517    }
518
519    #[test_traced]
520    fn test_persistence_immutable_compression() {
521        let executor = deterministic::Runner::default();
522        executor.start(|context| async move {
523            test_persistence_impl(context, create_immutable, Some(3)).await;
524        });
525    }
526
527    async fn test_ranges_impl<A, F, Fut>(mut context: Context, creator: F, compression: Option<u8>)
528    where
529        A: Archive<Key = FixedBytes<64>, Value = i32>,
530        F: Fn(Context, Option<u8>) -> Fut,
531        Fut: Future<Output = A>,
532    {
533        let mut keys = BTreeMap::new();
534        {
535            let mut archive = creator(context.with_label("first"), compression).await;
536
537            // Insert 100 keys with gaps
538            let mut last_index = 0u64;
539            while keys.len() < 100 {
540                let gap: u64 = context.gen_range(1..=10);
541                let index = last_index + gap;
542                last_index = index;
543
544                let mut key_bytes = [0u8; 64];
545                context.fill(&mut key_bytes);
546                let key = FixedBytes::<64>::decode(key_bytes.as_ref()).unwrap();
547                let data: i32 = context.gen();
548
549                if keys.contains_key(&index) {
550                    continue;
551                }
552                keys.insert(index, (key.clone(), data));
553
554                archive
555                    .put(index, key, data)
556                    .await
557                    .expect("Failed to put data");
558            }
559
560            archive.sync().await.expect("Failed to sync archive");
561        }
562
563        {
564            let archive = creator(context.with_label("second"), compression).await;
565            let sorted_indices: Vec<u64> = keys.keys().cloned().collect();
566
567            // Check gap before the first element
568            let (current_end, start_next) = archive.next_gap(0);
569            assert!(current_end.is_none());
570            assert_eq!(start_next, Some(sorted_indices[0]));
571
572            // Check gaps between elements
573            let mut i = 0;
574            while i < sorted_indices.len() {
575                let current_index = sorted_indices[i];
576
577                // Find the end of the current contiguous block
578                let mut j = i;
579                while j + 1 < sorted_indices.len() && sorted_indices[j + 1] == sorted_indices[j] + 1
580                {
581                    j += 1;
582                }
583                let block_end_index = sorted_indices[j];
584                let next_actual_index = if j + 1 < sorted_indices.len() {
585                    Some(sorted_indices[j + 1])
586                } else {
587                    None
588                };
589
590                let (current_end, start_next) = archive.next_gap(current_index);
591                assert_eq!(current_end, Some(block_end_index));
592                assert_eq!(start_next, next_actual_index);
593
594                // If there's a gap, check an index within the gap
595                if let Some(next_index) = next_actual_index {
596                    if next_index > block_end_index + 1 {
597                        let in_gap_index = block_end_index + 1;
598                        let (current_end, start_next) = archive.next_gap(in_gap_index);
599                        assert!(current_end.is_none());
600                        assert_eq!(start_next, Some(next_index));
601                    }
602                }
603                i = j + 1;
604            }
605
606            // Check the last element
607            let last_index = *sorted_indices.last().unwrap();
608            let (current_end, start_next) = archive.next_gap(last_index);
609            assert!(current_end.is_some());
610            assert!(start_next.is_none());
611        }
612    }
613
614    #[test_traced]
615    fn test_ranges_prunable_no_compression() {
616        let executor = deterministic::Runner::default();
617        executor.start(|context| async move {
618            test_ranges_impl(context, create_prunable, None).await;
619        });
620    }
621
622    #[test_traced]
623    fn test_ranges_prunable_compression() {
624        let executor = deterministic::Runner::default();
625        executor.start(|context| async move {
626            test_ranges_impl(context, create_prunable, Some(3)).await;
627        });
628    }
629
630    #[test_traced]
631    fn test_ranges_immutable_no_compression() {
632        let executor = deterministic::Runner::default();
633        executor.start(|context| async move {
634            test_ranges_impl(context, create_immutable, None).await;
635        });
636    }
637
638    #[test_traced]
639    fn test_ranges_immutable_compression() {
640        let executor = deterministic::Runner::default();
641        executor.start(|context| async move {
642            test_ranges_impl(context, create_immutable, Some(3)).await;
643        });
644    }
645
646    async fn test_many_keys_impl<A, F, Fut>(
647        mut context: Context,
648        creator: F,
649        compression: Option<u8>,
650        num: usize,
651    ) where
652        A: Archive<Key = FixedBytes<64>, Value = i32>,
653        F: Fn(Context, Option<u8>) -> Fut,
654        Fut: Future<Output = A>,
655    {
656        // Insert many keys
657        let mut keys = BTreeMap::new();
658        {
659            let mut archive = creator(context.with_label("first"), compression).await;
660            while keys.len() < num {
661                let index = keys.len() as u64;
662                let mut key = [0u8; 64];
663                context.fill(&mut key);
664                let key = FixedBytes::<64>::decode(key.as_ref()).unwrap();
665                let data: i32 = context.gen();
666
667                archive
668                    .put(index, key.clone(), data)
669                    .await
670                    .expect("Failed to put data");
671                keys.insert(key, (index, data));
672
673                // Randomly sync the archive
674                if context.gen_bool(0.1) {
675                    archive.sync().await.expect("Failed to sync archive");
676                }
677            }
678            archive.sync().await.expect("Failed to sync archive");
679
680            // Ensure all keys can be retrieved
681            for (key, (index, data)) in &keys {
682                let retrieved = archive
683                    .get(Identifier::Index(*index))
684                    .await
685                    .expect("Failed to get data")
686                    .expect("Data not found");
687                assert_eq!(&retrieved, data);
688                let retrieved = archive
689                    .get(Identifier::Key(key))
690                    .await
691                    .expect("Failed to get data")
692                    .expect("Data not found");
693                assert_eq!(&retrieved, data);
694            }
695        }
696
697        // Reinitialize and verify
698        {
699            let archive = creator(context.with_label("second"), compression).await;
700
701            // Ensure all keys can be retrieved
702            for (key, (index, data)) in &keys {
703                let retrieved = archive
704                    .get(Identifier::Index(*index))
705                    .await
706                    .expect("Failed to get data")
707                    .expect("Data not found");
708                assert_eq!(&retrieved, data);
709                let retrieved = archive
710                    .get(Identifier::Key(key))
711                    .await
712                    .expect("Failed to get data")
713                    .expect("Data not found");
714                assert_eq!(&retrieved, data);
715            }
716        }
717    }
718
719    fn test_many_keys_determinism<F, Fut, A>(creator: F, compression: Option<u8>, num: usize)
720    where
721        A: Archive<Key = FixedBytes<64>, Value = i32>,
722        F: Fn(Context, Option<u8>) -> Fut + Copy + Send + 'static,
723        Fut: Future<Output = A> + Send,
724    {
725        let executor = deterministic::Runner::default();
726        let state1 = executor.start(|context| async move {
727            test_many_keys_impl(context.clone(), creator, compression, num).await;
728            context.auditor().state()
729        });
730        let executor = deterministic::Runner::default();
731        let state2 = executor.start(|context| async move {
732            test_many_keys_impl(context.clone(), creator, compression, num).await;
733            context.auditor().state()
734        });
735        assert_eq!(state1, state2);
736    }
737
738    #[test_traced]
739    fn test_many_keys_prunable_no_compression() {
740        test_many_keys_determinism(create_prunable, None, 1_000);
741    }
742
743    #[test_traced]
744    fn test_many_keys_prunable_compression() {
745        test_many_keys_determinism(create_prunable, Some(3), 1_000);
746    }
747
748    #[test_traced]
749    fn test_many_keys_immutable_no_compression() {
750        test_many_keys_determinism(create_immutable, None, 1_000);
751    }
752
753    #[test_traced]
754    fn test_many_keys_immutable_compression() {
755        test_many_keys_determinism(create_immutable, Some(3), 1_000);
756    }
757
758    #[test_group("slow")]
759    #[test_traced]
760    fn test_many_keys_prunable_large() {
761        test_many_keys_determinism(create_prunable, None, 50_000);
762    }
763
764    #[test_group("slow")]
765    #[test_traced]
766    fn test_many_keys_immutable_large() {
767        test_many_keys_determinism(create_immutable, None, 50_000);
768    }
769
770    async fn test_put_multi_and_get_impl(
771        context: Context,
772        mut archive: impl MultiArchive<Key = FixedBytes<64>, Value = i32>,
773    ) {
774        // Put three items at the same index with different keys
775        let index = 5u64;
776        let key_a = test_key("aaa");
777        let key_b = test_key("bbb");
778        let key_c = test_key("ccc");
779
780        archive
781            .put_multi(index, key_a.clone(), 10)
782            .await
783            .expect("put_multi a");
784        archive
785            .put_multi(index, key_b.clone(), 20)
786            .await
787            .expect("put_multi b");
788        archive
789            .put_multi(index, key_c.clone(), 30)
790            .await
791            .expect("put_multi c");
792
793        // Retrieve each by key
794        assert_eq!(
795            archive.get(Identifier::Key(&key_a)).await.unwrap(),
796            Some(10)
797        );
798        assert_eq!(
799            archive.get(Identifier::Key(&key_b)).await.unwrap(),
800            Some(20)
801        );
802        assert_eq!(
803            archive.get(Identifier::Key(&key_c)).await.unwrap(),
804            Some(30)
805        );
806
807        // Missing key returns None
808        let missing = test_key("zzz");
809        assert_eq!(archive.get(Identifier::Key(&missing)).await.unwrap(), None);
810
811        // items_tracked reflects unique indices, not total items
812        let buffer = context.encode();
813        assert!(buffer.contains("items_tracked 1"));
814    }
815
816    #[test_traced]
817    fn test_put_multi_and_get_prunable() {
818        let executor = deterministic::Runner::default();
819        executor.start(|context| async move {
820            let archive = create_prunable(context.clone(), None).await;
821            test_put_multi_and_get_impl(context, archive).await;
822        });
823    }
824
825    async fn test_put_multi_duplicate_key_impl(
826        context: Context,
827        mut archive: impl MultiArchive<Key = FixedBytes<64>, Value = i32>,
828    ) {
829        let key = test_key("dup");
830        archive.put_multi(5, key.clone(), 10).await.unwrap();
831        archive.put_multi(7, key.clone(), 20).await.unwrap();
832
833        // Duplicate key is allowed across indices.
834        assert_eq!(archive.get(Identifier::Index(5)).await.unwrap(), Some(10));
835        assert_eq!(archive.get(Identifier::Index(7)).await.unwrap(), Some(20));
836        assert_eq!(archive.get_all(5).await.unwrap(), Some(vec![10]));
837        assert_eq!(archive.get_all(7).await.unwrap(), Some(vec![20]));
838
839        // Like Archive::put, duplicate keys may return any associated value.
840        assert!(matches!(
841            archive.get(Identifier::Key(&key)).await.unwrap(),
842            Some(10 | 20)
843        ));
844
845        let buffer = context.encode();
846        assert!(buffer.contains("items_tracked 2"));
847    }
848
849    #[test_traced]
850    fn test_put_multi_duplicate_key_prunable() {
851        let executor = deterministic::Runner::default();
852        executor.start(|context| async move {
853            let archive = create_prunable(context.clone(), None).await;
854            test_put_multi_duplicate_key_impl(context, archive).await;
855        });
856    }
857
858    async fn test_get_all_impl(mut archive: impl MultiArchive<Key = FixedBytes<64>, Value = i32>) {
859        // Three items at the same index
860        archive.put_multi(5, test_key("aaa"), 10).await.unwrap();
861        archive.put_multi(5, test_key("bbb"), 20).await.unwrap();
862        archive.put_multi(5, test_key("ccc"), 30).await.unwrap();
863
864        // One item at a different index
865        archive.put_multi(7, test_key("ddd"), 40).await.unwrap();
866
867        // get_all returns all values at the index in insertion order
868        let all = archive.get_all(5).await.unwrap();
869        assert_eq!(all, Some(vec![10, 20, 30]));
870
871        // Single-item index returns one element
872        let all = archive.get_all(7).await.unwrap();
873        assert_eq!(all, Some(vec![40]));
874
875        // Missing index returns None
876        let all = archive.get_all(99).await.unwrap();
877        assert_eq!(all, None);
878
879        // Archive::get(Index) still returns only the first
880        assert_eq!(archive.get(Identifier::Index(5)).await.unwrap(), Some(10));
881    }
882
883    #[test_traced]
884    fn test_get_all_prunable() {
885        let executor = deterministic::Runner::default();
886        executor.start(|context| async move {
887            let archive = create_prunable(context, None).await;
888            test_get_all_impl(archive).await;
889        });
890    }
891
892    async fn test_put_multi_preserves_archive_put_semantics_impl(
893        mut archive: impl MultiArchive<Key = FixedBytes<64>, Value = i32>,
894    ) {
895        // put_multi two items at the same index
896        archive
897            .put_multi(1, test_key("aaa"), 10)
898            .await
899            .expect("put_multi");
900        archive
901            .put_multi(1, test_key("bbb"), 20)
902            .await
903            .expect("put_multi");
904
905        // Archive::put is a no-op when index already exists
906        archive
907            .put(1, test_key("ccc"), 30)
908            .await
909            .expect("Archive::put should no-op");
910
911        // Only two items exist (Archive::put did not add a third)
912        assert_eq!(
913            archive
914                .get(Identifier::Key(&test_key("aaa")))
915                .await
916                .unwrap(),
917            Some(10)
918        );
919        assert_eq!(
920            archive
921                .get(Identifier::Key(&test_key("bbb")))
922                .await
923                .unwrap(),
924            Some(20)
925        );
926        assert_eq!(
927            archive
928                .get(Identifier::Key(&test_key("ccc")))
929                .await
930                .unwrap(),
931            None
932        );
933
934        // Archive::get(Index) returns the first item inserted
935        let first = archive
936            .get(Identifier::Index(1))
937            .await
938            .unwrap()
939            .expect("should find first");
940        assert_eq!(first, 10);
941    }
942
943    #[test_traced]
944    fn test_put_multi_preserves_archive_put_semantics_prunable() {
945        let executor = deterministic::Runner::default();
946        executor.start(|context| async move {
947            let archive = create_prunable(context, None).await;
948            test_put_multi_preserves_archive_put_semantics_impl(archive).await;
949        });
950    }
951
952    async fn test_put_multi_restart_impl<A, F, Fut>(
953        context: Context,
954        creator: F,
955        compression: Option<u8>,
956    ) where
957        A: MultiArchive<Key = FixedBytes<64>, Value = i32>,
958        F: Fn(Context, Option<u8>) -> Fut,
959        Fut: Future<Output = A>,
960    {
961        // Write multi-items, sync, and drop
962        {
963            let mut archive = creator(context.with_label("init1"), compression).await;
964            archive.put_multi(5, test_key("aaa"), 10).await.unwrap();
965            archive.put_multi(5, test_key("bbb"), 20).await.unwrap();
966            archive.put_multi(7, test_key("ccc"), 30).await.unwrap();
967            archive.sync().await.unwrap();
968        }
969
970        // Reinitialize and verify
971        let archive = creator(context.with_label("init2"), compression).await;
972
973        assert_eq!(
974            archive
975                .get(Identifier::Key(&test_key("aaa")))
976                .await
977                .unwrap(),
978            Some(10)
979        );
980        assert_eq!(
981            archive
982                .get(Identifier::Key(&test_key("bbb")))
983                .await
984                .unwrap(),
985            Some(20)
986        );
987        assert_eq!(
988            archive
989                .get(Identifier::Key(&test_key("ccc")))
990                .await
991                .unwrap(),
992            Some(30)
993        );
994
995        // items_tracked reflects two unique indices after restart
996        let buffer = context.encode();
997        assert!(buffer.contains("items_tracked 2"));
998    }
999
1000    #[test_traced]
1001    fn test_put_multi_restart_prunable() {
1002        let executor = deterministic::Runner::default();
1003        executor.start(|context| async move {
1004            test_put_multi_restart_impl(context, create_prunable, None).await;
1005        });
1006    }
1007
1008    async fn test_put_multi_mixed_indices_impl(
1009        context: Context,
1010        mut archive: impl MultiArchive<Key = FixedBytes<64>, Value = i32>,
1011    ) {
1012        // Mix Archive::put (single-item) and MultiArchive::put_multi
1013        archive.put(1, test_key("single"), 100).await.unwrap();
1014        archive
1015            .put_multi(2, test_key("multi-a"), 200)
1016            .await
1017            .unwrap();
1018        archive
1019            .put_multi(2, test_key("multi-b"), 201)
1020            .await
1021            .unwrap();
1022        archive
1023            .put_multi(3, test_key("multi-c"), 300)
1024            .await
1025            .unwrap();
1026
1027        // All retrievable by key
1028        assert_eq!(
1029            archive
1030                .get(Identifier::Key(&test_key("single")))
1031                .await
1032                .unwrap(),
1033            Some(100)
1034        );
1035        assert_eq!(
1036            archive
1037                .get(Identifier::Key(&test_key("multi-a")))
1038                .await
1039                .unwrap(),
1040            Some(200)
1041        );
1042        assert_eq!(
1043            archive
1044                .get(Identifier::Key(&test_key("multi-b")))
1045                .await
1046                .unwrap(),
1047            Some(201)
1048        );
1049        assert_eq!(
1050            archive
1051                .get(Identifier::Key(&test_key("multi-c")))
1052                .await
1053                .unwrap(),
1054            Some(300)
1055        );
1056
1057        // Archive::get(Index) returns first item at that index
1058        assert_eq!(archive.get(Identifier::Index(2)).await.unwrap(), Some(200));
1059
1060        // Gap tracking works across mixed usage
1061        let (end, next) = archive.next_gap(1);
1062        assert_eq!(end, Some(3));
1063        assert!(next.is_none());
1064
1065        let buffer = context.encode();
1066        assert!(buffer.contains("items_tracked 3"));
1067    }
1068
1069    #[test_traced]
1070    fn test_put_multi_mixed_indices_prunable() {
1071        let executor = deterministic::Runner::default();
1072        executor.start(|context| async move {
1073            let archive = create_prunable(context.clone(), None).await;
1074            test_put_multi_mixed_indices_impl(context, archive).await;
1075        });
1076    }
1077
1078    fn assert_send<T: Send>(_: T) {}
1079
1080    #[allow(dead_code)]
1081    fn assert_archive_futures_are_send<T: super::Archive>(
1082        archive: &mut T,
1083        key: T::Key,
1084        value: T::Value,
1085    ) where
1086        T::Key: Clone,
1087        T::Value: Clone,
1088    {
1089        assert_send(archive.put(1, key.clone(), value.clone()));
1090        assert_send(archive.put_sync(2, key.clone(), value));
1091        assert_send(archive.get(Identifier::Index(1)));
1092        assert_send(archive.get(Identifier::Key(&key)));
1093        assert_send(archive.has(Identifier::Index(1)));
1094        assert_send(archive.has(Identifier::Key(&key)));
1095        assert_send(archive.sync());
1096    }
1097
1098    #[allow(dead_code)]
1099    fn assert_archive_destroy_is_send<T: super::Archive>(archive: T) {
1100        assert_send(archive.destroy());
1101    }
1102
1103    #[allow(dead_code)]
1104    fn assert_multi_archive_futures_are_send<T: super::MultiArchive>(
1105        archive: &mut T,
1106        key: T::Key,
1107        value: T::Value,
1108    ) where
1109        T::Key: Clone,
1110        T::Value: Clone,
1111    {
1112        assert_archive_futures_are_send(archive, key.clone(), value.clone());
1113        assert_send(archive.get_all(1));
1114        assert_send(archive.put_multi(1, key.clone(), value.clone()));
1115        assert_send(archive.put_multi_sync(2, key, value));
1116    }
1117
1118    #[allow(dead_code)]
1119    fn assert_prunable_archive_futures_are_send(
1120        archive: &mut prunable::Archive<TwoCap, Context, FixedBytes<64>, i32>,
1121        key: FixedBytes<64>,
1122        value: i32,
1123    ) {
1124        assert_archive_futures_are_send(archive, key, value);
1125    }
1126
1127    #[allow(dead_code)]
1128    fn assert_prunable_multi_archive_futures_are_send(
1129        archive: &mut prunable::Archive<TwoCap, Context, FixedBytes<64>, i32>,
1130        key: FixedBytes<64>,
1131        value: i32,
1132    ) {
1133        assert_multi_archive_futures_are_send(archive, key, value);
1134    }
1135
1136    #[allow(dead_code)]
1137    fn assert_prunable_archive_destroy_is_send(
1138        archive: prunable::Archive<TwoCap, Context, FixedBytes<64>, i32>,
1139    ) {
1140        assert_archive_destroy_is_send(archive);
1141    }
1142
1143    #[allow(dead_code)]
1144    fn assert_immutable_archive_futures_are_send(
1145        archive: &mut immutable::Archive<Context, FixedBytes<64>, i32>,
1146        key: FixedBytes<64>,
1147        value: i32,
1148    ) {
1149        assert_archive_futures_are_send(archive, key, value);
1150    }
1151
1152    #[allow(dead_code)]
1153    fn assert_immutable_archive_destroy_is_send(
1154        archive: immutable::Archive<Context, FixedBytes<64>, i32>,
1155    ) {
1156        assert_archive_destroy_is_send(archive);
1157    }
1158}