// commonware_storage/archive/prunable/mod.rs

//! A prunable key-value store for ordered data.
//!
//! Data is stored across two backends: [crate::journal::segmented::fixed] for fixed-size index entries and
//! [crate::journal::segmented::glob::Glob] for values (managed by [crate::journal::segmented::oversized]).
//! The location of written data is stored in-memory by both index and key (via [crate::index::unordered::Index])
//! to enable efficient lookups (on average).
//!
//! _Notably, [Archive] does not use compaction or on-disk indexes (and thus has no read
//! or write amplification during normal operation)._
//!
//! # Format
//!
//! [Archive] uses a two-journal structure for efficient page cache usage:
//!
//! **Index Journal (segmented/fixed)** - Fixed-size entries for fast startup replay:
//! ```text
//! +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
//! | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 |10 |11 |12 |13 |14 |15 |16 |17 |18 |19 |20 |21 |22 |23 |
//! +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
//! |          Index(u64)           |Key(Fixed Size)|        val_offset(u64)        | val_size(u32) |
//! +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
//! ```
//!
//! **Value Blob** - Raw values with CRC32 checksums (direct reads, no page cache):
//! ```text
//! +---+---+---+---+---+---+---+---+---+---+---+---+
//! |     Compressed Data (variable)    |   CRC32   |
//! +---+---+---+---+---+---+---+---+---+---+---+---+
//! ```
//!
//! # Uniqueness
//!
//! [Archive] assumes all stored indexes and keys are unique. If the same key is associated with
//! multiple `indices`, there is no guarantee which value will be returned. If the key is written to
//! an existing `index`, [Archive] will return an error.
//!
//! ## Conflicts
//!
//! Because a translated representation of a key is only ever stored in memory, it is possible (and
//! expected) that two keys will eventually be represented by the same translated key. To handle
//! this case, [Archive] must check the persisted form of all conflicting keys to ensure data from
//! the correct key is returned. To support efficient checks, [Archive] (via
//! [crate::index::unordered::Index]) keeps a linked list of all keys with the same translated
//! prefix:
//!
//! ```rust
//! struct Record {
//!     index: u64,
//!
//!     next: Option<Box<Record>>,
//! }
//! ```
//!
//! _To avoid random memory reads in the common case, the in-memory index directly stores the first
//! item in the linked list instead of a pointer to the first item._
//!
//! `index` is the key of the map used to serve lookups by `index`; each entry stores the
//! position in the index journal (its section is selected by
//! `section = index / items_per_section * items_per_section` to minimize the number of open
//! blobs):
//!
//! ```text
//! // Maps index -> position in index journal
//! indices: BTreeMap<u64, u64>
//! ```
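//!
//! For illustration, the section selection above can be sketched with plain integer
//! arithmetic (a hypothetical helper, not part of the crate; integer division truncates):
//!
//! ```rust
//! /// Map an index to the boundary of its section.
//! fn section(index: u64, items_per_section: u64) -> u64 {
//!     index / items_per_section * items_per_section
//! }
//!
//! assert_eq!(section(1_023, 1_024), 0);
//! assert_eq!(section(2_000_000, 1_024), 1_999_872);
//! ```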
//!
//! _If the [Translator] provided by the caller does not uniformly distribute keys across the key
//! space, or produces translated representations that leave keys with many conflicts on average,
//! performance will degrade._
//!
//! ## Memory Overhead
//!
//! [Archive] uses two maps to enable lookups by both index and key. The memory used to track each
//! index item is `8 + 8` bytes (where `8` is the index and `8` is the position in the index journal).
//! The memory used to track each key item is `~translated(key).len() + 16` bytes (where `16` is the
//! size of the `Record` struct). This means that an [Archive] employing a [Translator] that uses
//! the first `8` bytes of a key will use `~40` bytes to index each key.
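//!
//! For example, with a [Translator] that keeps the first `8` bytes of each key, the per-item
//! accounting works out as follows (excluding map node overhead):
//!
//! ```text
//! index map: 8 (index) + 8 (journal position)  = 16 bytes
//! key map:   8 (translated key) + 16 (Record)  = 24 bytes
//! total:     16 + 24                           = ~40 bytes per key
//! ```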
//!
//! ### MultiArchive Overhead
//!
//! [Archive] stores index positions in a dual-map layout:
//! - `indices: BTreeMap<u64, u64>` tracks the first position for each index.
//! - `extra_indices: BTreeMap<u64, Vec<u64>>` tracks additional positions for indices written via
//!   [crate::archive::MultiArchive::put_multi].
//!
//! This means the baseline overhead above remains unchanged for the first item at an index. For
//! indices with duplicates, the additional in-memory payload is:
//! - one `Vec<u64>` header (`24` bytes), and
//! - `n * 8` bytes for `n` additional positions.
//!
//! Equivalently, this is `24 + (n * 8)` bytes per duplicated index, excluding `BTreeMap` node
//! overhead for `extra_indices`.
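//!
//! For example, duplicate overhead for a few values of `n`:
//!
//! ```text
//! n = 1:  24 + (1 * 8)  = 32 bytes
//! n = 4:  24 + (4 * 8)  = 56 bytes
//! n = 16: 24 + (16 * 8) = 152 bytes
//! ```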
//!
//! # Pruning
//!
//! [Archive] supports pruning up to a minimum `index` using the `prune` method. After `prune` is
//! called, any interaction with a `section` less than the pruned `section` will return an error.
//!
//! ## Lazy Index Cleanup
//!
//! Instead of performing a full iteration of the in-memory index, storing an additional in-memory
//! index per `section`, or replaying a `section` of the value blob,
//! [Archive] lazily cleans up the [crate::index::unordered::Index] after pruning. When a new key is
//! stored that overlaps (same translated value) with a pruned key, the pruned key is removed from
//! the in-memory index.
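//!
//! Sketched as pseudocode (illustrative only; names do not match the implementation):
//!
//! ```text
//! on put(index, key, value):
//!   for record in conflict_list(translated(key)):
//!     if record.index < oldest_allowed:
//!       remove record          // lazy cleanup of a pruned key
//! ```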
//!
//! # Read Path
//!
//! All reads (by index or key) first read the index entry from the index journal to get the
//! value location (offset and size), then read the value from the value blob. The index journal
//! uses a page cache for caching, so hot entries are served from memory. Values are read directly
//! from disk without caching to avoid polluting the page cache with large values.
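//!
//! Sketched end-to-end, a lookup performs two reads (illustrative only):
//!
//! ```text
//! 1. index journal (page-cached): entry -> (val_offset, val_size)
//! 2. value blob (direct read):    bytes[val_offset..val_offset + val_size] -> value (CRC32-checked)
//! ```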
//!
//! # Compression
//!
//! [Archive] supports compressing data before storing it on disk. This can be enabled by setting
//! the `compression` field in the `Config` struct to a valid `zstd` compression level. This setting
//! can be changed between initializations of [Archive]; however, it must remain populated if any
//! data was written with compression enabled.
//!
//! # Querying for Gaps
//!
//! [Archive] tracks gaps in the index space to enable the caller to efficiently fetch unknown keys
//! using `next_gap`. This is a very common pattern when syncing blocks in a blockchain.
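//!
//! For example, a block syncer might use `next_gap` as follows (a sketch; the tuple
//! interpretation mirrors the tests in this module):
//!
//! ```rust,ignore
//! let (current_end, start_next) = archive.next_gap(index);
//! if let Some(end) = current_end {
//!     // `index` is inside a contiguous range ending at `end`: fetch `end + 1` next.
//! } else if let Some(start) = start_next {
//!     // `index` is missing: the next stored range begins at `start`.
//! }
//! ```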
//!
//! # Example
//!
//! ```rust
//! use commonware_runtime::{Spawner, Runner, deterministic, buffer::paged::CacheRef};
//! use commonware_cryptography::{Hasher as _, Sha256};
//! use commonware_storage::{
//!     translator::FourCap,
//!     archive::{
//!         Archive as _,
//!         prunable::{Archive, Config},
//!     },
//! };
//! use commonware_utils::{NZUsize, NZU16, NZU64};
//!
//! let executor = deterministic::Runner::default();
//! executor.start(|context| async move {
//!     // Create an archive
//!     let cfg = Config {
//!         translator: FourCap,
//!         key_partition: "demo-index".into(),
//!         key_page_cache: CacheRef::from_pooler(&context, NZU16!(1024), NZUsize!(10)),
//!         value_partition: "demo-value".into(),
//!         compression: Some(3),
//!         codec_config: (),
//!         items_per_section: NZU64!(1024),
//!         key_write_buffer: NZUsize!(1024 * 1024),
//!         value_write_buffer: NZUsize!(1024 * 1024),
//!         replay_buffer: NZUsize!(4096),
//!     };
//!     let mut archive = Archive::init(context, cfg).await.unwrap();
//!
//!     // Put a key
//!     archive.put(1, Sha256::hash(b"data"), 10).await.unwrap();
//!
//!     // Sync the archive
//!     archive.sync().await.unwrap();
//! });
//! ```

use crate::translator::Translator;
use commonware_runtime::buffer::paged::CacheRef;
use std::num::{NonZeroU64, NonZeroUsize};

mod storage;
pub use storage::Archive;

/// Configuration for [Archive] storage.
#[derive(Clone)]
pub struct Config<T: Translator, C> {
    /// Logic to transform keys into their index representation.
    ///
    /// [Archive] assumes that all internal keys are spread uniformly across the key space.
    /// If that is not the case, lookups may be O(n) instead of O(1).
    pub translator: T,

    /// The partition to use for the key journal (stores index+key metadata).
    pub key_partition: String,

    /// The page cache to use for the key journal.
    pub key_page_cache: CacheRef,

    /// The partition to use for the value blob (stores values).
    pub value_partition: String,

    /// The compression level to use for the value blob.
    pub compression: Option<u8>,

    /// The [commonware_codec::Codec] configuration to use for the value stored in the archive.
    pub codec_config: C,

    /// The number of items per section (the granularity of pruning).
    pub items_per_section: NonZeroU64,

    /// The number of bytes that can be buffered for the key journal before being written to a
    /// [commonware_runtime::Blob].
    pub key_write_buffer: NonZeroUsize,

    /// The number of bytes that can be buffered for the value journal before being written to a
    /// [commonware_runtime::Blob].
    pub value_write_buffer: NonZeroUsize,

    /// The buffer size to use when replaying a [commonware_runtime::Blob].
    pub replay_buffer: NonZeroUsize,
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        archive::{Archive as _, Error, Identifier, MultiArchive as _},
        journal::Error as JournalError,
        translator::{FourCap, TwoCap},
    };
    use commonware_codec::{DecodeExt, Error as CodecError};
    use commonware_macros::{test_group, test_traced};
    use commonware_runtime::{deterministic, Metrics, Runner};
    use commonware_utils::{sequence::FixedBytes, NZUsize, NZU16, NZU64};
    use rand::Rng;
    use std::{collections::BTreeMap, num::NonZeroU16};

    fn test_key(key: &str) -> FixedBytes<64> {
        let mut buf = [0u8; 64];
        let key = key.as_bytes();
        assert!(key.len() <= buf.len());
        buf[..key.len()].copy_from_slice(key);
        FixedBytes::decode(buf.as_ref()).unwrap()
    }

    const DEFAULT_ITEMS_PER_SECTION: u64 = 65536;
    const DEFAULT_WRITE_BUFFER: usize = 1024;
    const DEFAULT_REPLAY_BUFFER: usize = 4096;
    const PAGE_SIZE: NonZeroU16 = NZU16!(1024);
    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);

    #[test_traced]
    fn test_archive_compression_then_none() {
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Initialize the archive
            let cfg = Config {
                translator: FourCap,
                key_partition: "test-index".into(),
                key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                value_partition: "test-value".into(),
                codec_config: (),
                compression: Some(3),
                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
                items_per_section: NZU64!(DEFAULT_ITEMS_PER_SECTION),
            };
            let mut archive = Archive::init(context.with_label("first"), cfg.clone())
                .await
                .expect("Failed to initialize archive");

            // Put the key-data pair
            let index = 1u64;
            let key = test_key("testkey");
            let data = 1;
            archive
                .put(index, key.clone(), data)
                .await
                .expect("Failed to put data");

            // Sync and drop the archive
            archive.sync().await.expect("Failed to sync archive");
            drop(archive);

            // Initialize the archive again without compression.
            // Index journal replay succeeds (no compression), but value reads will fail.
            let cfg = Config {
                translator: FourCap,
                key_partition: "test-index".into(),
                key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                value_partition: "test-value".into(),
                codec_config: (),
                compression: None,
                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
                items_per_section: NZU64!(DEFAULT_ITEMS_PER_SECTION),
            };
            let archive = Archive::<_, _, FixedBytes<64>, i32>::init(
                context.with_label("second"),
                cfg.clone(),
            )
            .await
            .unwrap();

            // Getting the value should fail because compression settings mismatch.
            // Without compression, the codec sees extra bytes after decoding the value
            // (because the compressed data doesn't match the expected format).
            let result: Result<Option<i32>, _> = archive.get(Identifier::Index(index)).await;
            assert!(matches!(
                result,
                Err(Error::Journal(JournalError::Codec(CodecError::ExtraData(
                    _
                ))))
            ));
        });
    }

    #[test_traced]
    fn test_archive_overlapping_key_basic() {
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Initialize the archive
            let cfg = Config {
                translator: FourCap,
                key_partition: "test-index".into(),
                key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                value_partition: "test-value".into(),
                codec_config: (),
                compression: None,
                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
                items_per_section: NZU64!(DEFAULT_ITEMS_PER_SECTION),
            };
            let mut archive = Archive::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to initialize archive");

            let index1 = 1u64;
            let key1 = test_key("keys1");
            let data1 = 1;
            let index2 = 2u64;
            let key2 = test_key("keys2");
            let data2 = 2;

            // Put the key-data pair
            archive
                .put(index1, key1.clone(), data1)
                .await
                .expect("Failed to put data");

            // Put the key-data pair
            archive
                .put(index2, key2.clone(), data2)
                .await
                .expect("Failed to put data");

            // Get the data back
            let retrieved = archive
                .get(Identifier::Key(&key1))
                .await
                .expect("Failed to get data")
                .expect("Data not found");
            assert_eq!(retrieved, data1);

            // Get the data back
            let retrieved = archive
                .get(Identifier::Key(&key2))
                .await
                .expect("Failed to get data")
                .expect("Data not found");
            assert_eq!(retrieved, data2);

            // Check metrics
            let buffer = context.encode();
            assert!(buffer.contains("items_tracked 2"));
            assert!(buffer.contains("unnecessary_reads_total 1"));
            assert!(buffer.contains("gets_total 2"));
        });
    }

    #[test_traced]
    fn test_archive_overlapping_key_multiple_sections() {
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Initialize the archive
            let cfg = Config {
                translator: FourCap,
                key_partition: "test-index".into(),
                key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                value_partition: "test-value".into(),
                codec_config: (),
                compression: None,
                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
                items_per_section: NZU64!(DEFAULT_ITEMS_PER_SECTION),
            };
            let mut archive = Archive::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to initialize archive");

            let index1 = 1u64;
            let key1 = test_key("keys1");
            let data1 = 1;
            let index2 = 2_000_000u64;
            let key2 = test_key("keys2");
            let data2 = 2;

            // Put the key-data pair
            archive
                .put(index1, key1.clone(), data1)
                .await
                .expect("Failed to put data");

            // Put the key-data pair
            archive
                .put(index2, key2.clone(), data2)
                .await
                .expect("Failed to put data");

            // Get the data back
            let retrieved = archive
                .get(Identifier::Key(&key1))
                .await
                .expect("Failed to get data")
                .expect("Data not found");
            assert_eq!(retrieved, data1);

            // Get the data back
            let retrieved = archive
                .get(Identifier::Key(&key2))
                .await
                .expect("Failed to get data")
                .expect("Data not found");
            assert_eq!(retrieved, data2);
        });
    }

    #[test_traced]
    fn test_archive_prune_keys() {
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Initialize the archive
            let cfg = Config {
                translator: FourCap,
                key_partition: "test-index".into(),
                key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                value_partition: "test-value".into(),
                codec_config: (),
                compression: None,
                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
                items_per_section: NZU64!(1), // no mask - each item is its own section
            };
            let mut archive = Archive::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to initialize archive");

            // Insert multiple keys across different sections
            let keys = vec![
                (1u64, test_key("key1-blah"), 1),
                (2u64, test_key("key2-blah"), 2),
                (3u64, test_key("key3-blah"), 3),
                (4u64, test_key("key3-bleh"), 3),
                (5u64, test_key("key4-blah"), 4),
            ];

            for (index, key, data) in &keys {
                archive
                    .put(*index, key.clone(), *data)
                    .await
                    .expect("Failed to put data");
            }

            // Check metrics
            let buffer = context.encode();
            assert!(buffer.contains("items_tracked 5"));

            // Prune sections less than 3
            archive.prune(3).await.expect("Failed to prune");

            // Ensure keys 1 and 2 are no longer present
            for (index, key, data) in keys {
                let retrieved = archive
                    .get(Identifier::Key(&key))
                    .await
                    .expect("Failed to get data");
                if index < 3 {
                    assert!(retrieved.is_none());
                } else {
                    assert_eq!(retrieved.expect("Data not found"), data);
                }
            }

            // Check metrics
            let buffer = context.encode();
            assert!(buffer.contains("items_tracked 3"));
            assert!(buffer.contains("indices_pruned_total 2"));
            assert!(buffer.contains("pruned_total 0")); // no lazy cleanup yet

            // Try to prune older section
            archive.prune(2).await.expect("Failed to prune");

            // Try to prune current section again
            archive.prune(3).await.expect("Failed to prune");

            // Try to put older index
            let result = archive.put(1, test_key("key1-blah"), 1).await;
            assert!(matches!(result, Err(Error::AlreadyPrunedTo(3))));

            // Trigger lazy removal of keys
            archive
                .put(6, test_key("key2-blfh"), 5)
                .await
                .expect("Failed to put data");

            // Check metrics
            let buffer = context.encode();
            assert!(buffer.contains("items_tracked 4")); // lazily remove one, add one
            assert!(buffer.contains("indices_pruned_total 2"));
            assert!(buffer.contains("pruned_total 1"));
        });
    }

    fn test_archive_keys_and_restart(num_keys: usize) -> String {
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();
        executor.start(|mut context| async move {
            // Initialize the archive
            let items_per_section = 256u64;
            let cfg = Config {
                translator: TwoCap,
                key_partition: "test-index".into(),
                key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                value_partition: "test-value".into(),
                codec_config: (),
                compression: None,
                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
                items_per_section: NZU64!(items_per_section),
            };
            let mut archive = Archive::init(context.with_label("init1"), cfg.clone())
                .await
                .expect("Failed to initialize archive");

            // Insert multiple keys across different sections
            let mut keys = BTreeMap::new();
            while keys.len() < num_keys {
                let index = keys.len() as u64;
                let mut key = [0u8; 64];
                context.fill(&mut key);
                let key = FixedBytes::<64>::decode(key.as_ref()).unwrap();
                let mut data = [0u8; 1024];
                context.fill(&mut data);
                let data = FixedBytes::<1024>::decode(data.as_ref()).unwrap();

                archive
                    .put(index, key.clone(), data.clone())
                    .await
                    .expect("Failed to put data");
                keys.insert(key, (index, data));
            }

            // Ensure all keys can be retrieved
            for (key, (index, data)) in &keys {
                let retrieved = archive
                    .get(Identifier::Index(*index))
                    .await
                    .expect("Failed to get data")
                    .expect("Data not found");
                assert_eq!(&retrieved, data);
                let retrieved = archive
                    .get(Identifier::Key(key))
                    .await
                    .expect("Failed to get data")
                    .expect("Data not found");
                assert_eq!(&retrieved, data);
            }

            // Check metrics
            let buffer = context.encode();
            let tracked = format!("items_tracked {num_keys:?}");
            assert!(buffer.contains(&tracked));
            assert!(buffer.contains("pruned_total 0"));

            // Sync and drop the archive
            archive.sync().await.expect("Failed to sync archive");
            drop(archive);

            // Reinitialize the archive
            let cfg = Config {
                translator: TwoCap,
                key_partition: "test-index".into(),
                key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                value_partition: "test-value".into(),
                codec_config: (),
                compression: None,
                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
                items_per_section: NZU64!(items_per_section),
            };
            let mut archive = Archive::<_, _, _, FixedBytes<1024>>::init(
                context.with_label("init2"),
                cfg.clone(),
            )
            .await
            .expect("Failed to initialize archive");

            // Ensure all keys can be retrieved
            for (key, (index, data)) in &keys {
                let retrieved = archive
                    .get(Identifier::Index(*index))
                    .await
                    .expect("Failed to get data")
                    .expect("Data not found");
                assert_eq!(&retrieved, data);
                let retrieved = archive
                    .get(Identifier::Key(key))
                    .await
                    .expect("Failed to get data")
                    .expect("Data not found");
                assert_eq!(&retrieved, data);
            }

            // Prune first half
            let min = (keys.len() / 2) as u64;
            archive.prune(min).await.expect("Failed to prune");

            // Ensure all keys can be retrieved that haven't been pruned
            let min = (min / items_per_section) * items_per_section;
            let mut removed = 0;
            for (key, (index, data)) in keys {
                if index >= min {
                    let retrieved = archive
                        .get(Identifier::Key(&key))
                        .await
                        .expect("Failed to get data")
                        .expect("Data not found");
                    assert_eq!(retrieved, data);

                    // Check range
                    let (current_end, start_next) = archive.next_gap(index);
                    assert_eq!(current_end.unwrap(), num_keys as u64 - 1);
                    assert!(start_next.is_none());
                } else {
                    let retrieved = archive
                        .get(Identifier::Key(&key))
                        .await
                        .expect("Failed to get data");
                    assert!(retrieved.is_none());
                    removed += 1;

                    // Check range
                    let (current_end, start_next) = archive.next_gap(index);
                    assert!(current_end.is_none());
                    assert_eq!(start_next.unwrap(), min);
                }
            }

            // Check metrics
            let buffer = context.encode();
            let tracked = format!("items_tracked {:?}", num_keys - removed);
            assert!(buffer.contains(&tracked));
            let pruned = format!("indices_pruned_total {removed}");
            assert!(buffer.contains(&pruned));
            assert!(buffer.contains("pruned_total 0")); // have not lazily removed keys yet

            context.auditor().state()
        })
    }

    #[test_group("slow")]
    #[test_traced]
    fn test_archive_many_keys_and_restart() {
        test_archive_keys_and_restart(100_000);
    }

    #[test_group("slow")]
    #[test_traced]
    fn test_determinism() {
        let state1 = test_archive_keys_and_restart(5_000);
        let state2 = test_archive_keys_and_restart(5_000);
        assert_eq!(state1, state2);
    }

    #[test_traced]
    fn test_get_all_after_prune() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                translator: FourCap,
                key_partition: "test-index".into(),
                key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                value_partition: "test-value".into(),
                codec_config: (),
                compression: None,
                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
                items_per_section: NZU64!(1),
            };
            let mut archive = Archive::init(context.clone(), cfg)
                .await
                .expect("Failed to initialize archive");

            archive.put_multi(1, test_key("aaa"), 10).await.unwrap();
            archive.put_multi(1, test_key("bbb"), 20).await.unwrap();
            archive.put_multi(3, test_key("ccc"), 30).await.unwrap();

            // Prune below index 3
            archive.prune(3).await.unwrap();

            // Pruned index returns None
            let all = archive.get_all(1).await.unwrap();
            assert_eq!(all, None);

            // Surviving index still works
            let all = archive.get_all(3).await.unwrap();
            assert_eq!(all, Some(vec![30]));
        });
    }

    #[test_traced]
    fn test_put_multi_prune() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                translator: FourCap,
                key_partition: "test-index".into(),
                key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                value_partition: "test-value".into(),
                codec_config: (),
                compression: None,
                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
                items_per_section: NZU64!(1),
            };
            let mut archive = Archive::init(context.clone(), cfg)
                .await
                .expect("Failed to initialize archive");

            // Two items at index 1, one at index 3
            archive.put_multi(1, test_key("aaa"), 10).await.unwrap();
            archive.put_multi(1, test_key("bbb"), 20).await.unwrap();
            archive.put_multi(3, test_key("ccc"), 30).await.unwrap();

            let buffer = context.encode();
            assert!(buffer.contains("items_tracked 2"));

            // Prune below index 3
            archive.prune(3).await.unwrap();

            // Both items at index 1 are gone
            assert_eq!(
                archive
                    .get(Identifier::Key(&test_key("aaa")))
                    .await
                    .unwrap(),
                None
            );
            assert_eq!(
                archive
                    .get(Identifier::Key(&test_key("bbb")))
                    .await
                    .unwrap(),
                None
            );

            // Item at index 3 survives
            assert_eq!(
                archive
                    .get(Identifier::Key(&test_key("ccc")))
                    .await
                    .unwrap(),
                Some(30)
            );

            let buffer = context.encode();
            assert!(buffer.contains("items_tracked 1"));
            assert!(buffer.contains("indices_pruned_total 1"));

            // put_multi below pruned index is rejected
            let result = archive.put_multi(2, test_key("ddd"), 40).await;
            assert!(matches!(result, Err(Error::AlreadyPrunedTo(3))));
        });
    }
}