commonware_storage/archive/prunable/mod.rs
//! A prunable key-value store for ordered data.
//!
//! Data is stored across two backends: [crate::journal::segmented::fixed] for fixed-size index entries and
//! [crate::journal::segmented::glob::Glob] for values (managed by [crate::journal::segmented::oversized]).
//! The location of written data is stored in-memory by both index and key (via [crate::index::unordered::Index])
//! to enable efficient lookups (on average).
//!
//! _Notably, [Archive] does not make use of compaction nor on-disk indexes (and thus has no read
//! nor write amplification during normal operation)._
//!
//! # Format
//!
//! [Archive] uses a two-journal structure for efficient page cache usage:
//!
//! **Index Journal (segmented/fixed)** - Fixed-size entries for fast startup replay:
//! ```text
//! +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
//! | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 |10 |11 |12 |13 |14 |15 |16 |17 |18 |19 |20 |21 |22 |23 |
//! +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
//! |          Index(u64)           |Key(Fixed Size)|        val_offset(u64)        | val_size(u32) |
//! +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
//! ```
//!
//! **Value Blob** - Raw values with CRC32 checksums (direct reads, no page cache):
//! ```text
//! +---+---+---+---+---+---+---+---+---+---+---+---+
//! |  Compressed Data (variable)   |     CRC32     |
//! +---+---+---+---+---+---+---+---+---+---+---+---+
//! ```
//!
//! # Uniqueness
//!
//! Indices are unique for [Archive] and writing to an occupied index is a no-op. Duplicate
//! indices can be stored via [`crate::archive::MultiArchive::put_multi`].
//!
//! Keys may be stored at multiple indices with either put variant. A lookup by
//! [`crate::archive::Identifier::Key`] may return any of the values at that key. Entries
//! whose index has been pruned are never returned or reported as present, so a key matching
//! both a pruned and a non-pruned entry resolves to the non-pruned entry.
//!
//! ## Conflicts
//!
//! Because a translated representation of a key is only ever stored in memory, it is possible (and
//! expected) that two keys will eventually be represented by the same translated key. To handle
//! this case, [Archive] must check the persisted form of all conflicting keys to ensure data from
//! the correct key is returned. To support efficient checks, [Archive] (via
//! [crate::index::unordered::Index]) keeps a linked list of all keys with the same translated
//! prefix:
//!
//! ```rust
//! struct Record {
//!     index: u64,
//!
//!     next: Option<Box<Record>>,
//! }
//! ```
//!
//! _To avoid random memory reads in the common case, the in-memory index directly stores the first
//! item in the linked list instead of a pointer to the first item._
//!
//! `index` is the key to the map used to serve lookups by `index` that stores the position in the
//! index journal (selected by `section = index / items_per_section * items_per_section` to minimize
//! the number of open blobs):
//!
//! ```text
//! // Maps index -> position in index journal
//! indices: BTreeMap<u64, u64>
//! ```
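//!
//! The section formula above can be sketched as a standalone function. This is a minimal
//! illustration, not code from this crate: `section_of` is a hypothetical name, and
//! `items_per_section` is taken as a `NonZeroU64` only to mirror the `Config` field.
//!
//! ```rust
//! use std::num::NonZeroU64;
//!
//! /// Hypothetical helper: the first index of the section that contains `index`.
//! fn section_of(index: u64, items_per_section: NonZeroU64) -> u64 {
//!     let n = items_per_section.get();
//!     // Integer division rounds down, so multiplying back yields the section start.
//!     (index / n) * n
//! }
//! ```
//!
//! Under this sketch, indices `0..=1023` all map to section `0` when `items_per_section` is
//! `1024`, and index `1024` starts the next section.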
//!
//! _If the [Translator] provided by the caller does not uniformly distribute keys across the key
//! space, or produces a translated representation under which keys conflict often, performance
//! will degrade._
//!
//! ## Memory Overhead
//!
//! [Archive] uses two maps to enable lookups by both index and key. The memory used to track each
//! index item is `8 + 8` (where `8` is the index and `8` is the position in the index journal).
//! The memory used to track each key item is `~translated(key).len() + 16` bytes (where `16` is the
//! size of the `Record` struct). This means that an [Archive] employing a [Translator] that uses
//! the first `8` bytes of a key will use `~40` bytes to index each key.
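//!
//! The arithmetic above can be checked with a short sketch. It assumes a 64-bit target and
//! ignores map node and allocator overhead; `bytes_per_item` is a hypothetical helper, and the
//! `Record` here only mirrors the layout shown earlier.
//!
//! ```rust
//! use std::mem::size_of;
//!
//! /// Mirrors the `Record` layout from the docs (illustrative only).
//! #[allow(dead_code)]
//! struct Record {
//!     index: u64,
//!     next: Option<Box<Record>>,
//! }
//!
//! /// Hypothetical estimate of the bytes used to index one item.
//! fn bytes_per_item(translated_key_len: usize) -> usize {
//!     // Index map entry: the index plus its position in the index journal.
//!     let index_entry = size_of::<u64>() + size_of::<u64>();
//!     // Key map entry: the translated key plus one `Record`.
//!     let key_entry = translated_key_len + size_of::<Record>();
//!     index_entry + key_entry
//! }
//! ```
//!
//! With an 8-byte translated key, `bytes_per_item(8)` gives the `~40` bytes quoted above.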
//!
//! ### MultiArchive Overhead
//!
//! [Archive] stores index positions in a dual-map layout:
//! - `indices: BTreeMap<u64, u64>` tracks the first position for each index.
//! - `extra_indices: BTreeMap<u64, Vec<u64>>` tracks additional positions for indices written via
//!   [crate::archive::MultiArchive::put_multi].
//!
//! This means the baseline overhead above remains unchanged for the first item at an index. For
//! indices with duplicates, the additional in-memory payload is:
//! - one `Vec<u64>` header (`24` bytes), and
//! - `n * 8` bytes for `n` additional positions.
//!
//! Equivalently, this is `24 + (n * 8)` bytes per duplicated index, excluding `BTreeMap` node
//! overhead for `extra_indices`.
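//!
//! As a rough worked example of the `24 + (n * 8)` figure, assuming a 64-bit target and a `Vec`
//! whose capacity equals its length (`extra_bytes` is a hypothetical helper, not part of this
//! crate):
//!
//! ```rust
//! use std::mem::size_of;
//!
//! /// Hypothetical estimate of the extra bytes for an index with `n` additional positions.
//! fn extra_bytes(n: usize) -> usize {
//!     if n == 0 {
//!         // No duplicates: the index has no entry in `extra_indices`.
//!         return 0;
//!     }
//!     // One `Vec<u64>` header plus one `u64` per additional position.
//!     size_of::<Vec<u64>>() + n * size_of::<u64>()
//! }
//! ```
//!
//! Three additional positions at one index would therefore cost about `48` bytes beyond the
//! baseline.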
//!
//! # Pruning
//!
//! [Archive] supports pruning up to a minimum `index` using the `prune` method. After `prune` is
//! called on a `section`, writes to any `section` less than the pruned `section` return an error,
//! and entries in those sections are no longer returned by lookups.
//!
//! ## Lazy Index Cleanup
//!
//! Instead of performing a full iteration of the in-memory index, storing an additional in-memory
//! index per `section`, or replaying a `section` of the value blob,
//! [Archive] lazily cleans up the [crate::index::unordered::Index] after pruning. When a new key is
//! stored that overlaps (same translated value) with a pruned key, the pruned key is removed from
//! the in-memory index.
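//!
//! The lazy cleanup described above can be modeled with a toy index. This is a sketch under
//! stated assumptions, not the logic in [crate::index::unordered::Index]: `LazyIndex`, its
//! fields, and its methods are hypothetical, and translated keys are reduced to a `u32`.
//!
//! ```rust
//! use std::collections::HashMap;
//!
//! /// Toy stand-in for the in-memory key index: translated key -> indices stored under it.
//! struct LazyIndex {
//!     entries: HashMap<u32, Vec<u64>>,
//!     oldest_allowed: u64,
//! }
//!
//! impl LazyIndex {
//!     fn new() -> Self {
//!         Self {
//!             entries: HashMap::new(),
//!             oldest_allowed: 0,
//!         }
//!     }
//!
//!     /// Record a prune. Nothing is removed from `entries` here.
//!     fn prune(&mut self, min: u64) {
//!         self.oldest_allowed = self.oldest_allowed.max(min);
//!     }
//!
//!     /// Insert `index` under `translated`, dropping pruned entries that share the bucket.
//!     /// Returns how many stale entries were removed.
//!     fn insert(&mut self, translated: u32, index: u64) -> usize {
//!         let oldest = self.oldest_allowed;
//!         let bucket = self.entries.entry(translated).or_default();
//!         let before = bucket.len();
//!         bucket.retain(|i| *i >= oldest);
//!         let removed = before - bucket.len();
//!         bucket.push(index);
//!         removed
//!     }
//!
//!     /// Number of entries still held in memory (stale ones included).
//!     fn tracked(&self) -> usize {
//!         self.entries.values().map(Vec::len).sum()
//!     }
//! }
//! ```
//!
//! In this model a pruned entry stays in memory until a later `insert` lands in the same bucket,
//! which keeps `prune` cheap at the cost of temporarily retaining stale entries.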
//!
//! # Read Path
//!
//! All reads (by index or key) first read the index entry from the index journal to get the
//! value location (offset and size), then read the value from the value blob. The index journal
//! uses a page cache, so hot entries are served from memory. Values are read directly
//! from disk without caching to avoid polluting the page cache with large values.
//!
//! # Compression
//!
//! [Archive] supports compressing data before storing it on disk. This can be enabled by setting
//! the `compression` field in the `Config` struct to a valid `zstd` compression level. This setting
//! can be changed between initializations of [Archive]; however, it must remain populated if any
//! data was written with compression enabled.
//!
//! # Querying for Gaps
//!
//! [Archive] tracks gaps in the index space to enable the caller to efficiently fetch unknown keys
//! using `next_gap`. This is a very common pattern when syncing blocks in a blockchain.
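//!
//! The sketch below models the result shape of a gap query: the end of the contiguous run that
//! contains `index` (if any) and the start of the next stored run (if any). It is a toy over a
//! `BTreeSet` of stored indices and walks the run one index at a time; the free function and its
//! signature are assumptions for illustration, not the method on [Archive].
//!
//! ```rust
//! use std::collections::BTreeSet;
//!
//! /// Toy model: returns `(end of the run containing index, start of the next run)`.
//! fn next_gap(stored: &BTreeSet<u64>, index: u64) -> (Option<u64>, Option<u64>) {
//!     let mut current_end = None;
//!     let mut cursor = index;
//!     // Walk forward while consecutive indices are present.
//!     while stored.contains(&cursor) {
//!         current_end = Some(cursor);
//!         match cursor.checked_add(1) {
//!             Some(next) => cursor = next,
//!             // The run reaches u64::MAX, so nothing can follow it.
//!             None => return (current_end, None),
//!         }
//!     }
//!     // `cursor` is not stored, so the first stored index at or after it starts the next run.
//!     (current_end, stored.range(cursor..).next().copied())
//! }
//! ```
//!
//! A syncing caller could request the indices between the returned run end and the next run
//! start, then query again from the next run.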
//!
//! # Example
//!
//! ```rust
//! use commonware_runtime::{Spawner, Runner, deterministic, buffer::paged::CacheRef};
//! use commonware_cryptography::{Hasher as _, Sha256};
//! use commonware_storage::{
//!     translator::FourCap,
//!     archive::{
//!         Archive as _,
//!         prunable::{Archive, Config},
//!     },
//! };
//! use commonware_utils::{NZUsize, NZU16, NZU64};
//!
//! let executor = deterministic::Runner::default();
//! executor.start(|context| async move {
//!     // Create an archive
//!     let cfg = Config {
//!         translator: FourCap,
//!         key_partition: "demo-index".into(),
//!         key_page_cache: CacheRef::from_pooler(&context, NZU16!(1024), NZUsize!(10)),
//!         value_partition: "demo-value".into(),
//!         compression: Some(3),
//!         codec_config: (),
//!         items_per_section: NZU64!(1024),
//!         key_write_buffer: NZUsize!(1024 * 1024),
//!         value_write_buffer: NZUsize!(1024 * 1024),
//!         replay_buffer: NZUsize!(4096),
//!     };
//!     let mut archive = Archive::init(context, cfg).await.unwrap();
//!
//!     // Put a key
//!     archive.put(1, Sha256::hash(b"data"), 10).await.unwrap();
//!
//!     // Sync the archive
//!     archive.sync().await.unwrap();
//! });
//! ```

use crate::translator::Translator;
use commonware_runtime::buffer::paged::CacheRef;
use std::num::{NonZeroU64, NonZeroUsize};

mod storage;
pub use storage::Archive;

/// Configuration for [Archive] storage.
#[derive(Clone)]
pub struct Config<T: Translator, C> {
    /// Logic to transform keys into their index representation.
    ///
    /// [Archive] assumes that all internal keys are spread uniformly across the key space.
    /// If that is not the case, lookups may be O(n) instead of O(1).
    pub translator: T,

    /// The partition to use for the key journal (stores index+key metadata).
    pub key_partition: String,

    /// The page cache to use for the key journal.
    pub key_page_cache: CacheRef,

    /// The partition to use for the value blob (stores values).
    pub value_partition: String,

    /// The compression level to use for the value blob.
    pub compression: Option<u8>,

    /// The [commonware_codec::Codec] configuration to use for the value stored in the archive.
    pub codec_config: C,

    /// The number of items per section (the granularity of pruning).
    pub items_per_section: NonZeroU64,

    /// The amount of bytes that can be buffered for the key journal before being written to a
    /// [commonware_runtime::Blob].
    pub key_write_buffer: NonZeroUsize,

    /// The amount of bytes that can be buffered for the value journal before being written to a
    /// [commonware_runtime::Blob].
    pub value_write_buffer: NonZeroUsize,

    /// The buffer size to use when replaying a [commonware_runtime::Blob].
    pub replay_buffer: NonZeroUsize,
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        archive::{Archive as _, Error, Identifier, MultiArchive as _},
        journal::Error as JournalError,
        translator::{FourCap, TwoCap},
    };
    use commonware_codec::{DecodeExt, Error as CodecError};
    use commonware_macros::{test_group, test_traced};
    use commonware_runtime::{
        deterministic, telemetry::metrics::has_metric_value, Metrics as _, Runner, Supervisor as _,
    };
    use commonware_utils::{sequence::FixedBytes, NZUsize, NZU16, NZU64};
    use rand::Rng;
    use std::{collections::BTreeMap, num::NonZeroU16};

    fn test_key(key: &str) -> FixedBytes<64> {
        let mut buf = [0u8; 64];
        let key = key.as_bytes();
        assert!(key.len() <= buf.len());
        buf[..key.len()].copy_from_slice(key);
        FixedBytes::decode(buf.as_ref()).unwrap()
    }

    const DEFAULT_ITEMS_PER_SECTION: u64 = 65536;
    const DEFAULT_WRITE_BUFFER: usize = 1024;
    const DEFAULT_REPLAY_BUFFER: usize = 4096;
    const PAGE_SIZE: NonZeroU16 = NZU16!(1024);
    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);

    #[test_traced]
    fn test_archive_compression_then_none() {
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Initialize the archive
            let cfg = Config {
                translator: FourCap,
                key_partition: "test-index".into(),
                key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                value_partition: "test-value".into(),
                codec_config: (),
                compression: Some(3),
                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
                items_per_section: NZU64!(DEFAULT_ITEMS_PER_SECTION),
            };
            let mut archive = Archive::init(context.child("first"), cfg.clone())
                .await
                .expect("Failed to initialize archive");

            // Put the key-data pair
            let index = 1u64;
            let key = test_key("testkey");
            let data = 1;
            archive
                .put(index, key.clone(), data)
                .await
                .expect("Failed to put data");

            // Sync and drop the archive
            archive.sync().await.expect("Failed to sync archive");
            drop(archive);

            // Initialize the archive again without compression.
            // Index journal replay succeeds (no compression), but value reads will fail.
            let cfg = Config {
                translator: FourCap,
                key_partition: "test-index".into(),
                key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                value_partition: "test-value".into(),
                codec_config: (),
                compression: None,
                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
                items_per_section: NZU64!(DEFAULT_ITEMS_PER_SECTION),
            };
            let archive =
                Archive::<_, _, FixedBytes<64>, i32>::init(context.child("second"), cfg.clone())
                    .await
                    .unwrap();

            // Getting the value should fail because compression settings mismatch.
            // Without compression, the codec sees extra bytes after decoding the value
            // (because the compressed data doesn't match the expected format).
            let result: Result<Option<i32>, _> = archive.get(Identifier::Index(index)).await;
            assert!(matches!(
                result,
                Err(Error::Journal(JournalError::Codec(CodecError::ExtraData(
                    _
                ))))
            ));
        });
    }

    #[test_traced]
    fn test_archive_overlapping_key_basic() {
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Initialize the archive
            let cfg = Config {
                translator: FourCap,
                key_partition: "test-index".into(),
                key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                value_partition: "test-value".into(),
                codec_config: (),
                compression: None,
                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
                items_per_section: NZU64!(DEFAULT_ITEMS_PER_SECTION),
            };
            let mut archive = Archive::init(context.child("storage"), cfg.clone())
                .await
                .expect("Failed to initialize archive");

            let index1 = 1u64;
            let key1 = test_key("keys1");
            let data1 = 1;
            let index2 = 2u64;
            let key2 = test_key("keys2");
            let data2 = 2;

            // Put the key-data pair
            archive
                .put(index1, key1.clone(), data1)
                .await
                .expect("Failed to put data");

            // Put the key-data pair
            archive
                .put(index2, key2.clone(), data2)
                .await
                .expect("Failed to put data");

            // Get the data back
            let retrieved = archive
                .get(Identifier::Key(&key1))
                .await
                .expect("Failed to get data")
                .expect("Data not found");
            assert_eq!(retrieved, data1);

            // Get the data back
            let retrieved = archive
                .get(Identifier::Key(&key2))
                .await
                .expect("Failed to get data")
                .expect("Data not found");
            assert_eq!(retrieved, data2);

            // Check metrics
            let buffer = context.encode();
            assert!(has_metric_value(&buffer, "items_tracked", 2));
            assert!(buffer.contains("unnecessary_reads_total 1"));
            assert!(buffer.contains("gets_total 2"));
        });
    }

    #[test_traced]
    fn test_archive_overlapping_key_multiple_sections() {
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Initialize the archive
            let cfg = Config {
                translator: FourCap,
                key_partition: "test-index".into(),
                key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                value_partition: "test-value".into(),
                codec_config: (),
                compression: None,
                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
                items_per_section: NZU64!(DEFAULT_ITEMS_PER_SECTION),
            };
            let mut archive = Archive::init(context.child("storage"), cfg.clone())
                .await
                .expect("Failed to initialize archive");

            let index1 = 1u64;
            let key1 = test_key("keys1");
            let data1 = 1;
            let index2 = 2_000_000u64;
            let key2 = test_key("keys2");
            let data2 = 2;

            // Put the key-data pair
            archive
                .put(index1, key1.clone(), data1)
                .await
                .expect("Failed to put data");

            // Put the key-data pair
            archive
                .put(index2, key2.clone(), data2)
                .await
                .expect("Failed to put data");

            // Get the data back
            let retrieved = archive
                .get(Identifier::Key(&key1))
                .await
                .expect("Failed to get data")
                .expect("Data not found");
            assert_eq!(retrieved, data1);

            // Get the data back
            let retrieved = archive
                .get(Identifier::Key(&key2))
                .await
                .expect("Failed to get data")
                .expect("Data not found");
            assert_eq!(retrieved, data2);
        });
    }

    #[test_traced]
    fn test_archive_prune_keys() {
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Initialize the archive
            let cfg = Config {
                translator: FourCap,
                key_partition: "test-index".into(),
                key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                value_partition: "test-value".into(),
                codec_config: (),
                compression: None,
                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
                items_per_section: NZU64!(1), // no mask - each item is its own section
            };
            let mut archive = Archive::init(context.child("storage"), cfg.clone())
                .await
                .expect("Failed to initialize archive");

            // Insert multiple keys across different sections
            let keys = vec![
                (1u64, test_key("key1-blah"), 1),
                (2u64, test_key("key2-blah"), 2),
                (3u64, test_key("key3-blah"), 3),
                (4u64, test_key("key3-bleh"), 3),
                (5u64, test_key("key4-blah"), 4),
            ];

            for (index, key, data) in &keys {
                archive
                    .put(*index, key.clone(), *data)
                    .await
                    .expect("Failed to put data");
            }

            // Check metrics
            let buffer = context.encode();
            assert!(has_metric_value(&buffer, "items_tracked", 5));

            // Prune sections less than 3
            archive.prune(3).await.expect("Failed to prune");

            // Ensure keys 1 and 2 are no longer present
            for (index, key, data) in keys {
                let retrieved = archive
                    .get(Identifier::Key(&key))
                    .await
                    .expect("Failed to get data");
                if index < 3 {
                    assert!(retrieved.is_none());
                } else {
                    assert_eq!(retrieved.expect("Data not found"), data);
                }
            }

            // Check metrics
            let buffer = context.encode();
            assert!(has_metric_value(&buffer, "items_tracked", 3));
            assert!(has_metric_value(&buffer, "indices_pruned_total", 2));
            assert!(has_metric_value(&buffer, "pruned_total", 0)); // no lazy cleanup yet

            // Try to prune older section
            archive.prune(2).await.expect("Failed to prune");

            // Try to prune current section again
            archive.prune(3).await.expect("Failed to prune");

            // Try to put older index
            let result = archive.put(1, test_key("key1-blah"), 1).await;
            assert!(matches!(result, Err(Error::AlreadyPrunedTo(3))));

            // Trigger lazy removal of keys
            archive
                .put(6, test_key("key2-blfh"), 5)
                .await
                .expect("Failed to put data");

            // Check metrics
            let buffer = context.encode();
            assert!(has_metric_value(&buffer, "items_tracked", 4)); // lazily remove one, add one
            assert!(has_metric_value(&buffer, "indices_pruned_total", 2));
            assert!(has_metric_value(&buffer, "pruned_total", 1));
        });
    }

    fn test_archive_keys_and_restart(num_keys: usize) -> String {
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();
        executor.start(|mut context| async move {
            // Initialize the archive
            let items_per_section = 256u64;
            let cfg = Config {
                translator: TwoCap,
                key_partition: "test-index".into(),
                key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                value_partition: "test-value".into(),
                codec_config: (),
                compression: None,
                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
                items_per_section: NZU64!(items_per_section),
            };
            let mut archive = Archive::init(
                context.child("init").with_attribute("index", 1),
                cfg.clone(),
            )
            .await
            .expect("Failed to initialize archive");

            // Insert multiple keys across different sections
            let mut keys = BTreeMap::new();
            while keys.len() < num_keys {
                let index = keys.len() as u64;
                let mut key = [0u8; 64];
                context.fill(&mut key);
                let key = FixedBytes::<64>::decode(key.as_ref()).unwrap();
                let mut data = [0u8; 1024];
                context.fill(&mut data);
                let data = FixedBytes::<1024>::decode(data.as_ref()).unwrap();

                archive
                    .put(index, key.clone(), data.clone())
                    .await
                    .expect("Failed to put data");
                keys.insert(key, (index, data));
            }

            // Ensure all keys can be retrieved
            for (key, (index, data)) in &keys {
                let retrieved = archive
                    .get(Identifier::Index(*index))
                    .await
                    .expect("Failed to get data")
                    .expect("Data not found");
                assert_eq!(&retrieved, data);
                let retrieved = archive
                    .get(Identifier::Key(key))
                    .await
                    .expect("Failed to get data")
                    .expect("Data not found");
                assert_eq!(&retrieved, data);
            }

            // Check metrics
            let buffer = context.encode();
            assert!(has_metric_value(&buffer, "items_tracked", num_keys));
            assert!(has_metric_value(&buffer, "pruned_total", 0));

            // Sync and drop the archive
            archive.sync().await.expect("Failed to sync archive");
            drop(archive);

            // Reinitialize the archive
            let cfg = Config {
                translator: TwoCap,
                key_partition: "test-index".into(),
                key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                value_partition: "test-value".into(),
                codec_config: (),
                compression: None,
                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
                items_per_section: NZU64!(items_per_section),
            };
            let mut archive = Archive::<_, _, _, FixedBytes<1024>>::init(
                context.child("init").with_attribute("index", 2),
                cfg.clone(),
            )
            .await
            .expect("Failed to initialize archive");

            // Ensure all keys can be retrieved
            for (key, (index, data)) in &keys {
                let retrieved = archive
                    .get(Identifier::Index(*index))
                    .await
                    .expect("Failed to get data")
                    .expect("Data not found");
                assert_eq!(&retrieved, data);
                let retrieved = archive
                    .get(Identifier::Key(key))
                    .await
                    .expect("Failed to get data")
                    .expect("Data not found");
                assert_eq!(&retrieved, data);
            }

            // Prune first half
            let min = (keys.len() / 2) as u64;
            archive.prune(min).await.expect("Failed to prune");

            // Ensure all keys can be retrieved that haven't been pruned
            let min = (min / items_per_section) * items_per_section;
            let mut removed = 0;
            for (key, (index, data)) in keys {
                if index >= min {
                    let retrieved = archive
                        .get(Identifier::Key(&key))
                        .await
                        .expect("Failed to get data")
                        .expect("Data not found");
                    assert_eq!(retrieved, data);

                    // Check range
                    let (current_end, start_next) = archive.next_gap(index);
                    assert_eq!(current_end.unwrap(), num_keys as u64 - 1);
                    assert!(start_next.is_none());
                } else {
                    let retrieved = archive
                        .get(Identifier::Key(&key))
                        .await
                        .expect("Failed to get data");
                    assert!(retrieved.is_none());
                    removed += 1;

                    // Check range
                    let (current_end, start_next) = archive.next_gap(index);
                    assert!(current_end.is_none());
                    assert_eq!(start_next.unwrap(), min);
                }
            }

            // Check metrics
            let buffer = context.encode();
            assert!(has_metric_value(
                &buffer,
                "items_tracked",
                num_keys - removed
            ));
            assert!(has_metric_value(&buffer, "indices_pruned_total", removed));
            assert!(has_metric_value(&buffer, "pruned_total", 0)); // have not lazily removed keys yet

            context.auditor().state()
        })
    }

    #[test_group("slow")]
    #[test_traced]
    fn test_archive_many_keys_and_restart() {
        test_archive_keys_and_restart(100_000);
    }

    #[test_group("slow")]
    #[test_traced]
    fn test_determinism() {
        let state1 = test_archive_keys_and_restart(5_000);
        let state2 = test_archive_keys_and_restart(5_000);
        assert_eq!(state1, state2);
    }

    /// Regression: when the same key is stored at multiple indices and the
    /// earlier index is pruned, a subsequent `get`/`has` by key must resolve
    /// to the surviving, non-pruned entry rather than report the pruned one.
    /// Callers such as consensus's marshal cache rely on this to retain a
    /// reproposal of the same block at a later index even after the
    /// earlier index's retention window closes.
    #[test_traced]
    fn test_archive_key_lookup_skips_pruned_duplicates() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                translator: FourCap,
                key_partition: "test-index".into(),
                key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                value_partition: "test-value".into(),
                codec_config: (),
                compression: None,
                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
                items_per_section: NZU64!(1),
            };
            let mut archive = Archive::init(context.child("storage"), cfg)
                .await
                .expect("Failed to initialize archive");

            // Same key stored at two different indices. Distinct values only
            // to make it observable which entry wins; a real caller would
            // store the same value (e.g. the same block) at both indices.
            let key = test_key("dupe-key");
            archive.put(2, key.clone(), 20).await.unwrap();
            archive.put(5, key.clone(), 50).await.unwrap();

            // Before pruning, either entry is a permitted answer per the
            // trait contract. The implementation happens to return the
            // earlier index, but we only assert a value is present.
            assert!(archive.get(Identifier::Key(&key)).await.unwrap().is_some());
            assert!(archive.has(Identifier::Key(&key)).await.unwrap());

            // Prune the earlier index (section 2). The later index must be
            // the sole surviving answer.
            archive.prune(3).await.unwrap();
            let got = archive.get(Identifier::Key(&key)).await.unwrap();
            assert_eq!(
                got,
                Some(50),
                "key lookup must skip the pruned entry and return the surviving one"
            );
            assert!(archive.has(Identifier::Key(&key)).await.unwrap());

            // Prune past the later index too; now nothing survives.
            archive.prune(6).await.unwrap();
            assert_eq!(archive.get(Identifier::Key(&key)).await.unwrap(), None);
            assert!(!archive.has(Identifier::Key(&key)).await.unwrap());
        });
    }

    #[test_traced]
    fn test_get_all_after_prune() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                translator: FourCap,
                key_partition: "test-index".into(),
                key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                value_partition: "test-value".into(),
                codec_config: (),
                compression: None,
                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
                items_per_section: NZU64!(1),
            };
            let mut archive = Archive::init(context.child("storage"), cfg)
                .await
                .expect("Failed to initialize archive");

            archive.put_multi(1, test_key("aaa"), 10).await.unwrap();
            archive.put_multi(1, test_key("bbb"), 20).await.unwrap();
            archive.put_multi(3, test_key("ccc"), 30).await.unwrap();

            // Prune below index 3
            archive.prune(3).await.unwrap();

            // Pruned index returns None
            let all = archive.get_all(1).await.unwrap();
            assert_eq!(all, None);

            // Surviving index still works
            let all = archive.get_all(3).await.unwrap();
            assert_eq!(all, Some(vec![30]));
        });
    }

    #[test_traced]
    fn test_put_multi_prune() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                translator: FourCap,
                key_partition: "test-index".into(),
                key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                value_partition: "test-value".into(),
                codec_config: (),
                compression: None,
                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
                items_per_section: NZU64!(1),
            };
            let mut archive = Archive::init(context.child("storage"), cfg)
                .await
                .expect("Failed to initialize archive");

            // Two items at index 1, one at index 3
            archive.put_multi(1, test_key("aaa"), 10).await.unwrap();
            archive.put_multi(1, test_key("bbb"), 20).await.unwrap();
            archive.put_multi(3, test_key("ccc"), 30).await.unwrap();

            let buffer = context.encode();
            assert!(has_metric_value(&buffer, "items_tracked", 2));

            // Prune below index 3
            archive.prune(3).await.unwrap();

            // Both items at index 1 are gone
            assert_eq!(
                archive
                    .get(Identifier::Key(&test_key("aaa")))
                    .await
                    .unwrap(),
                None
            );
            assert_eq!(
                archive
                    .get(Identifier::Key(&test_key("bbb")))
                    .await
                    .unwrap(),
                None
            );

            // Item at index 3 survives
            assert_eq!(
                archive
                    .get(Identifier::Key(&test_key("ccc")))
                    .await
                    .unwrap(),
                Some(30)
            );

            let buffer = context.encode();
            assert!(has_metric_value(&buffer, "items_tracked", 1));
            assert!(has_metric_value(&buffer, "indices_pruned_total", 1));

            // put_multi below pruned index is rejected
            let result = archive.put_multi(2, test_key("ddd"), 40).await;
            assert!(matches!(result, Err(Error::AlreadyPrunedTo(3))));
        });
    }
}