commonware_storage/archive/prunable/mod.rs

1//! A prunable key-value store for ordered data.
2//!
3//! Data is stored across two backends: [crate::journal::segmented::fixed] for fixed-size index entries and
4//! [crate::journal::segmented::glob::Glob] for values (managed by [crate::journal::segmented::oversized]).
5//! The location of written data is stored in-memory by both index and key (via [crate::index::unordered::Index])
6//! to enable efficient lookups (on average).
7//!
8//! _Notably, [Archive] does not use compaction or on-disk indexes (and thus incurs no read or
9//! write amplification during normal operation)._
10//!
11//! # Format
12//!
13//! [Archive] uses a two-journal structure for efficient page cache usage:
14//!
15//! **Index Journal (segmented/fixed)** - Fixed-size entries for fast startup replay:
16//! ```text
17//! +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
18//! | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 |10 |11 |12 |13 |14 |15 |16 |17 |18 |19 |20 |21 |22 |23 |
19//! +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
20//! |          Index(u64)           |Key(Fixed Size)|        val_offset(u64)        | val_size(u32) |
21//! +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
22//! ```
23//!
24//! **Value Blob** - Values (compressed when enabled) with CRC32 checksums (direct reads, no page cache):
25//! ```text
26//! +---+---+---+---+---+---+---+---+---+---+---+---+
27//! |       Value Data (variable)       |   CRC32   |
28//! +---+---+---+---+---+---+---+---+---+---+---+---+
29//! ```
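Decoding one fixed-size index entry is plain slice arithmetic over the layout above. A minimal std-only sketch; the 4-byte key width and big-endian integers here are illustrative assumptions, not the crate's actual codec:

```rust
// Std-only sketch of decoding one 24-byte index entry under the layout above.
// The 4-byte key and big-endian integers are assumptions for illustration.
fn parse_entry(entry: &[u8; 24]) -> (u64, [u8; 4], u64, u32) {
    let index = u64::from_be_bytes(entry[0..8].try_into().unwrap());
    let key: [u8; 4] = entry[8..12].try_into().unwrap();
    let val_offset = u64::from_be_bytes(entry[12..20].try_into().unwrap());
    let val_size = u32::from_be_bytes(entry[20..24].try_into().unwrap());
    (index, key, val_offset, val_size)
}
```

The entry is small and fixed-size by design, which is what makes startup replay of the index journal cheap.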
30//!
31//! # Uniqueness
32//!
33//! [Archive] assumes all stored indexes and keys are unique. If the same key is associated with
34//! multiple `indices`, there is no guarantee which value will be returned. If a key is written to
35//! an `index` that already holds a value, [Archive] will return an error.
36//!
37//! ## Conflicts
38//!
39//! Because a translated representation of a key is only ever stored in memory, it is possible (and
40//! expected) that two keys will eventually be represented by the same translated key. To handle
41//! this case, [Archive] must check the persisted form of all conflicting keys to ensure data from
42//! the correct key is returned. To support efficient checks, [Archive] (via
43//! [crate::index::unordered::Index]) keeps a linked list of all keys with the same translated
44//! prefix:
45//!
46//! ```rust
47//! struct Record {
48//!     index: u64,
49//!
50//!     next: Option<Box<Record>>,
51//! }
52//! ```
53//!
54//! _To avoid random memory reads in the common case, the in-memory index directly stores the first
55//! item in the linked list instead of a pointer to the first item._
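The conflict walk can be sketched with std types only; `resolve` and the `persisted` closure (standing in for a read of the full key from the index journal) are hypothetical names, not the crate's API:

```rust
// Std-only sketch of conflict resolution: records sharing a translated key
// form a linked list, and a lookup walks it comparing each record's persisted
// (full) key against the requested key.
struct Record {
    index: u64,
    next: Option<Box<Record>>,
}

/// Return the index whose persisted key matches `want`, if any.
fn resolve(head: &Record, want: &[u8], persisted: impl Fn(u64) -> Vec<u8>) -> Option<u64> {
    let mut cur = Some(head);
    while let Some(rec) = cur {
        if persisted(rec.index).as_slice() == want {
            return Some(rec.index);
        }
        cur = rec.next.as_deref();
    }
    None
}
```

Each non-matching record visited costs one journal read, which is why the `unnecessary_reads_total` metric in the tests below increments when translated keys collide.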
56//!
57//! Lookups by `index` are served by a separate map from `index` to the position of its entry in
58//! the index journal (whose section is selected by `section = index / items_per_section *
59//! items_per_section` to minimize the number of open blobs):
60//!
61//! ```text
62//! // Maps index -> position in index journal
63//! indices: BTreeMap<u64, u64>
64//! ```
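The section selection above is a round-down to a multiple of `items_per_section`; a one-line sketch (the function name is illustrative):

```rust
/// Round an index down to the start of its section, mirroring
/// `section = index / items_per_section * items_per_section`.
fn section_of(index: u64, items_per_section: u64) -> u64 {
    index / items_per_section * items_per_section
}
```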
65//!
66//! _If the [Translator] provided by the caller does not uniformly distribute keys across the key
67//! space or uses a translated representation that means keys on average have many conflicts,
68//! performance will degrade._
69//!
70//! ## Memory Overhead
71//!
72//! [Archive] uses two maps to enable lookups by both index and key. The memory used to track each
73//! index item is `8 + 8` bytes (where `8` is the index and `8` is the position in the index journal).
74//! The memory used to track each key item is `~translated(key).len() + 16` bytes (where `16` is the
75//! size of the `Record` struct). This means that an [Archive] employing a [Translator] that uses
76//! the first `8` bytes of a key will use `~40` bytes to index each key.
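The arithmetic above can be stated directly; `overhead_per_key` is a hypothetical helper, and real memory use also includes allocator and `BTreeMap` node overhead:

```rust
/// Approximate per-key indexing overhead described above: `8 + 8` bytes in the
/// index map plus `translated(key).len() + 16` bytes in the key map.
fn overhead_per_key(translated_len: usize) -> usize {
    (8 + 8) + (translated_len + 16)
}
```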
77//!
78//! ### MultiArchive Overhead
79//!
80//! [Archive] stores index positions in a dual-map layout:
81//! - `indices: BTreeMap<u64, u64>` tracks the first position for each index.
82//! - `extra_indices: BTreeMap<u64, Vec<u64>>` tracks additional positions for indices written via
83//!   [crate::archive::MultiArchive::put_multi].
84//!
85//! This means the baseline overhead above remains unchanged for the first item at an index. For
86//! indices with duplicates, the additional in-memory payload is:
87//! - one `Vec<u64>` header (`24` bytes), and
88//! - `n * 8` bytes for `n` additional positions.
89//!
90//! Equivalently, this is `24 + (n * 8)` bytes per duplicated index, excluding `BTreeMap` node
91//! overhead for `extra_indices`.
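Equivalently, as a hypothetical helper (excluding `BTreeMap` node overhead, as above):

```rust
/// Additional in-memory bytes for an index with `n` extra positions written
/// via `put_multi`: one `Vec<u64>` header plus 8 bytes per extra position.
fn duplicate_overhead(n: usize) -> usize {
    24 + n * 8
}
```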
92//!
93//! # Pruning
94//!
95//! [Archive] supports pruning up to a minimum `index` using the `prune` method. After `prune` is
96//! called on a `section`, all interaction with a `section` less than the pruned `section` will
97//! return an error.
98//!
99//! ## Lazy Index Cleanup
100//!
101//! Instead of performing a full iteration of the in-memory index, storing an additional in-memory
102//! index per `section`, or replaying a `section` of the value blob,
103//! [Archive] lazily cleans up the [crate::index::unordered::Index] after pruning. When a new key is
104//! stored that overlaps (same translated value) with a pruned key, the pruned key is removed from
105//! the in-memory index.
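A std-only sketch of the idea, with hypothetical names (the actual cleanup happens inside [crate::index::unordered::Index]): pruned records are dropped only when an insert touches the same translated key:

```rust
// Std-only sketch of lazy cleanup: when a new key lands on a translated key,
// records whose index falls below the pruning boundary are dropped while the
// conflict list is rebuilt, instead of being swept eagerly at prune time.
struct Record {
    index: u64,
    next: Option<Box<Record>>,
}

fn insert_lazy(head: Option<Box<Record>>, index: u64, oldest_allowed: u64) -> Box<Record> {
    // Collect surviving indices, discarding pruned records.
    let mut survivors = Vec::new();
    let mut cur = head;
    while let Some(mut rec) = cur {
        cur = rec.next.take();
        if rec.index >= oldest_allowed {
            survivors.push(rec.index);
        }
    }
    // Rebuild the list with the new record at the head, preserving order.
    let mut next = None;
    for &i in survivors.iter().rev() {
        next = Some(Box::new(Record { index: i, next }));
    }
    Box::new(Record { index, next })
}

/// Collect the indices in a conflict list (head first).
fn indices(head: &Record) -> Vec<u64> {
    let mut out = vec![head.index];
    let mut cur = head.next.as_deref();
    while let Some(rec) = cur {
        out.push(rec.index);
        cur = rec.next.as_deref();
    }
    out
}
```

This keeps pruning O(1) with respect to the in-memory index, at the cost of stale records lingering until a colliding insert (visible as `pruned_total` in the tests below).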
106//!
107//! # Read Path
108//!
109//! All reads (by index or key) first read the index entry from the index journal to get the
110//! value location (offset and size), then read the value from the value blob. The index journal
111//! is backed by a page cache, so hot entries are served from memory. Values are read directly
112//! from disk without caching to avoid polluting the page cache with large values.
113//!
114//! # Compression
115//!
116//! [Archive] supports compressing data before storing it on disk. This can be enabled by setting
117//! the `compression` field in the `Config` struct to a valid `zstd` compression level. This setting
118//! can be changed between initializations of [Archive]; however, it must remain populated if any
119//! data was written with compression enabled.
120//!
121//! # Querying for Gaps
122//!
123//! [Archive] tracks gaps in the index space to enable the caller to efficiently fetch unknown keys
124//! using `next_gap`. This is a very common pattern when syncing blocks in a blockchain.
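The gap query can be illustrated over a plain set of known indices; this std-only `next_gap` mirrors the shape of the method's return value (end of the current contiguous run, start of the next run past the gap), not its implementation:

```rust
use std::collections::BTreeSet;

// Std-only sketch of a gap query over known indices: report the end of the
// contiguous run containing `index` (if known) and the start of the next
// known run after the gap that follows.
fn next_gap(known: &BTreeSet<u64>, index: u64) -> (Option<u64>, Option<u64>) {
    let current_end = if known.contains(&index) {
        let mut end = index;
        while known.contains(&(end + 1)) {
            end += 1;
        }
        Some(end)
    } else {
        None
    };
    // Scan past the gap that follows: `end + 1` is unknown by construction.
    let from = match current_end {
        Some(end) => end + 2,
        None => index + 1,
    };
    let start_next = known.range(from..).next().copied();
    (current_end, start_next)
}
```

A syncing caller can repeatedly request everything in `[current_end + 1, start_next)` to fill holes.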
125//!
126//! # Example
127//!
128//! ```rust
129//! use commonware_runtime::{Spawner, Runner, deterministic, buffer::paged::CacheRef};
130//! use commonware_cryptography::{Hasher as _, Sha256};
131//! use commonware_storage::{
132//!     translator::FourCap,
133//!     archive::{
134//!         Archive as _,
135//!         prunable::{Archive, Config},
136//!     },
137//! };
138//! use commonware_utils::{NZUsize, NZU16, NZU64};
139//!
140//! let executor = deterministic::Runner::default();
141//! executor.start(|context| async move {
142//!     // Create an archive
143//!     let cfg = Config {
144//!         translator: FourCap,
145//!         key_partition: "demo-index".into(),
146//!         key_page_cache: CacheRef::from_pooler(&context, NZU16!(1024), NZUsize!(10)),
147//!         value_partition: "demo-value".into(),
148//!         compression: Some(3),
149//!         codec_config: (),
150//!         items_per_section: NZU64!(1024),
151//!         key_write_buffer: NZUsize!(1024 * 1024),
152//!         value_write_buffer: NZUsize!(1024 * 1024),
153//!         replay_buffer: NZUsize!(4096),
154//!     };
155//!     let mut archive = Archive::init(context, cfg).await.unwrap();
156//!
157//!     // Put a key
158//!     archive.put(1, Sha256::hash(b"data"), 10).await.unwrap();
159//!
160//!     // Sync the archive
161//!     archive.sync().await.unwrap();
162//! });
163//! ```
164
165use crate::translator::Translator;
166use commonware_runtime::buffer::paged::CacheRef;
167use std::num::{NonZeroU64, NonZeroUsize};
168
169mod storage;
170pub use storage::Archive;
171
172/// Configuration for [Archive] storage.
173#[derive(Clone)]
174pub struct Config<T: Translator, C> {
175    /// Logic to transform keys into their index representation.
176    ///
177    /// [Archive] assumes that all internal keys are spread uniformly across the key space.
178    /// If that is not the case, lookups may be O(n) instead of O(1).
179    pub translator: T,
180
181    /// The partition to use for the key journal (stores index+key metadata).
182    pub key_partition: String,
183
184    /// The page cache to use for the key journal.
185    pub key_page_cache: CacheRef,
186
187    /// The partition to use for the value blob (stores values).
188    pub value_partition: String,
189
190    /// The compression level to use for the value blob.
191    pub compression: Option<u8>,
192
193    /// The [commonware_codec::Codec] configuration to use for the value stored in the archive.
194    pub codec_config: C,
195
196    /// The number of items per section (the granularity of pruning).
197    pub items_per_section: NonZeroU64,
198
199    /// The amount of bytes that can be buffered for the key journal before being written to a
200    /// [commonware_runtime::Blob].
201    pub key_write_buffer: NonZeroUsize,
202
203    /// The amount of bytes that can be buffered for the value journal before being written to a
204    /// [commonware_runtime::Blob].
205    pub value_write_buffer: NonZeroUsize,
206
207    /// The buffer size to use when replaying a [commonware_runtime::Blob].
208    pub replay_buffer: NonZeroUsize,
209}
210
211#[cfg(test)]
212mod tests {
213    use super::*;
214    use crate::{
215        archive::{Archive as _, Error, Identifier, MultiArchive as _},
216        journal::Error as JournalError,
217        kv::tests::test_key,
218        translator::{FourCap, TwoCap},
219    };
220    use commonware_codec::{DecodeExt, Error as CodecError};
221    use commonware_macros::{test_group, test_traced};
222    use commonware_runtime::{deterministic, Metrics, Runner};
223    use commonware_utils::{sequence::FixedBytes, NZUsize, NZU16, NZU64};
224    use rand::Rng;
225    use std::{collections::BTreeMap, num::NonZeroU16};
226
227    const DEFAULT_ITEMS_PER_SECTION: u64 = 65536;
228    const DEFAULT_WRITE_BUFFER: usize = 1024;
229    const DEFAULT_REPLAY_BUFFER: usize = 4096;
230    const PAGE_SIZE: NonZeroU16 = NZU16!(1024);
231    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);
232
233    #[test_traced]
234    fn test_archive_compression_then_none() {
235        // Initialize the deterministic context
236        let executor = deterministic::Runner::default();
237        executor.start(|context| async move {
238            // Initialize the archive
239            let cfg = Config {
240                translator: FourCap,
241                key_partition: "test-index".into(),
242                key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
243                value_partition: "test-value".into(),
244                codec_config: (),
245                compression: Some(3),
246                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
247                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
248                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
249                items_per_section: NZU64!(DEFAULT_ITEMS_PER_SECTION),
250            };
251            let mut archive = Archive::init(context.with_label("first"), cfg.clone())
252                .await
253                .expect("Failed to initialize archive");
254
255            // Put the key-data pair
256            let index = 1u64;
257            let key = test_key("testkey");
258            let data = 1;
259            archive
260                .put(index, key.clone(), data)
261                .await
262                .expect("Failed to put data");
263
264            // Sync and drop the archive
265            archive.sync().await.expect("Failed to sync archive");
266            drop(archive);
267
268            // Initialize the archive again without compression.
269            // Index journal replay succeeds (no compression), but value reads will fail.
270            let cfg = Config {
271                translator: FourCap,
272                key_partition: "test-index".into(),
273                key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
274                value_partition: "test-value".into(),
275                codec_config: (),
276                compression: None,
277                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
278                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
279                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
280                items_per_section: NZU64!(DEFAULT_ITEMS_PER_SECTION),
281            };
282            let archive = Archive::<_, _, FixedBytes<64>, i32>::init(
283                context.with_label("second"),
284                cfg.clone(),
285            )
286            .await
287            .unwrap();
288
289            // Getting the value should fail because compression settings mismatch.
290            // Without compression, the codec sees extra bytes after decoding the value
291            // (because the compressed data doesn't match the expected format).
292            let result: Result<Option<i32>, _> = archive.get(Identifier::Index(index)).await;
293            assert!(matches!(
294                result,
295                Err(Error::Journal(JournalError::Codec(CodecError::ExtraData(
296                    _
297                ))))
298            ));
299        });
300    }
301
302    #[test_traced]
303    fn test_archive_overlapping_key_basic() {
304        // Initialize the deterministic context
305        let executor = deterministic::Runner::default();
306        executor.start(|context| async move {
307            // Initialize the archive
308            let cfg = Config {
309                translator: FourCap,
310                key_partition: "test-index".into(),
311                key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
312                value_partition: "test-value".into(),
313                codec_config: (),
314                compression: None,
315                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
316                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
317                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
318                items_per_section: NZU64!(DEFAULT_ITEMS_PER_SECTION),
319            };
320            let mut archive = Archive::init(context.clone(), cfg.clone())
321                .await
322                .expect("Failed to initialize archive");
323
324            let index1 = 1u64;
325            let key1 = test_key("keys1");
326            let data1 = 1;
327            let index2 = 2u64;
328            let key2 = test_key("keys2");
329            let data2 = 2;
330
331            // Put the key-data pair
332            archive
333                .put(index1, key1.clone(), data1)
334                .await
335                .expect("Failed to put data");
336
337            // Put the key-data pair
338            archive
339                .put(index2, key2.clone(), data2)
340                .await
341                .expect("Failed to put data");
342
343            // Get the data back
344            let retrieved = archive
345                .get(Identifier::Key(&key1))
346                .await
347                .expect("Failed to get data")
348                .expect("Data not found");
349            assert_eq!(retrieved, data1);
350
351            // Get the data back
352            let retrieved = archive
353                .get(Identifier::Key(&key2))
354                .await
355                .expect("Failed to get data")
356                .expect("Data not found");
357            assert_eq!(retrieved, data2);
358
359            // Check metrics
360            let buffer = context.encode();
361            assert!(buffer.contains("items_tracked 2"));
362            assert!(buffer.contains("unnecessary_reads_total 1"));
363            assert!(buffer.contains("gets_total 2"));
364        });
365    }
366
367    #[test_traced]
368    fn test_archive_overlapping_key_multiple_sections() {
369        // Initialize the deterministic context
370        let executor = deterministic::Runner::default();
371        executor.start(|context| async move {
372            // Initialize the archive
373            let cfg = Config {
374                translator: FourCap,
375                key_partition: "test-index".into(),
376                key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
377                value_partition: "test-value".into(),
378                codec_config: (),
379                compression: None,
380                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
381                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
382                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
383                items_per_section: NZU64!(DEFAULT_ITEMS_PER_SECTION),
384            };
385            let mut archive = Archive::init(context.clone(), cfg.clone())
386                .await
387                .expect("Failed to initialize archive");
388
389            let index1 = 1u64;
390            let key1 = test_key("keys1");
391            let data1 = 1;
392            let index2 = 2_000_000u64;
393            let key2 = test_key("keys2");
394            let data2 = 2;
395
396            // Put the key-data pair
397            archive
398                .put(index1, key1.clone(), data1)
399                .await
400                .expect("Failed to put data");
401
402            // Put the key-data pair
403            archive
404                .put(index2, key2.clone(), data2)
405                .await
406                .expect("Failed to put data");
407
408            // Get the data back
409            let retrieved = archive
410                .get(Identifier::Key(&key1))
411                .await
412                .expect("Failed to get data")
413                .expect("Data not found");
414            assert_eq!(retrieved, data1);
415
416            // Get the data back
417            let retrieved = archive
418                .get(Identifier::Key(&key2))
419                .await
420                .expect("Failed to get data")
421                .expect("Data not found");
422            assert_eq!(retrieved, data2);
423        });
424    }
425
426    #[test_traced]
427    fn test_archive_prune_keys() {
428        // Initialize the deterministic context
429        let executor = deterministic::Runner::default();
430        executor.start(|context| async move {
431            // Initialize the archive
432            let cfg = Config {
433                translator: FourCap,
434                key_partition: "test-index".into(),
435                key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
436                value_partition: "test-value".into(),
437                codec_config: (),
438                compression: None,
439                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
440                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
441                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
442                items_per_section: NZU64!(1), // no mask - each item is its own section
443            };
444            let mut archive = Archive::init(context.clone(), cfg.clone())
445                .await
446                .expect("Failed to initialize archive");
447
448            // Insert multiple keys across different sections
449            let keys = vec![
450                (1u64, test_key("key1-blah"), 1),
451                (2u64, test_key("key2-blah"), 2),
452                (3u64, test_key("key3-blah"), 3),
453                (4u64, test_key("key3-bleh"), 3),
454                (5u64, test_key("key4-blah"), 4),
455            ];
456
457            for (index, key, data) in &keys {
458                archive
459                    .put(*index, key.clone(), *data)
460                    .await
461                    .expect("Failed to put data");
462            }
463
464            // Check metrics
465            let buffer = context.encode();
466            assert!(buffer.contains("items_tracked 5"));
467
468            // Prune sections less than 3
469            archive.prune(3).await.expect("Failed to prune");
470
471            // Ensure keys 1 and 2 are no longer present
472            for (index, key, data) in keys {
473                let retrieved = archive
474                    .get(Identifier::Key(&key))
475                    .await
476                    .expect("Failed to get data");
477                if index < 3 {
478                    assert!(retrieved.is_none());
479                } else {
480                    assert_eq!(retrieved.expect("Data not found"), data);
481                }
482            }
483
484            // Check metrics
485            let buffer = context.encode();
486            assert!(buffer.contains("items_tracked 3"));
487            assert!(buffer.contains("indices_pruned_total 2"));
488            assert!(buffer.contains("pruned_total 0")); // no lazy cleanup yet
489
490            // Try to prune older section
491            archive.prune(2).await.expect("Failed to prune");
492
493            // Try to prune current section again
494            archive.prune(3).await.expect("Failed to prune");
495
496            // Try to put older index
497            let result = archive.put(1, test_key("key1-blah"), 1).await;
498            assert!(matches!(result, Err(Error::AlreadyPrunedTo(3))));
499
500            // Trigger lazy removal of keys
501            archive
502                .put(6, test_key("key2-blfh"), 5)
503                .await
504                .expect("Failed to put data");
505
506            // Check metrics
507            let buffer = context.encode();
508            assert!(buffer.contains("items_tracked 4")); // lazily remove one, add one
509            assert!(buffer.contains("indices_pruned_total 2"));
510            assert!(buffer.contains("pruned_total 1"));
511        });
512    }
513
514    fn test_archive_keys_and_restart(num_keys: usize) -> String {
515        // Initialize the deterministic context
516        let executor = deterministic::Runner::default();
517        executor.start(|mut context| async move {
518            // Initialize the archive
519            let items_per_section = 256u64;
520            let cfg = Config {
521                translator: TwoCap,
522                key_partition: "test-index".into(),
523                key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
524                value_partition: "test-value".into(),
525                codec_config: (),
526                compression: None,
527                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
528                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
529                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
530                items_per_section: NZU64!(items_per_section),
531            };
532            let mut archive = Archive::init(context.with_label("init1"), cfg.clone())
533                .await
534                .expect("Failed to initialize archive");
535
536            // Insert multiple keys across different sections
537            let mut keys = BTreeMap::new();
538            while keys.len() < num_keys {
539                let index = keys.len() as u64;
540                let mut key = [0u8; 64];
541                context.fill(&mut key);
542                let key = FixedBytes::<64>::decode(key.as_ref()).unwrap();
543                let mut data = [0u8; 1024];
544                context.fill(&mut data);
545                let data = FixedBytes::<1024>::decode(data.as_ref()).unwrap();
546
547                archive
548                    .put(index, key.clone(), data.clone())
549                    .await
550                    .expect("Failed to put data");
551                keys.insert(key, (index, data));
552            }
553
554            // Ensure all keys can be retrieved
555            for (key, (index, data)) in &keys {
556                let retrieved = archive
557                    .get(Identifier::Index(*index))
558                    .await
559                    .expect("Failed to get data")
560                    .expect("Data not found");
561                assert_eq!(&retrieved, data);
562                let retrieved = archive
563                    .get(Identifier::Key(key))
564                    .await
565                    .expect("Failed to get data")
566                    .expect("Data not found");
567                assert_eq!(&retrieved, data);
568            }
569
570            // Check metrics
571            let buffer = context.encode();
572            let tracked = format!("items_tracked {num_keys:?}");
573            assert!(buffer.contains(&tracked));
574            assert!(buffer.contains("pruned_total 0"));
575
576            // Sync and drop the archive
577            archive.sync().await.expect("Failed to sync archive");
578            drop(archive);
579
580            // Reinitialize the archive
581            let cfg = Config {
582                translator: TwoCap,
583                key_partition: "test-index".into(),
584                key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
585                value_partition: "test-value".into(),
586                codec_config: (),
587                compression: None,
588                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
589                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
590                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
591                items_per_section: NZU64!(items_per_section),
592            };
593            let mut archive = Archive::<_, _, _, FixedBytes<1024>>::init(
594                context.with_label("init2"),
595                cfg.clone(),
596            )
597            .await
598            .expect("Failed to initialize archive");
599
600            // Ensure all keys can be retrieved
601            for (key, (index, data)) in &keys {
602                let retrieved = archive
603                    .get(Identifier::Index(*index))
604                    .await
605                    .expect("Failed to get data")
606                    .expect("Data not found");
607                assert_eq!(&retrieved, data);
608                let retrieved = archive
609                    .get(Identifier::Key(key))
610                    .await
611                    .expect("Failed to get data")
612                    .expect("Data not found");
613                assert_eq!(&retrieved, data);
614            }
615
616            // Prune first half
617            let min = (keys.len() / 2) as u64;
618            archive.prune(min).await.expect("Failed to prune");
619
620            // Ensure all keys can be retrieved that haven't been pruned
621            let min = (min / items_per_section) * items_per_section;
622            let mut removed = 0;
623            for (key, (index, data)) in keys {
624                if index >= min {
625                    let retrieved = archive
626                        .get(Identifier::Key(&key))
627                        .await
628                        .expect("Failed to get data")
629                        .expect("Data not found");
630                    assert_eq!(retrieved, data);
631
632                    // Check range
633                    let (current_end, start_next) = archive.next_gap(index);
634                    assert_eq!(current_end.unwrap(), num_keys as u64 - 1);
635                    assert!(start_next.is_none());
636                } else {
637                    let retrieved = archive
638                        .get(Identifier::Key(&key))
639                        .await
640                        .expect("Failed to get data");
641                    assert!(retrieved.is_none());
642                    removed += 1;
643
644                    // Check range
645                    let (current_end, start_next) = archive.next_gap(index);
646                    assert!(current_end.is_none());
647                    assert_eq!(start_next.unwrap(), min);
648                }
649            }
650
651            // Check metrics
652            let buffer = context.encode();
653            let tracked = format!("items_tracked {:?}", num_keys - removed);
654            assert!(buffer.contains(&tracked));
655            let pruned = format!("indices_pruned_total {removed}");
656            assert!(buffer.contains(&pruned));
657            assert!(buffer.contains("pruned_total 0")); // have not lazily removed keys yet
658
659            context.auditor().state()
660        })
661    }
662
663    #[test_group("slow")]
664    #[test_traced]
665    fn test_archive_many_keys_and_restart() {
666        test_archive_keys_and_restart(100_000);
667    }
668
669    #[test_group("slow")]
670    #[test_traced]
671    fn test_determinism() {
672        let state1 = test_archive_keys_and_restart(5_000);
673        let state2 = test_archive_keys_and_restart(5_000);
674        assert_eq!(state1, state2);
675    }
676
677    #[test_traced]
678    fn test_get_all_after_prune() {
679        let executor = deterministic::Runner::default();
680        executor.start(|context| async move {
681            let cfg = Config {
682                translator: FourCap,
683                key_partition: "test-index".into(),
684                key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
685                value_partition: "test-value".into(),
686                codec_config: (),
687                compression: None,
688                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
689                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
690                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
691                items_per_section: NZU64!(1),
692            };
693            let mut archive = Archive::init(context.clone(), cfg)
694                .await
695                .expect("Failed to initialize archive");
696
697            archive.put_multi(1, test_key("aaa"), 10).await.unwrap();
698            archive.put_multi(1, test_key("bbb"), 20).await.unwrap();
699            archive.put_multi(3, test_key("ccc"), 30).await.unwrap();
700
701            // Prune below index 3
702            archive.prune(3).await.unwrap();
703
704            // Pruned index returns None
705            let all = archive.get_all(1).await.unwrap();
706            assert_eq!(all, None);
707
708            // Surviving index still works
709            let all = archive.get_all(3).await.unwrap();
710            assert_eq!(all, Some(vec![30]));
711        });
712    }
713
714    #[test_traced]
715    fn test_put_multi_prune() {
716        let executor = deterministic::Runner::default();
717        executor.start(|context| async move {
718            let cfg = Config {
719                translator: FourCap,
720                key_partition: "test-index".into(),
721                key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
722                value_partition: "test-value".into(),
723                codec_config: (),
724                compression: None,
725                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
726                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
727                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
728                items_per_section: NZU64!(1),
729            };
730            let mut archive = Archive::init(context.clone(), cfg)
731                .await
732                .expect("Failed to initialize archive");
733
734            // Two items at index 1, one at index 3
735            archive.put_multi(1, test_key("aaa"), 10).await.unwrap();
736            archive.put_multi(1, test_key("bbb"), 20).await.unwrap();
737            archive.put_multi(3, test_key("ccc"), 30).await.unwrap();
738
739            let buffer = context.encode();
740            assert!(buffer.contains("items_tracked 2"));
741
742            // Prune below index 3
743            archive.prune(3).await.unwrap();
744
745            // Both items at index 1 are gone
746            assert_eq!(
747                archive
748                    .get(Identifier::Key(&test_key("aaa")))
749                    .await
750                    .unwrap(),
751                None
752            );
753            assert_eq!(
754                archive
755                    .get(Identifier::Key(&test_key("bbb")))
756                    .await
757                    .unwrap(),
758                None
759            );
760
761            // Item at index 3 survives
762            assert_eq!(
763                archive
764                    .get(Identifier::Key(&test_key("ccc")))
765                    .await
766                    .unwrap(),
767                Some(30)
768            );
769
770            let buffer = context.encode();
771            assert!(buffer.contains("items_tracked 1"));
772            assert!(buffer.contains("indices_pruned_total 1"));
773
774            // put_multi below pruned index is rejected
775            let result = archive.put_multi(2, test_key("ddd"), 40).await;
776            assert!(matches!(result, Err(Error::AlreadyPrunedTo(3))));
777        });
778    }
779}