commonware_storage/archive/prunable/mod.rs
//! A prunable key-value store for ordered data.
//!
//! Data is stored across two backends: [crate::journal::segmented::fixed] for fixed-size index
//! entries and [crate::journal::segmented::glob::Glob] for values (managed by
//! [crate::journal::segmented::oversized]). The location of written data is stored in memory by
//! both index and key (via [crate::index::unordered::Index]) to enable efficient lookups (on
//! average).
//!
//! _Notably, [Archive] uses neither compaction nor on-disk indexes (and thus incurs no read or
//! write amplification during normal operation)._
//!
//! # Format
//!
//! [Archive] uses a two-journal structure for efficient page cache usage:
//!
//! **Index Journal (segmented/fixed)** - Fixed-size entries for fast startup replay:
//! ```text
//! +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
//! | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 |10 |11 |12 |13 |14 |15 |16 |17 |18 |19 |20 |21 |22 |23 |
//! +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
//! | Index(u64)    |Key(Fixed Size)| val_offset(u64)               | val_size(u32) |
//! +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
//! ```
//!
//! **Value Blob** - Raw values with CRC32 checksums (direct reads, no page cache):
//! ```text
//! +---+---+---+---+---+---+---+---+---+---+---+---+
//! | Compressed Data (variable)          |  CRC32  |
//! +---+---+---+---+---+---+---+---+---+---+---+---+
//! ```
//!
//! # Uniqueness
//!
//! [Archive] assumes all stored indexes and keys are unique. If the same key is associated with
//! multiple `indices`, there is no guarantee which value will be returned. If a key is written to
//! an already-populated `index`, [Archive] will return an error.
//!
//! ## Conflicts
//!
//! Because a translated representation of a key is only ever stored in memory, it is possible (and
//! expected) that two keys will eventually be represented by the same translated key. To handle
//! this case, [Archive] must check the persisted form of all conflicting keys to ensure data from
//! the correct key is returned. To support efficient checks, [Archive] (via
//! [crate::index::unordered::Index]) keeps a linked list of all keys with the same translated
//! prefix:
//!
//! ```rust
//! struct Record {
//!     index: u64,
//!
//!     next: Option<Box<Record>>,
//! }
//! ```
//!
//! _To avoid random memory reads in the common case, the in-memory index directly stores the first
//! item in the linked list instead of a pointer to the first item._
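//!
//! A simplified, self-contained sketch of how such a conflicting lookup might proceed (the
//! `translate` helper and `persisted` table below are illustrative stand-ins, not the real APIs):
//!
//! ```rust
//! use std::collections::HashMap;
//!
//! /// Hypothetical record shape: the first entry is stored inline and
//! /// conflicts chain off `next`.
//! struct Record {
//!     index: u64,
//!     next: Option<Box<Record>>,
//! }
//!
//! /// Toy stand-in for a translator that keeps the first 4 bytes of a key.
//! fn translate(key: &[u8]) -> [u8; 4] {
//!     let mut t = [0u8; 4];
//!     let n = key.len().min(4);
//!     t[..n].copy_from_slice(&key[..n]);
//!     t
//! }
//!
//! fn main() {
//!     // "keys1" and "keys2" share the translated prefix "keys" and conflict.
//!     let mut map: HashMap<[u8; 4], Record> = HashMap::new();
//!     map.insert(
//!         translate(b"keys1"),
//!         Record {
//!             index: 1,
//!             next: Some(Box::new(Record { index: 2, next: None })),
//!         },
//!     );
//!
//!     // A lookup walks the chain, checking the persisted key at each index
//!     // (simulated here by a lookup table) until the full key matches.
//!     let persisted: HashMap<u64, &[u8]> =
//!         HashMap::from([(1, b"keys1".as_slice()), (2, b"keys2".as_slice())]);
//!     let wanted: &[u8] = b"keys2";
//!     let mut cursor = map.get(&translate(wanted));
//!     let mut found = None;
//!     while let Some(record) = cursor {
//!         if persisted[&record.index] == wanted {
//!             found = Some(record.index);
//!             break;
//!         }
//!         cursor = record.next.as_deref();
//!     }
//!     assert_eq!(found, Some(2));
//! }
//! ```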
//!
//! Lookups by `index` are served by a map from each `index` to its position in the index journal
//! (where the journal `section` is selected as `section = index / items_per_section *
//! items_per_section` to minimize the number of open blobs):
//!
//! ```text
//! // Maps index -> position in index journal
//! indices: BTreeMap<u64, u64>
//! ```
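//!
//! For example, with `items_per_section = 1024`, the integer division floors each index to the
//! start of its section:
//!
//! ```rust
//! fn main() {
//!     let items_per_section: u64 = 1024;
//!     let section = |index: u64| index / items_per_section * items_per_section;
//!
//!     // Integer division floors each index to the start of its section.
//!     assert_eq!(section(0), 0);
//!     assert_eq!(section(1023), 0);
//!     assert_eq!(section(1024), 1024);
//!     assert_eq!(section(2_000_000), 1_999_872); // 1953 * 1024
//! }
//! ```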
//!
//! _If the [Translator] provided by the caller does not uniformly distribute keys across the key
//! space, or uses a translated representation that causes keys to have many conflicts on average,
//! performance will degrade._
//!
//! ## Memory Overhead
//!
//! [Archive] uses two maps to enable lookups by both index and key. The memory used to track each
//! index item is `8 + 8` bytes (where the first `8` is the index and the second `8` is the
//! position in the index journal). The memory used to track each key item is
//! `~translated(key).len() + 16` bytes (where `16` is the size of the `Record` struct). This
//! means that an [Archive] employing a [Translator] that uses the first `8` bytes of a key will
//! use `~40` bytes to index each key.
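//!
//! A quick sanity check of that arithmetic (sizes are approximate and exclude map node overhead):
//!
//! ```rust
//! fn main() {
//!     // Per index: 8-byte index + 8-byte position in the index journal.
//!     let per_index = 8 + 8;
//!     // Per key: translated prefix (8 bytes for an 8-byte translator) +
//!     // 16 bytes for `Record { index: u64, next: Option<Box<Record>> }`.
//!     let per_key = 8 + 16;
//!     assert_eq!(per_index + per_key, 40);
//! }
//! ```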
//!
//! ### MultiArchive Overhead
//!
//! [Archive] stores index positions in a dual-map layout:
//! - `indices: BTreeMap<u64, u64>` tracks the first position for each index.
//! - `extra_indices: BTreeMap<u64, Vec<u64>>` tracks additional positions for indices written via
//!   [crate::archive::MultiArchive::put_multi].
//!
//! This means the baseline overhead above remains unchanged for the first item at an index. For
//! indices with duplicates, the additional in-memory payload is:
//! - one `Vec<u64>` header (`24` bytes), and
//! - `n * 8` bytes for `n` additional positions.
//!
//! Equivalently, this is `24 + (n * 8)` bytes per duplicated index, excluding `BTreeMap` node
//! overhead for `extra_indices`.
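//!
//! The duplicate-index arithmetic can be checked directly (assuming a 64-bit target, where a
//! `Vec` header is three words):
//!
//! ```rust
//! fn main() {
//!     // A `Vec` header is three words (ptr + len + capacity) on 64-bit targets.
//!     assert_eq!(std::mem::size_of::<Vec<u64>>(), 24);
//!
//!     // Extra cost for an index holding `n` additional positions.
//!     let extra = |n: u64| 24 + n * 8;
//!     assert_eq!(extra(1), 32);
//!     assert_eq!(extra(3), 48);
//! }
//! ```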
//!
//! # Pruning
//!
//! [Archive] supports pruning up to a minimum `index` using the `prune` method. After `prune` is
//! called on a `section`, any interaction with a `section` less than the pruned `section` will
//! return an error.
//!
//! ## Lazy Index Cleanup
//!
//! Instead of performing a full iteration of the in-memory index, storing an additional in-memory
//! index per `section`, or replaying a `section` of the value blob, [Archive] lazily cleans up
//! the [crate::index::unordered::Index] after pruning. When a new key is stored that overlaps
//! (same translated value) with a pruned key, the pruned key is removed from the in-memory index.
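//!
//! A minimal sketch of the lazy sweep, using a plain `Vec` as a stand-in for the conflict chain
//! (names and shapes here are illustrative):
//!
//! ```rust
//! fn main() {
//!     // Hypothetical in-memory state for one translated key: the indices of
//!     // all conflicting entries. Everything below `min_allowed` was pruned on
//!     // disk, but the stale entries linger in memory.
//!     let min_allowed: u64 = 3;
//!     let mut bucket: Vec<u64> = vec![1, 2, 4];
//!
//!     // Writing a new key with the same translated value triggers the sweep:
//!     // stale entries are dropped before the new entry is appended.
//!     bucket.retain(|&index| index >= min_allowed);
//!     bucket.push(6);
//!
//!     assert_eq!(bucket, vec![4, 6]);
//! }
//! ```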
//!
//! # Read Path
//!
//! All reads (by index or key) first read the index entry from the index journal to get the value
//! location (offset and size), then read the value from the value blob. The index journal uses a
//! page cache, so hot entries are served from memory. Values are read directly from disk without
//! caching to avoid polluting the page cache with large values.
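//!
//! A sketch of the two-step read over an in-memory stand-in for the value blob (the checksum step
//! is elided, and the offsets are hypothetical):
//!
//! ```rust
//! fn main() {
//!     // In-memory stand-in for the value blob.
//!     let blob: Vec<u8> = b"....hello world....".to_vec();
//!
//!     // Step 1: the index journal entry yields the value's location.
//!     let (val_offset, val_size) = (4usize, 11usize);
//!
//!     // Step 2: read the value directly at that location (no page cache).
//!     let value = &blob[val_offset..val_offset + val_size];
//!     assert_eq!(value, b"hello world");
//! }
//! ```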
//!
//! # Compression
//!
//! [Archive] supports compressing data before storing it on disk. This can be enabled by setting
//! the `compression` field in the `Config` struct to a valid `zstd` compression level. This
//! setting can be changed between initializations of [Archive]; however, it must remain populated
//! if any data was written with compression enabled.
//!
//! # Querying for Gaps
//!
//! [Archive] tracks gaps in the index space to enable the caller to efficiently fetch unknown
//! keys using `next_gap`. This is a very common pattern when syncing blocks in a blockchain.
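//!
//! A toy model of the `(current_end, start_next)` shape that `next_gap` returns, inferred from
//! this module's tests (the real implementation tracks runs incrementally rather than scanning):
//!
//! ```rust
//! use std::collections::BTreeSet;
//!
//! /// Toy next_gap: given the set of known indices and a query index, return
//! /// (end of the run containing the query, start of the next run after it).
//! fn next_gap(known: &BTreeSet<u64>, index: u64) -> (Option<u64>, Option<u64>) {
//!     let current_end = known.contains(&index).then(|| {
//!         let mut end = index;
//!         while known.contains(&(end + 1)) {
//!             end += 1;
//!         }
//!         end
//!     });
//!     let start_next = known
//!         .range(index + 1..)
//!         .find(|&&i| current_end.map_or(true, |end| i > end + 1))
//!         .copied();
//!     (current_end, start_next)
//! }
//!
//! fn main() {
//!     let known: BTreeSet<u64> = [0u64, 1, 2, 5, 6, 9].into_iter().collect();
//!     // Inside the run 0..=2: the run ends at 2 and the next run starts at 5.
//!     assert_eq!(next_gap(&known, 1), (Some(2), Some(5)));
//!     // In a gap: there is no current run; the next run starts at 5.
//!     assert_eq!(next_gap(&known, 4), (None, Some(5)));
//! }
//! ```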
//!
//! # Example
//!
//! ```rust
//! use commonware_runtime::{Spawner, Runner, deterministic, buffer::paged::CacheRef};
//! use commonware_cryptography::{Hasher as _, Sha256};
//! use commonware_storage::{
//!     translator::FourCap,
//!     archive::{
//!         Archive as _,
//!         prunable::{Archive, Config},
//!     },
//! };
//! use commonware_utils::{NZUsize, NZU16, NZU64};
//!
//! let executor = deterministic::Runner::default();
//! executor.start(|context| async move {
//!     // Create an archive
//!     let cfg = Config {
//!         translator: FourCap,
//!         key_partition: "demo-index".into(),
//!         key_page_cache: CacheRef::from_pooler(&context, NZU16!(1024), NZUsize!(10)),
//!         value_partition: "demo-value".into(),
//!         compression: Some(3),
//!         codec_config: (),
//!         items_per_section: NZU64!(1024),
//!         key_write_buffer: NZUsize!(1024 * 1024),
//!         value_write_buffer: NZUsize!(1024 * 1024),
//!         replay_buffer: NZUsize!(4096),
//!     };
//!     let mut archive = Archive::init(context, cfg).await.unwrap();
//!
//!     // Put a key
//!     archive.put(1, Sha256::hash(b"data"), 10).await.unwrap();
//!
//!     // Sync the archive
//!     archive.sync().await.unwrap();
//! });
//! ```

use crate::translator::Translator;
use commonware_runtime::buffer::paged::CacheRef;
use std::num::{NonZeroU64, NonZeroUsize};

mod storage;
pub use storage::Archive;

/// Configuration for [Archive] storage.
#[derive(Clone)]
pub struct Config<T: Translator, C> {
    /// Logic to transform keys into their index representation.
    ///
    /// [Archive] assumes that all internal keys are spread uniformly across the key space.
    /// If that is not the case, lookups may be O(n) instead of O(1).
    pub translator: T,

    /// The partition to use for the key journal (stores index+key metadata).
    pub key_partition: String,

    /// The page cache to use for the key journal.
    pub key_page_cache: CacheRef,

    /// The partition to use for the value blob (stores values).
    pub value_partition: String,

    /// The compression level to use for the value blob.
    pub compression: Option<u8>,

    /// The [commonware_codec::Codec] configuration to use for the value stored in the archive.
    pub codec_config: C,

    /// The number of items per section (the granularity of pruning).
    pub items_per_section: NonZeroU64,

    /// The number of bytes that can be buffered for the key journal before being written to a
    /// [commonware_runtime::Blob].
    pub key_write_buffer: NonZeroUsize,

    /// The number of bytes that can be buffered for the value journal before being written to a
    /// [commonware_runtime::Blob].
    pub value_write_buffer: NonZeroUsize,

    /// The buffer size to use when replaying a [commonware_runtime::Blob].
    pub replay_buffer: NonZeroUsize,
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        archive::{Archive as _, Error, Identifier, MultiArchive as _},
        journal::Error as JournalError,
        translator::{FourCap, TwoCap},
    };
    use commonware_codec::{DecodeExt, Error as CodecError};
    use commonware_macros::{test_group, test_traced};
    use commonware_runtime::{deterministic, Metrics, Runner};
    use commonware_utils::{sequence::FixedBytes, NZUsize, NZU16, NZU64};
    use rand::Rng;
    use std::{collections::BTreeMap, num::NonZeroU16};

    fn test_key(key: &str) -> FixedBytes<64> {
        let mut buf = [0u8; 64];
        let key = key.as_bytes();
        assert!(key.len() <= buf.len());
        buf[..key.len()].copy_from_slice(key);
        FixedBytes::decode(buf.as_ref()).unwrap()
    }

    const DEFAULT_ITEMS_PER_SECTION: u64 = 65536;
    const DEFAULT_WRITE_BUFFER: usize = 1024;
    const DEFAULT_REPLAY_BUFFER: usize = 4096;
    const PAGE_SIZE: NonZeroU16 = NZU16!(1024);
    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);

    #[test_traced]
    fn test_archive_compression_then_none() {
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Initialize the archive
            let cfg = Config {
                translator: FourCap,
                key_partition: "test-index".into(),
                key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                value_partition: "test-value".into(),
                codec_config: (),
                compression: Some(3),
                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
                items_per_section: NZU64!(DEFAULT_ITEMS_PER_SECTION),
            };
            let mut archive = Archive::init(context.with_label("first"), cfg.clone())
                .await
                .expect("Failed to initialize archive");

            // Put the key-data pair
            let index = 1u64;
            let key = test_key("testkey");
            let data = 1;
            archive
                .put(index, key.clone(), data)
                .await
                .expect("Failed to put data");

            // Sync and drop the archive
            archive.sync().await.expect("Failed to sync archive");
            drop(archive);

            // Initialize the archive again without compression.
            // Index journal replay succeeds (no compression), but value reads will fail.
            let cfg = Config {
                translator: FourCap,
                key_partition: "test-index".into(),
                key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                value_partition: "test-value".into(),
                codec_config: (),
                compression: None,
                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
                items_per_section: NZU64!(DEFAULT_ITEMS_PER_SECTION),
            };
            let archive = Archive::<_, _, FixedBytes<64>, i32>::init(
                context.with_label("second"),
                cfg.clone(),
            )
            .await
            .unwrap();

            // Getting the value should fail because compression settings mismatch.
            // Without decompression, the codec sees extra bytes after decoding the value
            // (because the compressed data doesn't match the expected format).
            let result: Result<Option<i32>, _> = archive.get(Identifier::Index(index)).await;
            assert!(matches!(
                result,
                Err(Error::Journal(JournalError::Codec(CodecError::ExtraData(
                    _
                ))))
            ));
        });
    }

    #[test_traced]
    fn test_archive_overlapping_key_basic() {
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Initialize the archive
            let cfg = Config {
                translator: FourCap,
                key_partition: "test-index".into(),
                key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                value_partition: "test-value".into(),
                codec_config: (),
                compression: None,
                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
                items_per_section: NZU64!(DEFAULT_ITEMS_PER_SECTION),
            };
            let mut archive = Archive::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to initialize archive");

            let index1 = 1u64;
            let key1 = test_key("keys1");
            let data1 = 1;
            let index2 = 2u64;
            let key2 = test_key("keys2");
            let data2 = 2;

            // Put the first key-data pair
            archive
                .put(index1, key1.clone(), data1)
                .await
                .expect("Failed to put data");

            // Put the second key-data pair
            archive
                .put(index2, key2.clone(), data2)
                .await
                .expect("Failed to put data");

            // Get the first value back
            let retrieved = archive
                .get(Identifier::Key(&key1))
                .await
                .expect("Failed to get data")
                .expect("Data not found");
            assert_eq!(retrieved, data1);

            // Get the second value back
            let retrieved = archive
                .get(Identifier::Key(&key2))
                .await
                .expect("Failed to get data")
                .expect("Data not found");
            assert_eq!(retrieved, data2);

            // Check metrics
            let buffer = context.encode();
            assert!(buffer.contains("items_tracked 2"));
            assert!(buffer.contains("unnecessary_reads_total 1"));
            assert!(buffer.contains("gets_total 2"));
        });
    }

    #[test_traced]
    fn test_archive_overlapping_key_multiple_sections() {
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Initialize the archive
            let cfg = Config {
                translator: FourCap,
                key_partition: "test-index".into(),
                key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                value_partition: "test-value".into(),
                codec_config: (),
                compression: None,
                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
                items_per_section: NZU64!(DEFAULT_ITEMS_PER_SECTION),
            };
            let mut archive = Archive::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to initialize archive");

            let index1 = 1u64;
            let key1 = test_key("keys1");
            let data1 = 1;
            let index2 = 2_000_000u64;
            let key2 = test_key("keys2");
            let data2 = 2;

            // Put the first key-data pair
            archive
                .put(index1, key1.clone(), data1)
                .await
                .expect("Failed to put data");

            // Put the second key-data pair
            archive
                .put(index2, key2.clone(), data2)
                .await
                .expect("Failed to put data");

            // Get the first value back
            let retrieved = archive
                .get(Identifier::Key(&key1))
                .await
                .expect("Failed to get data")
                .expect("Data not found");
            assert_eq!(retrieved, data1);

            // Get the second value back
            let retrieved = archive
                .get(Identifier::Key(&key2))
                .await
                .expect("Failed to get data")
                .expect("Data not found");
            assert_eq!(retrieved, data2);
        });
    }

    #[test_traced]
    fn test_archive_prune_keys() {
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Initialize the archive
            let cfg = Config {
                translator: FourCap,
                key_partition: "test-index".into(),
                key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                value_partition: "test-value".into(),
                codec_config: (),
                compression: None,
                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
                items_per_section: NZU64!(1), // no mask - each item is its own section
            };
            let mut archive = Archive::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to initialize archive");

            // Insert multiple keys across different sections
            let keys = vec![
                (1u64, test_key("key1-blah"), 1),
                (2u64, test_key("key2-blah"), 2),
                (3u64, test_key("key3-blah"), 3),
                (4u64, test_key("key3-bleh"), 3),
                (5u64, test_key("key4-blah"), 4),
            ];

            for (index, key, data) in &keys {
                archive
                    .put(*index, key.clone(), *data)
                    .await
                    .expect("Failed to put data");
            }

            // Check metrics
            let buffer = context.encode();
            assert!(buffer.contains("items_tracked 5"));

            // Prune sections less than 3
            archive.prune(3).await.expect("Failed to prune");

            // Ensure keys 1 and 2 are no longer present
            for (index, key, data) in keys {
                let retrieved = archive
                    .get(Identifier::Key(&key))
                    .await
                    .expect("Failed to get data");
                if index < 3 {
                    assert!(retrieved.is_none());
                } else {
                    assert_eq!(retrieved.expect("Data not found"), data);
                }
            }

            // Check metrics
            let buffer = context.encode();
            assert!(buffer.contains("items_tracked 3"));
            assert!(buffer.contains("indices_pruned_total 2"));
            assert!(buffer.contains("pruned_total 0")); // no lazy cleanup yet

            // Try to prune older section
            archive.prune(2).await.expect("Failed to prune");

            // Try to prune current section again
            archive.prune(3).await.expect("Failed to prune");

            // Try to put older index
            let result = archive.put(1, test_key("key1-blah"), 1).await;
            assert!(matches!(result, Err(Error::AlreadyPrunedTo(3))));

            // Trigger lazy removal of keys
            archive
                .put(6, test_key("key2-blfh"), 5)
                .await
                .expect("Failed to put data");

            // Check metrics
            let buffer = context.encode();
            assert!(buffer.contains("items_tracked 4")); // lazily remove one, add one
            assert!(buffer.contains("indices_pruned_total 2"));
            assert!(buffer.contains("pruned_total 1"));
        });
    }

    fn test_archive_keys_and_restart(num_keys: usize) -> String {
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();
        executor.start(|mut context| async move {
            // Initialize the archive
            let items_per_section = 256u64;
            let cfg = Config {
                translator: TwoCap,
                key_partition: "test-index".into(),
                key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                value_partition: "test-value".into(),
                codec_config: (),
                compression: None,
                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
                items_per_section: NZU64!(items_per_section),
            };
            let mut archive = Archive::init(context.with_label("init1"), cfg.clone())
                .await
                .expect("Failed to initialize archive");

            // Insert multiple keys across different sections
            let mut keys = BTreeMap::new();
            while keys.len() < num_keys {
                let index = keys.len() as u64;
                let mut key = [0u8; 64];
                context.fill(&mut key);
                let key = FixedBytes::<64>::decode(key.as_ref()).unwrap();
                let mut data = [0u8; 1024];
                context.fill(&mut data);
                let data = FixedBytes::<1024>::decode(data.as_ref()).unwrap();

                archive
                    .put(index, key.clone(), data.clone())
                    .await
                    .expect("Failed to put data");
                keys.insert(key, (index, data));
            }

            // Ensure all keys can be retrieved
            for (key, (index, data)) in &keys {
                let retrieved = archive
                    .get(Identifier::Index(*index))
                    .await
                    .expect("Failed to get data")
                    .expect("Data not found");
                assert_eq!(&retrieved, data);
                let retrieved = archive
                    .get(Identifier::Key(key))
                    .await
                    .expect("Failed to get data")
                    .expect("Data not found");
                assert_eq!(&retrieved, data);
            }

            // Check metrics
            let buffer = context.encode();
            let tracked = format!("items_tracked {num_keys:?}");
            assert!(buffer.contains(&tracked));
            assert!(buffer.contains("pruned_total 0"));

            // Sync and drop the archive
            archive.sync().await.expect("Failed to sync archive");
            drop(archive);

            // Reinitialize the archive
            let cfg = Config {
                translator: TwoCap,
                key_partition: "test-index".into(),
                key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                value_partition: "test-value".into(),
                codec_config: (),
                compression: None,
                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
                items_per_section: NZU64!(items_per_section),
            };
            let mut archive = Archive::<_, _, _, FixedBytes<1024>>::init(
                context.with_label("init2"),
                cfg.clone(),
            )
            .await
            .expect("Failed to initialize archive");

            // Ensure all keys can be retrieved
            for (key, (index, data)) in &keys {
                let retrieved = archive
                    .get(Identifier::Index(*index))
                    .await
                    .expect("Failed to get data")
                    .expect("Data not found");
                assert_eq!(&retrieved, data);
                let retrieved = archive
                    .get(Identifier::Key(key))
                    .await
                    .expect("Failed to get data")
                    .expect("Data not found");
                assert_eq!(&retrieved, data);
            }

            // Prune first half
            let min = (keys.len() / 2) as u64;
            archive.prune(min).await.expect("Failed to prune");

            // Ensure all keys that haven't been pruned can still be retrieved
            let min = (min / items_per_section) * items_per_section;
            let mut removed = 0;
            for (key, (index, data)) in keys {
                if index >= min {
                    let retrieved = archive
                        .get(Identifier::Key(&key))
                        .await
                        .expect("Failed to get data")
                        .expect("Data not found");
                    assert_eq!(retrieved, data);

                    // Check range
                    let (current_end, start_next) = archive.next_gap(index);
                    assert_eq!(current_end.unwrap(), num_keys as u64 - 1);
                    assert!(start_next.is_none());
                } else {
                    let retrieved = archive
                        .get(Identifier::Key(&key))
                        .await
                        .expect("Failed to get data");
                    assert!(retrieved.is_none());
                    removed += 1;

                    // Check range
                    let (current_end, start_next) = archive.next_gap(index);
                    assert!(current_end.is_none());
                    assert_eq!(start_next.unwrap(), min);
                }
            }

            // Check metrics
            let buffer = context.encode();
            let tracked = format!("items_tracked {:?}", num_keys - removed);
            assert!(buffer.contains(&tracked));
            let pruned = format!("indices_pruned_total {removed}");
            assert!(buffer.contains(&pruned));
            assert!(buffer.contains("pruned_total 0")); // have not lazily removed keys yet

            context.auditor().state()
        })
    }

    #[test_group("slow")]
    #[test_traced]
    fn test_archive_many_keys_and_restart() {
        test_archive_keys_and_restart(100_000);
    }

    #[test_group("slow")]
    #[test_traced]
    fn test_determinism() {
        let state1 = test_archive_keys_and_restart(5_000);
        let state2 = test_archive_keys_and_restart(5_000);
        assert_eq!(state1, state2);
    }

    #[test_traced]
    fn test_get_all_after_prune() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                translator: FourCap,
                key_partition: "test-index".into(),
                key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                value_partition: "test-value".into(),
                codec_config: (),
                compression: None,
                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
                items_per_section: NZU64!(1),
            };
            let mut archive = Archive::init(context.clone(), cfg)
                .await
                .expect("Failed to initialize archive");

            archive.put_multi(1, test_key("aaa"), 10).await.unwrap();
            archive.put_multi(1, test_key("bbb"), 20).await.unwrap();
            archive.put_multi(3, test_key("ccc"), 30).await.unwrap();

            // Prune below index 3
            archive.prune(3).await.unwrap();

            // Pruned index returns None
            let all = archive.get_all(1).await.unwrap();
            assert_eq!(all, None);

            // Surviving index still works
            let all = archive.get_all(3).await.unwrap();
            assert_eq!(all, Some(vec![30]));
        });
    }

    #[test_traced]
    fn test_put_multi_prune() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                translator: FourCap,
                key_partition: "test-index".into(),
                key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                value_partition: "test-value".into(),
                codec_config: (),
                compression: None,
                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
                items_per_section: NZU64!(1),
            };
            let mut archive = Archive::init(context.clone(), cfg)
                .await
                .expect("Failed to initialize archive");

            // Two items at index 1, one at index 3
            archive.put_multi(1, test_key("aaa"), 10).await.unwrap();
            archive.put_multi(1, test_key("bbb"), 20).await.unwrap();
            archive.put_multi(3, test_key("ccc"), 30).await.unwrap();

            let buffer = context.encode();
            assert!(buffer.contains("items_tracked 2"));

            // Prune below index 3
            archive.prune(3).await.unwrap();

            // Both items at index 1 are gone
            assert_eq!(
                archive
                    .get(Identifier::Key(&test_key("aaa")))
                    .await
                    .unwrap(),
                None
            );
            assert_eq!(
                archive
                    .get(Identifier::Key(&test_key("bbb")))
                    .await
                    .unwrap(),
                None
            );

            // Item at index 3 survives
            assert_eq!(
                archive
                    .get(Identifier::Key(&test_key("ccc")))
                    .await
                    .unwrap(),
                Some(30)
            );

            let buffer = context.encode();
            assert!(buffer.contains("items_tracked 1"));
            assert!(buffer.contains("indices_pruned_total 1"));

            // put_multi below pruned index is rejected
            let result = archive.put_multi(2, test_key("ddd"), 40).await;
            assert!(matches!(result, Err(Error::AlreadyPrunedTo(3))));
        });
    }
}