//! A prunable key-value store for ordered data.
//!
//! Data is stored across two backends: [crate::journal::segmented::fixed] for fixed-size index entries and
//! [crate::journal::segmented::glob::Glob] for values (managed by [crate::journal::segmented::oversized]).
//! The location of written data is stored in memory by both index and key (via [crate::index::unordered::Index])
//! to enable efficient lookups (on average).
//!
//! _Notably, [Archive] uses neither compaction nor on-disk indexes (and thus has no read nor
//! write amplification during normal operation)._
//!
//! # Format
//!
//! [Archive] uses a two-journal structure for efficient page cache usage:
//!
//! **Index Journal (segmented/fixed)** - Fixed-size entries for fast startup replay:
//! ```text
//! +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
//! | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 |10 |11 |12 |13 |14 |15 |16 |17 |18 |19 |20 |21 |22 |23 |
//! +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
//! |          Index(u64)           |Key(Fixed Size)|        val_offset(u64)        | val_size(u32) |
//! +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
//! ```
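//!
//! As a minimal sketch of this layout (not the crate's actual codec), an entry for a `K`-byte key
//! can be packed into `8 + K + 8 + 4` bytes. The big-endian byte order and the helper names
//! `encode_entry` and `decode_entry` are assumptions made for illustration:
//!
//! ```rust
//! use std::convert::TryInto;
//!
//! /// Hypothetical encoder: `index | key | val_offset | val_size` (assumed big-endian).
//! fn encode_entry<const K: usize>(
//!     index: u64,
//!     key: &[u8; K],
//!     val_offset: u64,
//!     val_size: u32,
//! ) -> Vec<u8> {
//!     let mut buf = Vec::with_capacity(8 + K + 8 + 4);
//!     buf.extend_from_slice(&index.to_be_bytes());
//!     buf.extend_from_slice(key);
//!     buf.extend_from_slice(&val_offset.to_be_bytes());
//!     buf.extend_from_slice(&val_size.to_be_bytes());
//!     buf
//! }
//!
//! /// Hypothetical decoder: the inverse of `encode_entry`, rejecting buffers of the wrong length.
//! fn decode_entry<const K: usize>(buf: &[u8]) -> Option<(u64, [u8; K], u64, u32)> {
//!     if buf.len() != 8 + K + 8 + 4 {
//!         return None;
//!     }
//!     let index = u64::from_be_bytes(buf[..8].try_into().ok()?);
//!     let key: [u8; K] = buf[8..8 + K].try_into().ok()?;
//!     let val_offset = u64::from_be_bytes(buf[8 + K..16 + K].try_into().ok()?);
//!     let val_size = u32::from_be_bytes(buf[16 + K..].try_into().ok()?);
//!     Some((index, key, val_offset, val_size))
//! }
//! ```
//!
//! With a 4-byte key, `encode_entry` produces a 24-byte entry, matching the diagram above.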
//!
//! **Value Blob** - Values (compressed when compression is enabled) with CRC32 checksums (direct
//! reads, no page cache):
//! ```text
//! +---+---+---+---+---+---+---+---+---+---+---+---+
//! |     Compressed Data (variable)    |   CRC32   |
//! +---+---+---+---+---+---+---+---+---+---+---+---+
//! ```
//!
//! # Uniqueness
//!
//! Indices are unique for [Archive] and writing to an occupied index is a no-op. Duplicate
//! indices can be stored via [`crate::archive::MultiArchive::put_multi`].
//!
//! Keys may be stored at multiple indices with either put variant. A lookup by
//! [`crate::archive::Identifier::Key`] may return any of the values at that key. Entries
//! whose index has been pruned are never returned or reported as present, so a key matching
//! both a pruned and a non-pruned entry resolves to the non-pruned entry.
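//!
//! These rules can be modeled with a toy, in-memory stand-in. `ToyArchive` below is hypothetical
//! (it is not [Archive]) and, for simplicity, prunes individual indices rather than sections:
//!
//! ```rust
//! use std::collections::BTreeMap;
//!
//! /// Hypothetical toy model: `index -> (key, value)` plus a pruning boundary.
//! struct ToyArchive {
//!     entries: BTreeMap<u64, (String, i32)>,
//!     pruned_below: u64,
//! }
//!
//! impl ToyArchive {
//!     fn new() -> Self {
//!         Self { entries: BTreeMap::new(), pruned_below: 0 }
//!     }
//!
//!     /// Writing to an occupied index is a no-op: the first write wins.
//!     fn put(&mut self, index: u64, key: &str, value: i32) {
//!         self.entries.entry(index).or_insert_with(|| (key.to_string(), value));
//!     }
//!
//!     /// A key lookup may return any surviving entry for `key`; pruned entries are skipped.
//!     fn get_by_key(&self, key: &str) -> Option<i32> {
//!         self.entries
//!             .range(self.pruned_below..)
//!             .find(|(_, (k, _))| k == key)
//!             .map(|(_, (_, v))| *v)
//!     }
//!
//!     fn prune(&mut self, min: u64) {
//!         self.pruned_below = self.pruned_below.max(min);
//!     }
//! }
//! ```
//!
//! For instance, after storing the same key at indices `2` and `5`, pruning below `3` leaves the
//! entry at `5` as the only answer for that key.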
//!
//! ## Conflicts
//!
//! Because a translated representation of a key is only ever stored in memory, it is possible (and
//! expected) that two keys will eventually be represented by the same translated key. To handle
//! this case, [Archive] must check the persisted form of all conflicting keys to ensure data from
//! the correct key is returned. To support efficient checks, [Archive] (via
//! [crate::index::unordered::Index]) keeps a linked list of all keys with the same translated
//! prefix:
//!
//! ```rust
//! struct Record {
//!     index: u64,
//!
//!     next: Option<Box<Record>>,
//! }
//! ```
//!
//! _To avoid random memory reads in the common case, the in-memory index directly stores the first
//! item in the linked list instead of a pointer to the first item._
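//!
//! A minimal sketch of such a conflict chain and its lookup follows. It assumes a translator that
//! keeps the first 4 bytes of a key, pushes new records at the head of the chain, and simulates the
//! persisted keys with a `BTreeMap`; `KeyIndex`, `translate`, `insert`, and `lookup` are
//! hypothetical names:
//!
//! ```rust
//! use std::collections::{BTreeMap, HashMap};
//!
//! struct Record {
//!     index: u64,
//!     next: Option<Box<Record>>,
//! }
//!
//! /// Hypothetical key index: translated key -> first `Record` of its chain (stored inline).
//! struct KeyIndex {
//!     heads: HashMap<[u8; 4], Record>,
//!     persisted: BTreeMap<u64, Vec<u8>>, // stand-in for full keys read from the index journal
//!     disk_reads: usize,
//! }
//!
//! fn translate(key: &[u8]) -> [u8; 4] {
//!     let mut out = [0u8; 4];
//!     let n = key.len().min(4);
//!     out[..n].copy_from_slice(&key[..n]);
//!     out
//! }
//!
//! impl KeyIndex {
//!     fn new() -> Self {
//!         Self { heads: HashMap::new(), persisted: BTreeMap::new(), disk_reads: 0 }
//!     }
//!
//!     fn insert(&mut self, index: u64, key: &[u8]) {
//!         self.persisted.insert(index, key.to_vec());
//!         let translated = translate(key);
//!         // The new record becomes the head; any previous head moves into the chain.
//!         let next = self.heads.remove(&translated).map(Box::new);
//!         self.heads.insert(translated, Record { index, next });
//!     }
//!
//!     /// Walks the chain, checking each candidate's persisted key until one matches.
//!     fn lookup(&mut self, key: &[u8]) -> Option<u64> {
//!         let mut current = self.heads.get(&translate(key));
//!         while let Some(record) = current {
//!             self.disk_reads += 1;
//!             if self.persisted[&record.index] == key {
//!                 return Some(record.index);
//!             }
//!             current = record.next.as_deref();
//!         }
//!         None
//!     }
//! }
//! ```
//!
//! After inserting `keys1` and then `keys2`, looking up `keys1` reads two persisted keys, because
//! both share the translated value `keys` and the newer record sits at the head of the chain.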
//!
//! The `index` stored in each `Record` is also the key of a second map, which serves lookups by
//! `index` and stores each item's position in the index journal (the journal section is selected by
//! `section = index / items_per_section * items_per_section` to minimize the number of open blobs):
//!
//! ```text
//! // Maps index -> position in index journal
//! indices: BTreeMap<u64, u64>
//! ```
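//!
//! As a worked example of the section formula (integer division truncates, so each index maps to
//! the first index of its section), consider a hypothetical `section` helper:
//!
//! ```rust
//! /// Section containing `index`: `index / items_per_section * items_per_section`.
//! fn section(index: u64, items_per_section: u64) -> u64 {
//!     index / items_per_section * items_per_section
//! }
//! ```
//!
//! With `items_per_section = 65_536`, index `1` falls in section `0` and index `2_000_000` falls in
//! section `1_966_080` (`30 * 65_536`), so the two are stored in different sections.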
//!
//! _If the [Translator] provided by the caller does not distribute keys uniformly across the key
//! space, or produces translated representations that cause many conflicts on average, performance
//! will degrade._
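//!
//! To illustrate, assume a translator that keeps only the first 4 bytes of a key (zero-padded).
//! Keys sharing a long common prefix then collapse into a single conflict chain, while keys whose
//! leading bytes are well distributed do not. `cap4` and `longest_chain` are hypothetical helpers:
//!
//! ```rust
//! use std::collections::HashMap;
//!
//! /// Hypothetical 4-byte cap translator: the first 4 bytes of `key`, zero-padded.
//! fn cap4(key: &[u8]) -> [u8; 4] {
//!     let mut out = [0u8; 4];
//!     let n = key.len().min(4);
//!     out[..n].copy_from_slice(&key[..n]);
//!     out
//! }
//!
//! /// Length of the longest conflict chain produced by translating `keys`.
//! fn longest_chain(keys: &[Vec<u8>]) -> usize {
//!     let mut counts: HashMap<[u8; 4], usize> = HashMap::new();
//!     for key in keys {
//!         *counts.entry(cap4(key)).or_insert(0) += 1;
//!     }
//!     counts.values().copied().max().unwrap_or(0)
//! }
//! ```
//!
//! For example, 100 keys of the form `block-{i}` all translate to `bloc` (one chain of length 100),
//! whereas 100 keys starting with distinct 4-byte counters produce chains of length 1.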
//!
//! ## Memory Overhead
//!
//! [Archive] uses two maps to enable lookups by both index and key. Each index item uses `8 + 8`
//! bytes (the index and its position in the index journal). Each key item uses
//! `~translated(key).len() + 16` bytes (where `16` is the size of the `Record` struct). An
//! [Archive] employing a [Translator] that keeps the first `8` bytes of a key therefore uses `~40`
//! bytes per item (`8 + 16` in the key map plus `16` in the index map), excluding map overhead.
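//!
//! The arithmetic can be checked with `std::mem::size_of`, assuming a 64-bit target and ignoring the
//! maps' own bookkeeping; `approx_bytes_per_item` is a hypothetical helper:
//!
//! ```rust
//! use std::mem::size_of;
//!
//! struct Record {
//!     index: u64,
//!     next: Option<Box<Record>>, // `None` uses the null-pointer niche, so no extra tag bytes
//! }
//!
//! /// Translated key + `Record` (key map) + index and journal position (index map).
//! fn approx_bytes_per_item(translated_len: usize) -> usize {
//!     translated_len + size_of::<Record>() + 2 * size_of::<u64>()
//! }
//! ```
//!
//! On a 64-bit target, `size_of::<Record>()` is `16`, so `approx_bytes_per_item(8)` is `40`.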
//!
//! ### MultiArchive Overhead
//!
//! [Archive] stores index positions in a dual-map layout:
//! - `indices: BTreeMap<u64, u64>` tracks the first position for each index.
//! - `extra_indices: BTreeMap<u64, Vec<u64>>` tracks additional positions for indices written via
//!   [crate::archive::MultiArchive::put_multi].
//!
//! The baseline overhead above is therefore unchanged for the first item at an index. For
//! indices with duplicates, the additional in-memory payload is:
//! - one `Vec<u64>` header (`24` bytes), and
//! - `n * 8` bytes for `n` additional positions.
//!
//! In total, this is `24 + (n * 8)` bytes per duplicated index, excluding `BTreeMap` node
//! overhead for `extra_indices`.
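//!
//! A sketch of that formula, assuming a 64-bit target and a `Vec` whose capacity equals its length
//! (`extra_index_bytes` is a hypothetical helper):
//!
//! ```rust
//! use std::mem::size_of;
//!
//! /// Extra payload for a duplicated index holding `n` additional positions:
//! /// one `Vec<u64>` header plus `n` heap-allocated `u64`s (`BTreeMap` nodes excluded).
//! fn extra_index_bytes(n: usize) -> usize {
//!     size_of::<Vec<u64>>() + n * size_of::<u64>()
//! }
//! ```
//!
//! For example, an index with 3 additional positions costs `24 + 3 * 8 = 48` extra bytes.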
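//!
//! # Pruning
//!
//! [Archive] supports pruning up to a minimum `index` using the `prune` method. Pruning operates on
//! whole sections: every `section` below the one containing the minimum `index` is removed. After
//! pruning, writes to a pruned `section` return an error ([crate::archive::Error::AlreadyPrunedTo]),
//! pruned entries are no longer returned by lookups, and pruning to an earlier point is a no-op.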
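//!
//! The section-granular behavior can be modeled with a toy stand-in. `ToyPrunable` and `ToyError`
//! are hypothetical; in particular, which boundary the real error reports is not specified here, so
//! the toy simply reports the start of the oldest retained section:
//!
//! ```rust
//! use std::collections::BTreeMap;
//!
//! #[derive(Debug, PartialEq)]
//! enum ToyError {
//!     AlreadyPrunedTo(u64),
//! }
//!
//! struct ToyPrunable {
//!     items_per_section: u64,
//!     oldest_allowed: u64,
//!     items: BTreeMap<u64, i32>,
//! }
//!
//! impl ToyPrunable {
//!     fn new(items_per_section: u64) -> Self {
//!         Self { items_per_section, oldest_allowed: 0, items: BTreeMap::new() }
//!     }
//!
//!     fn put(&mut self, index: u64, value: i32) -> Result<(), ToyError> {
//!         if index < self.oldest_allowed {
//!             return Err(ToyError::AlreadyPrunedTo(self.oldest_allowed));
//!         }
//!         self.items.entry(index).or_insert(value); // occupied index: no-op
//!         Ok(())
//!     }
//!
//!     /// Drops every section below the one containing `min`; pruning backwards is a no-op.
//!     fn prune(&mut self, min: u64) {
//!         let section = min / self.items_per_section * self.items_per_section;
//!         if section <= self.oldest_allowed {
//!             return;
//!         }
//!         self.items = self.items.split_off(&section); // keeps indices >= section
//!         self.oldest_allowed = section;
//!     }
//!
//!     fn get(&self, index: u64) -> Option<i32> {
//!         self.items.get(&index).copied()
//!     }
//! }
//! ```
//!
//! With `items_per_section = 256`, calling `prune(500)` keeps indices `256` and above, because `500`
//! lies in the section starting at `256`.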
//!
//! ## Lazy Index Cleanup
//!
//! Instead of performing a full iteration of the in-memory index, storing an additional in-memory
//! index per `section`, or replaying a `section` of the value blob, [Archive] lazily cleans up the
//! [crate::index::unordered::Index] after pruning. When a new key is stored whose translated value
//! matches that of a pruned key, the pruned key is removed from the in-memory index.
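//!
//! A minimal sketch of this lazy cleanup, assuming a 4-byte cap translator and modeling each
//! conflict chain as a `Vec` of indices (`LazyIndex` and its methods are hypothetical). Pruning only
//! moves a boundary, and the cost of sweeping stale entries is paid by the next insert that touches
//! the same chain:
//!
//! ```rust
//! use std::collections::HashMap;
//!
//! fn cap4(key: &[u8]) -> [u8; 4] {
//!     let mut out = [0u8; 4];
//!     let n = key.len().min(4);
//!     out[..n].copy_from_slice(&key[..n]);
//!     out
//! }
//!
//! struct LazyIndex {
//!     chains: HashMap<[u8; 4], Vec<u64>>, // translated key -> indices sharing it
//!     oldest_allowed: u64,
//!     pruned_total: usize,
//! }
//!
//! impl LazyIndex {
//!     fn new() -> Self {
//!         Self { chains: HashMap::new(), oldest_allowed: 0, pruned_total: 0 }
//!     }
//!
//!     /// Pruning only advances the boundary; no chain is visited.
//!     fn prune(&mut self, min: u64) {
//!         self.oldest_allowed = self.oldest_allowed.max(min);
//!     }
//!
//!     /// Inserting sweeps pruned entries from the one chain it touches.
//!     fn insert(&mut self, index: u64, key: &[u8]) {
//!         let oldest = self.oldest_allowed;
//!         let chain = self.chains.entry(cap4(key)).or_default();
//!         let before = chain.len();
//!         chain.retain(|&i| i >= oldest);
//!         self.pruned_total += before - chain.len();
//!         chain.push(index);
//!     }
//! }
//! ```
//!
//! For example, after inserting `key2-blah` at index `2` and pruning below `3`, inserting
//! `key2-blfh` removes the stale entry, since both translate to `key2`.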
//!
//! # Read Path
//!
//! All reads (by index or key) first read the index entry from the index journal to get the
//! value location (offset and size), then read the value from the value blob. The index journal
//! is read through a page cache, so hot entries are served from memory. Values are read directly
//! from disk without caching to avoid polluting the page cache with large values.
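//!
//! A minimal sketch of the second step, assuming (for illustration only) that each value is stored
//! as `data || crc32(data)`, that `val_size` covers both, that the checksum is big-endian, and that
//! it is standard CRC-32 (IEEE); `append_value` and `read_value` are hypothetical helpers operating
//! on an in-memory blob:
//!
//! ```rust
//! /// Standard CRC-32 (IEEE, reflected polynomial `0xEDB88320`), computed bit by bit.
//! fn crc32(data: &[u8]) -> u32 {
//!     let mut crc = 0xFFFF_FFFFu32;
//!     for &byte in data {
//!         crc ^= byte as u32;
//!         for _ in 0..8 {
//!             let mask = (crc & 1).wrapping_neg(); // all ones if the low bit is set
//!             crc = (crc >> 1) ^ (0xEDB8_8320 & mask);
//!         }
//!     }
//!     !crc
//! }
//!
//! /// Appends `data || checksum` and returns the `(offset, size)` an index entry would record.
//! fn append_value(blob: &mut Vec<u8>, data: &[u8]) -> (u64, u32) {
//!     let offset = blob.len() as u64;
//!     blob.extend_from_slice(data);
//!     blob.extend_from_slice(&crc32(data).to_be_bytes());
//!     (offset, (data.len() + 4) as u32)
//! }
//!
//! /// Reads the record at `(offset, size)` and returns the data only if its checksum matches.
//! fn read_value(blob: &[u8], offset: u64, size: u32) -> Option<&[u8]> {
//!     let start = offset as usize;
//!     let end = start.checked_add(size as usize)?;
//!     let record = blob.get(start..end)?;
//!     if record.len() < 4 {
//!         return None;
//!     }
//!     let (data, checksum) = record.split_at(record.len() - 4);
//!     let expected = u32::from_be_bytes([checksum[0], checksum[1], checksum[2], checksum[3]]);
//!     (crc32(data) == expected).then_some(data)
//! }
//! ```
//!
//! Corrupting a single byte of one value makes `read_value` reject that value while neighboring
//! values remain readable.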
//!
//! # Compression
//!
//! [Archive] supports compressing data before storing it on disk. This can be enabled by setting
//! the `compression` field in the `Config` struct to a valid `zstd` compression level. This setting
//! can be changed between initializations of [Archive]; however, it must remain set if any data was
//! written with compression enabled (otherwise, reading that data fails to decode).
//!
//! # Querying for Gaps
//!
//! [Archive] tracks gaps in the index space to enable the caller to efficiently fetch unknown keys
//! using `next_gap`, which returns the end of the contiguous run of stored indices containing the
//! given index (if any) and the start of the next run after it (if any). This is a very common
//! pattern when syncing blocks in a blockchain.
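//!
//! A minimal sketch of the bookkeeping behind such a query, tracking stored indices as disjoint
//! inclusive ranges in a `BTreeMap`. Here `next_gap` returns the end of the run containing the given
//! index and the start of the next run, each as an `Option`; `Ranges`, `insert`, and `next_gap` are
//! hypothetical and not the crate's implementation:
//!
//! ```rust
//! use std::collections::BTreeMap;
//!
//! /// Disjoint, non-adjacent inclusive ranges of stored indices: `start -> end`.
//! #[derive(Default)]
//! struct Ranges {
//!     ranges: BTreeMap<u64, u64>,
//! }
//!
//! impl Ranges {
//!     fn insert(&mut self, index: u64) {
//!         let (mut start, mut end) = (index, index);
//!         // Merge with a range ending at or just before `index`.
//!         if let Some((&s, &e)) = self.ranges.range(..=index).next_back() {
//!             if e >= index {
//!                 return; // already covered
//!             }
//!             if e + 1 == index {
//!                 start = s;
//!             }
//!         }
//!         // Merge with a range starting right after `index`.
//!         if let Some(next) = index.checked_add(1) {
//!             if let Some(e) = self.ranges.remove(&next) {
//!                 end = e;
//!             }
//!         }
//!         self.ranges.insert(start, end);
//!     }
//!
//!     /// Returns (end of the run containing `index`, start of the next run), each if any.
//!     fn next_gap(&self, index: u64) -> (Option<u64>, Option<u64>) {
//!         let current_end = self
//!             .ranges
//!             .range(..=index)
//!             .next_back()
//!             .and_then(|(_, &e)| (e >= index).then_some(e));
//!         let next_start = current_end
//!             .unwrap_or(index)
//!             .checked_add(1)
//!             .and_then(|from| self.ranges.range(from..).next().map(|(&s, _)| s));
//!         (current_end, next_start)
//!     }
//! }
//! ```
//!
//! For stored indices `1..=3` and `7..=8`, `next_gap(5)` returns `(None, Some(7))`: index `5` is
//! missing, and the next stored run begins at `7`.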
//!
//! # Example
//!
//! ```rust
//! use commonware_runtime::{Spawner, Runner, deterministic, buffer::paged::CacheRef};
//! use commonware_cryptography::{Hasher as _, Sha256};
//! use commonware_storage::{
//!     translator::FourCap,
//!     archive::{
//!         Archive as _,
//!         prunable::{Archive, Config},
//!     },
//! };
//! use commonware_utils::{NZUsize, NZU16, NZU64};
//!
//! let executor = deterministic::Runner::default();
//! executor.start(|context| async move {
//!     // Create an archive
//!     let cfg = Config {
//!         translator: FourCap,
//!         key_partition: "demo-index".into(),
//!         key_page_cache: CacheRef::from_pooler(&context, NZU16!(1024), NZUsize!(10)),
//!         value_partition: "demo-value".into(),
//!         compression: Some(3),
//!         codec_config: (),
//!         items_per_section: NZU64!(1024),
//!         key_write_buffer: NZUsize!(1024 * 1024),
//!         value_write_buffer: NZUsize!(1024 * 1024),
//!         replay_buffer: NZUsize!(4096),
//!     };
//!     let mut archive = Archive::init(context, cfg).await.unwrap();
//!
//!     // Put a key
//!     archive.put(1, Sha256::hash(b"data"), 10).await.unwrap();
//!
//!     // Sync the archive
//!     archive.sync().await.unwrap();
//! });
//! ```

use crate::translator::Translator;
use commonware_runtime::buffer::paged::CacheRef;
use std::num::{NonZeroU64, NonZeroUsize};

mod storage;
pub use storage::Archive;

/// Configuration for [Archive] storage.
#[derive(Clone)]
pub struct Config<T: Translator, C> {
    /// Logic to transform keys into their index representation.
    ///
    /// [Archive] assumes that translated keys are spread uniformly across the key space.
    /// If that is not the case, key lookups may degrade from O(1) to O(n).
    pub translator: T,

    /// The partition to use for the key journal (stores each item's index, key, and value location).
    pub key_partition: String,

    /// The page cache to use for the key journal.
    pub key_page_cache: CacheRef,

    /// The partition to use for the value blob (stores values).
    pub value_partition: String,

    /// The `zstd` compression level to use for the value blob (`None` disables compression).
    pub compression: Option<u8>,

    /// The [commonware_codec::Codec] configuration to use for the values stored in the archive.
    pub codec_config: C,

    /// The number of items per section (the granularity of pruning).
    pub items_per_section: NonZeroU64,

    /// The number of bytes that can be buffered for the key journal before being written to a
    /// [commonware_runtime::Blob].
    pub key_write_buffer: NonZeroUsize,

    /// The number of bytes that can be buffered for the value journal before being written to a
    /// [commonware_runtime::Blob].
    pub value_write_buffer: NonZeroUsize,

    /// The buffer size to use when replaying a [commonware_runtime::Blob].
    pub replay_buffer: NonZeroUsize,
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        archive::{Archive as _, Error, Identifier, MultiArchive as _},
        journal::Error as JournalError,
        translator::{FourCap, TwoCap},
    };
    use commonware_codec::{DecodeExt, Error as CodecError};
    use commonware_macros::{test_group, test_traced};
    use commonware_runtime::{
        deterministic, telemetry::metrics::has_metric_value, Metrics as _, Runner, Supervisor as _,
    };
    use commonware_utils::{sequence::FixedBytes, NZUsize, NZU16, NZU64};
    use rand::Rng;
    use std::{collections::BTreeMap, num::NonZeroU16};

    fn test_key(key: &str) -> FixedBytes<64> {
        let mut buf = [0u8; 64];
        let key = key.as_bytes();
        assert!(key.len() <= buf.len());
        buf[..key.len()].copy_from_slice(key);
        FixedBytes::decode(buf.as_ref()).unwrap()
    }

    const DEFAULT_ITEMS_PER_SECTION: u64 = 65536;
    const DEFAULT_WRITE_BUFFER: usize = 1024;
    const DEFAULT_REPLAY_BUFFER: usize = 4096;
    const PAGE_SIZE: NonZeroU16 = NZU16!(1024);
    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);

    #[test_traced]
    fn test_archive_compression_then_none() {
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Initialize the archive
            let cfg = Config {
                translator: FourCap,
                key_partition: "test-index".into(),
                key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                value_partition: "test-value".into(),
                codec_config: (),
                compression: Some(3),
                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
                items_per_section: NZU64!(DEFAULT_ITEMS_PER_SECTION),
            };
            let mut archive = Archive::init(context.child("first"), cfg.clone())
                .await
                .expect("Failed to initialize archive");

            // Put the key-data pair
            let index = 1u64;
            let key = test_key("testkey");
            let data = 1;
            archive
                .put(index, key.clone(), data)
                .await
                .expect("Failed to put data");

            // Sync and drop the archive
            archive.sync().await.expect("Failed to sync archive");
            drop(archive);

            // Initialize the archive again without compression.
            // Index journal replay succeeds (no compression), but value reads will fail.
            let cfg = Config {
                translator: FourCap,
                key_partition: "test-index".into(),
                key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                value_partition: "test-value".into(),
                codec_config: (),
                compression: None,
                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
                items_per_section: NZU64!(DEFAULT_ITEMS_PER_SECTION),
            };
            let archive =
                Archive::<_, _, FixedBytes<64>, i32>::init(context.child("second"), cfg.clone())
                    .await
                    .unwrap();

            // Getting the value should fail because compression settings mismatch.
            // Without compression, the codec sees extra bytes after decoding the value
            // (because the compressed data doesn't match the expected format).
            let result: Result<Option<i32>, _> = archive.get(Identifier::Index(index)).await;
            assert!(matches!(
                result,
                Err(Error::Journal(JournalError::Codec(CodecError::ExtraData(
                    _
                ))))
            ));
        });
    }

    #[test_traced]
    fn test_archive_overlapping_key_basic() {
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Initialize the archive
            let cfg = Config {
                translator: FourCap,
                key_partition: "test-index".into(),
                key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                value_partition: "test-value".into(),
                codec_config: (),
                compression: None,
                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
                items_per_section: NZU64!(DEFAULT_ITEMS_PER_SECTION),
            };
            let mut archive = Archive::init(context.child("storage"), cfg.clone())
                .await
                .expect("Failed to initialize archive");

            let index1 = 1u64;
            let key1 = test_key("keys1");
            let data1 = 1;
            let index2 = 2u64;
            let key2 = test_key("keys2");
            let data2 = 2;

            // Put the key-data pair
            archive
                .put(index1, key1.clone(), data1)
                .await
                .expect("Failed to put data");

            // Put the key-data pair
            archive
                .put(index2, key2.clone(), data2)
                .await
                .expect("Failed to put data");

            // Get the data back
            let retrieved = archive
                .get(Identifier::Key(&key1))
                .await
                .expect("Failed to get data")
                .expect("Data not found");
            assert_eq!(retrieved, data1);

            // Get the data back
            let retrieved = archive
                .get(Identifier::Key(&key2))
                .await
                .expect("Failed to get data")
                .expect("Data not found");
            assert_eq!(retrieved, data2);

            // Check metrics
            let buffer = context.encode();
            assert!(has_metric_value(&buffer, "items_tracked", 2));
            assert!(buffer.contains("unnecessary_reads_total 1"));
            assert!(buffer.contains("gets_total 2"));
        });
    }

    #[test_traced]
    fn test_archive_overlapping_key_multiple_sections() {
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Initialize the archive
            let cfg = Config {
                translator: FourCap,
                key_partition: "test-index".into(),
                key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                value_partition: "test-value".into(),
                codec_config: (),
                compression: None,
                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
                items_per_section: NZU64!(DEFAULT_ITEMS_PER_SECTION),
            };
            let mut archive = Archive::init(context.child("storage"), cfg.clone())
                .await
                .expect("Failed to initialize archive");

            let index1 = 1u64;
            let key1 = test_key("keys1");
            let data1 = 1;
            let index2 = 2_000_000u64;
            let key2 = test_key("keys2");
            let data2 = 2;

            // Put the key-data pair
            archive
                .put(index1, key1.clone(), data1)
                .await
                .expect("Failed to put data");

            // Put the key-data pair
            archive
                .put(index2, key2.clone(), data2)
                .await
                .expect("Failed to put data");

            // Get the data back
            let retrieved = archive
                .get(Identifier::Key(&key1))
                .await
                .expect("Failed to get data")
                .expect("Data not found");
            assert_eq!(retrieved, data1);

            // Get the data back
            let retrieved = archive
                .get(Identifier::Key(&key2))
                .await
                .expect("Failed to get data")
                .expect("Data not found");
            assert_eq!(retrieved, data2);
        });
    }

    #[test_traced]
    fn test_archive_prune_keys() {
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Initialize the archive
            let cfg = Config {
                translator: FourCap,
                key_partition: "test-index".into(),
                key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                value_partition: "test-value".into(),
                codec_config: (),
                compression: None,
                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
                items_per_section: NZU64!(1), // each index is its own section
            };
            let mut archive = Archive::init(context.child("storage"), cfg.clone())
                .await
                .expect("Failed to initialize archive");

            // Insert multiple keys across different sections
            let keys = vec![
                (1u64, test_key("key1-blah"), 1),
                (2u64, test_key("key2-blah"), 2),
                (3u64, test_key("key3-blah"), 3),
                (4u64, test_key("key3-bleh"), 3),
                (5u64, test_key("key4-blah"), 4),
            ];

            for (index, key, data) in &keys {
                archive
                    .put(*index, key.clone(), *data)
                    .await
                    .expect("Failed to put data");
            }

            // Check metrics
            let buffer = context.encode();
            assert!(has_metric_value(&buffer, "items_tracked", 5));

            // Prune sections less than 3
            archive.prune(3).await.expect("Failed to prune");

            // Ensure the keys at indices 1 and 2 are no longer present
            for (index, key, data) in keys {
                let retrieved = archive
                    .get(Identifier::Key(&key))
                    .await
                    .expect("Failed to get data");
                if index < 3 {
                    assert!(retrieved.is_none());
                } else {
                    assert_eq!(retrieved.expect("Data not found"), data);
                }
            }

            // Check metrics
            let buffer = context.encode();
            assert!(has_metric_value(&buffer, "items_tracked", 3));
            assert!(has_metric_value(&buffer, "indices_pruned_total", 2));
            assert!(has_metric_value(&buffer, "pruned_total", 0)); // no lazy cleanup yet

            // Pruning to an older section is a no-op
            archive.prune(2).await.expect("Failed to prune");

            // Pruning the current section again is a no-op
            archive.prune(3).await.expect("Failed to prune");

            // Writing to a pruned index fails
            let result = archive.put(1, test_key("key1-blah"), 1).await;
            assert!(matches!(result, Err(Error::AlreadyPrunedTo(3))));

            // Trigger lazy removal of a pruned key with the same translated value
            archive
                .put(6, test_key("key2-blfh"), 5)
                .await
                .expect("Failed to put data");

            // Check metrics
            let buffer = context.encode();
            assert!(has_metric_value(&buffer, "items_tracked", 4)); // lazily remove one, add one
            assert!(has_metric_value(&buffer, "indices_pruned_total", 2));
            assert!(has_metric_value(&buffer, "pruned_total", 1));
        });
    }

    fn test_archive_keys_and_restart(num_keys: usize) -> String {
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();
        executor.start(|mut context| async move {
            // Initialize the archive
            let items_per_section = 256u64;
            let cfg = Config {
                translator: TwoCap,
                key_partition: "test-index".into(),
                key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                value_partition: "test-value".into(),
                codec_config: (),
                compression: None,
                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
                items_per_section: NZU64!(items_per_section),
            };
            let mut archive = Archive::init(
                context.child("init").with_attribute("index", 1),
                cfg.clone(),
            )
            .await
            .expect("Failed to initialize archive");

            // Insert multiple keys across different sections
            let mut keys = BTreeMap::new();
            while keys.len() < num_keys {
                let index = keys.len() as u64;
                let mut key = [0u8; 64];
                context.fill(&mut key);
                let key = FixedBytes::<64>::decode(key.as_ref()).unwrap();
                let mut data = [0u8; 1024];
                context.fill(&mut data);
                let data = FixedBytes::<1024>::decode(data.as_ref()).unwrap();

                archive
                    .put(index, key.clone(), data.clone())
                    .await
                    .expect("Failed to put data");
                keys.insert(key, (index, data));
            }

            // Ensure all keys can be retrieved
            for (key, (index, data)) in &keys {
                let retrieved = archive
                    .get(Identifier::Index(*index))
                    .await
                    .expect("Failed to get data")
                    .expect("Data not found");
                assert_eq!(&retrieved, data);
                let retrieved = archive
                    .get(Identifier::Key(key))
                    .await
                    .expect("Failed to get data")
                    .expect("Data not found");
                assert_eq!(&retrieved, data);
            }

            // Check metrics
            let buffer = context.encode();
            assert!(has_metric_value(&buffer, "items_tracked", num_keys));
            assert!(has_metric_value(&buffer, "pruned_total", 0));

            // Sync and drop the archive
            archive.sync().await.expect("Failed to sync archive");
            drop(archive);

            // Reinitialize the archive
            let cfg = Config {
                translator: TwoCap,
                key_partition: "test-index".into(),
                key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                value_partition: "test-value".into(),
                codec_config: (),
                compression: None,
                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
                items_per_section: NZU64!(items_per_section),
            };
            let mut archive = Archive::<_, _, _, FixedBytes<1024>>::init(
                context.child("init").with_attribute("index", 2),
                cfg.clone(),
            )
            .await
            .expect("Failed to initialize archive");

            // Ensure all keys can be retrieved
            for (key, (index, data)) in &keys {
                let retrieved = archive
                    .get(Identifier::Index(*index))
                    .await
                    .expect("Failed to get data")
                    .expect("Data not found");
                assert_eq!(&retrieved, data);
                let retrieved = archive
                    .get(Identifier::Key(key))
                    .await
                    .expect("Failed to get data")
                    .expect("Data not found");
                assert_eq!(&retrieved, data);
            }

            // Prune first half
            let min = (keys.len() / 2) as u64;
            archive.prune(min).await.expect("Failed to prune");

            // Ensure all keys that haven't been pruned can still be retrieved
            let min = (min / items_per_section) * items_per_section;
            let mut removed = 0;
            for (key, (index, data)) in keys {
                if index >= min {
                    let retrieved = archive
                        .get(Identifier::Key(&key))
                        .await
                        .expect("Failed to get data")
                        .expect("Data not found");
                    assert_eq!(retrieved, data);

                    // Check range
                    let (current_end, start_next) = archive.next_gap(index);
                    assert_eq!(current_end.unwrap(), num_keys as u64 - 1);
                    assert!(start_next.is_none());
                } else {
                    let retrieved = archive
                        .get(Identifier::Key(&key))
                        .await
                        .expect("Failed to get data");
                    assert!(retrieved.is_none());
                    removed += 1;

                    // Check range
                    let (current_end, start_next) = archive.next_gap(index);
                    assert!(current_end.is_none());
                    assert_eq!(start_next.unwrap(), min);
                }
            }

            // Check metrics
            let buffer = context.encode();
            assert!(has_metric_value(
                &buffer,
                "items_tracked",
                num_keys - removed
            ));
            assert!(has_metric_value(&buffer, "indices_pruned_total", removed));
            assert!(has_metric_value(&buffer, "pruned_total", 0)); // have not lazily removed keys yet

            context.auditor().state()
        })
    }

    #[test_group("slow")]
    #[test_traced]
    fn test_archive_many_keys_and_restart() {
        test_archive_keys_and_restart(100_000);
    }

    #[test_group("slow")]
    #[test_traced]
    fn test_determinism() {
        let state1 = test_archive_keys_and_restart(5_000);
        let state2 = test_archive_keys_and_restart(5_000);
        assert_eq!(state1, state2);
    }

692    /// Regression: when the same key is stored at multiple indices and the
693    /// earlier index is pruned, a subsequent `get`/`has` by key must resolve
694    /// to the surviving, non-pruned entry rather than report the pruned one.
695    /// Callers such as consensus's marshal cache rely on this to retain a
696    /// reproposal of the same block at a later index even after the
697    /// earlier index's retention window closes.
698    #[test_traced]
699    fn test_archive_key_lookup_skips_pruned_duplicates() {
700        let executor = deterministic::Runner::default();
701        executor.start(|context| async move {
702            let cfg = Config {
703                translator: FourCap,
704                key_partition: "test-index".into(),
705                key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
706                value_partition: "test-value".into(),
707                codec_config: (),
708                compression: None,
709                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
710                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
711                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
712                items_per_section: NZU64!(1),
713            };
714            let mut archive = Archive::init(context.child("storage"), cfg)
715                .await
716                .expect("Failed to initialize archive");
717
718            // Same key stored at two different indices. Distinct values only
719            // to make it observable which entry wins; a real caller would
720            // store the same value (e.g. the same block) at both indices.
721            let key = test_key("dupe-key");
722            archive.put(2, key.clone(), 20).await.unwrap();
723            archive.put(5, key.clone(), 50).await.unwrap();
724
725            // Before pruning, either entry is a permitted answer per the
726            // trait contract. The implementation happens to return the
727            // earlier index, but we only assert a value is present.
728            assert!(archive.get(Identifier::Key(&key)).await.unwrap().is_some());
729            assert!(archive.has(Identifier::Key(&key)).await.unwrap());
730
731            // Prune the earlier index (section 2). The later index must be
732            // the sole surviving answer.
733            archive.prune(3).await.unwrap();
734            let got = archive.get(Identifier::Key(&key)).await.unwrap();
735            assert_eq!(
736                got,
737                Some(50),
738                "key lookup must skip the pruned entry and return the surviving one"
739            );
740            assert!(archive.has(Identifier::Key(&key)).await.unwrap());
741
742            // Prune past the later index too — now nothing survives.
743            archive.prune(6).await.unwrap();
744            assert_eq!(archive.get(Identifier::Key(&key)).await.unwrap(), None);
745            assert!(!archive.has(Identifier::Key(&key)).await.unwrap());
746        });
747    }
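
    // A minimal sketch of the behaviour pinned down by
    // `test_archive_key_lookup_skips_pruned_duplicates`, written against a toy
    // in-memory model rather than the real `Archive`. `ToyArchive` and its
    // methods are hypothetical. The model assumes only what the tests show: a
    // key may be written at several indices, writing to an occupied index is a
    // no-op, and pruning discards every index below a floor. Pruning here is
    // deliberately lazy (stale key-to-index mappings are left behind), so a key
    // lookup must skip candidates that no longer exist instead of stopping at
    // the first one. The real archive's in-memory index is structured
    // differently (for example, the config above applies a `FourCap`
    // translator to keys).
    pub mod pruned_duplicate_lookup_sketch {
        use std::collections::{BTreeMap, HashMap};

        #[derive(Default)]
        pub struct ToyArchive {
            // index -> value; at most one entry per index.
            entries: BTreeMap<u64, u64>,
            // key -> every index the key was written at (may include pruned ones).
            by_key: HashMap<String, Vec<u64>>,
            // Lowest index that has not been pruned.
            oldest_allowed: u64,
        }

        impl ToyArchive {
            // Returns `Err(floor)` for writes below the prune floor (loosely
            // mirroring `Error::AlreadyPrunedTo`); an occupied index is a no-op.
            pub fn put(&mut self, index: u64, key: &str, value: u64) -> Result<(), u64> {
                if index < self.oldest_allowed {
                    return Err(self.oldest_allowed);
                }
                if self.entries.contains_key(&index) {
                    return Ok(());
                }
                self.entries.insert(index, value);
                self.by_key.entry(key.to_string()).or_default().push(index);
                Ok(())
            }

            // Drops every entry below `min` but leaves `by_key` untouched.
            pub fn prune(&mut self, min: u64) {
                if min <= self.oldest_allowed {
                    return;
                }
                // `split_off` keeps indices >= `min`; the lower half is dropped.
                self.entries = self.entries.split_off(&min);
                self.oldest_allowed = min;
            }

            // Skips candidates below the floor (and any whose entry is gone),
            // returning the value at the first surviving index.
            pub fn get_by_key(&self, key: &str) -> Option<u64> {
                self.by_key
                    .get(key)?
                    .iter()
                    .filter(|&&index| index >= self.oldest_allowed)
                    .find_map(|index| self.entries.get(index).copied())
            }
        }

        #[test]
        fn skips_pruned_duplicate() {
            let mut archive = ToyArchive::default();
            archive.put(2, "dupe-key", 20).unwrap();
            archive.put(5, "dupe-key", 50).unwrap();
            archive.prune(3);
            assert_eq!(archive.get_by_key("dupe-key"), Some(50));
            archive.prune(6);
            assert_eq!(archive.get_by_key("dupe-key"), None);
        }
    }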

    #[test_traced]
    fn test_get_all_after_prune() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                translator: FourCap,
                key_partition: "test-index".into(),
                key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                value_partition: "test-value".into(),
                codec_config: (),
                compression: None,
                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
                items_per_section: NZU64!(1),
            };
            let mut archive = Archive::init(context.child("storage"), cfg)
                .await
                .expect("Failed to initialize archive");

            archive.put_multi(1, test_key("aaa"), 10).await.unwrap();
            archive.put_multi(1, test_key("bbb"), 20).await.unwrap();
            archive.put_multi(3, test_key("ccc"), 30).await.unwrap();

            // Prune below index 3
            archive.prune(3).await.unwrap();

            // Pruned index returns None
            let all = archive.get_all(1).await.unwrap();
            assert_eq!(all, None);

            // Surviving index still works
            let all = archive.get_all(3).await.unwrap();
            assert_eq!(all, Some(vec![30]));
        });
    }

    #[test_traced]
    fn test_put_multi_prune() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                translator: FourCap,
                key_partition: "test-index".into(),
                key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                value_partition: "test-value".into(),
                codec_config: (),
                compression: None,
                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
                items_per_section: NZU64!(1),
            };
            let mut archive = Archive::init(context.child("storage"), cfg)
                .await
                .expect("Failed to initialize archive");

            // Two items at index 1, one at index 3
            archive.put_multi(1, test_key("aaa"), 10).await.unwrap();
            archive.put_multi(1, test_key("bbb"), 20).await.unwrap();
            archive.put_multi(3, test_key("ccc"), 30).await.unwrap();

            let buffer = context.encode();
            assert!(has_metric_value(&buffer, "items_tracked", 2));

            // Prune below index 3
            archive.prune(3).await.unwrap();

            // Both items at index 1 are gone
            assert_eq!(
                archive
                    .get(Identifier::Key(&test_key("aaa")))
                    .await
                    .unwrap(),
                None
            );
            assert_eq!(
                archive
                    .get(Identifier::Key(&test_key("bbb")))
                    .await
                    .unwrap(),
                None
            );

            // Item at index 3 survives
            assert_eq!(
                archive
                    .get(Identifier::Key(&test_key("ccc")))
                    .await
                    .unwrap(),
                Some(30)
            );

            let buffer = context.encode();
            assert!(has_metric_value(&buffer, "items_tracked", 1));
            assert!(has_metric_value(&buffer, "indices_pruned_total", 1));

            // put_multi below the prune point (index 3) is rejected
            let result = archive.put_multi(2, test_key("ddd"), 40).await;
            assert!(matches!(result, Err(Error::AlreadyPrunedTo(3))));
        });
    }
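
    // A toy sketch of the `put_multi`/`get_all`/`prune` interplay checked by
    // `test_get_all_after_prune` and `test_put_multi_prune`. `MultiToy` and
    // its methods are hypothetical and are not the archive's implementation.
    // The sketch assumes only what those tests assert: several items may share
    // one index, pruning removes every item at a pruned index, `get_all` on a
    // pruned index yields `None`, and writes below the prune floor are rejected
    // with the floor (compare `Error::AlreadyPrunedTo(3)`). Values at one index
    // are returned in insertion order here; that ordering is an assumption of
    // the sketch, not something the tests check.
    pub mod put_multi_prune_sketch {
        use std::collections::BTreeMap;

        #[derive(Default)]
        pub struct MultiToy {
            // index -> (key, value) pairs written at that index.
            items: BTreeMap<u64, Vec<(String, u64)>>,
            // Lowest index that has not been pruned.
            oldest_allowed: u64,
        }

        impl MultiToy {
            // Appends an item at `index`, or returns `Err(floor)` if pruned.
            pub fn put_multi(&mut self, index: u64, key: &str, value: u64) -> Result<(), u64> {
                if index < self.oldest_allowed {
                    return Err(self.oldest_allowed);
                }
                self.items
                    .entry(index)
                    .or_default()
                    .push((key.to_string(), value));
                Ok(())
            }

            // Every value stored at `index`, or `None` if nothing survives there.
            pub fn get_all(&self, index: u64) -> Option<Vec<u64>> {
                let items = self.items.get(&index)?;
                Some(items.iter().map(|(_, value)| *value).collect())
            }

            // Removes all items at indices below `min`; never moves the floor back.
            pub fn prune(&mut self, min: u64) {
                if min > self.oldest_allowed {
                    self.items = self.items.split_off(&min);
                    self.oldest_allowed = min;
                }
            }
        }

        #[test]
        fn prune_drops_every_item_at_an_index() {
            let mut archive = MultiToy::default();
            archive.put_multi(1, "aaa", 10).unwrap();
            archive.put_multi(1, "bbb", 20).unwrap();
            archive.put_multi(3, "ccc", 30).unwrap();
            archive.prune(3);
            assert_eq!(archive.get_all(1), None);
            assert_eq!(archive.get_all(3), Some(vec![30]));
            assert_eq!(archive.put_multi(2, "ddd", 40), Err(3));
        }
    }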
}