commonware_storage/archive/prunable/mod.rs
//! A prunable key-value store for ordered data.
//!
//! Data is stored across two backends: [crate::journal::segmented::fixed] for fixed-size index entries and
//! [crate::journal::segmented::glob::Glob] for values (managed by [crate::journal::segmented::oversized]).
//! The location of written data is tracked in memory by both index and key (via [crate::index::unordered::Index])
//! to enable efficient (average-case) lookups.
//!
//! _Notably, [Archive] does not employ compaction or on-disk indexes (and thus incurs no read
//! or write amplification during normal operation)._
//!
//! # Format
//!
//! [Archive] uses a two-journal structure for efficient page cache usage:
//!
//! **Index Journal (segmented/fixed)** - Fixed-size entries for fast startup replay:
//! ```text
//! +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
//! | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 |10 |11 |12 |13 |14 |15 |16 |17 |18 |19 |20 |21 |22 |23 |
//! +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
//! | Index(u64) | Key (Fixed Size) | val_offset(u64) | val_size(u32) |
//! +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
//! ```
//!
//! **Value Blob** - Raw values with CRC32 checksums (direct reads, no page cache):
//! ```text
//! +---+---+---+---+---+---+---+---+---+---+---+---+
//! | Compressed Data (variable) | CRC32 |
//! +---+---+---+---+---+---+---+---+---+---+---+---+
//! ```
//!
//! # Uniqueness
//!
//! [Archive] assumes all stored indexes and keys are unique. If the same key is associated with
//! multiple `indices`, there is no guarantee which value will be returned. If a key is written to
//! an existing `index`, [Archive] will return an error.
//!
//! ## Conflicts
//!
//! Because a translated representation of a key is only ever stored in memory, it is possible (and
//! expected) that two keys will eventually be represented by the same translated key. To handle
//! this case, [Archive] must check the persisted form of all conflicting keys to ensure data from
//! the correct key is returned. To support efficient checks, [Archive] (via
//! [crate::index::unordered::Index]) keeps a linked list of all keys with the same translated
//! prefix:
//!
//! ```rust
//! struct Record {
//!     index: u64,
//!
//!     next: Option<Box<Record>>,
//! }
//! ```
//!
//! _To avoid random memory reads in the common case, the in-memory index directly stores the first
//! item in the linked list instead of a pointer to the first item._
//!
//! Lookups by `index` are served by a separate map from `index` to the position in the index
//! journal (the journal `section` is selected as `section = index / items_per_section *
//! items_per_section` to minimize the number of open blobs):
//!
//! ```text
//! // Maps index -> position in index journal
//! indices: BTreeMap<u64, u64>
//! ```
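//!
//! For example, the section arithmetic above floors an index to the start of its section (a
//! minimal sketch; `section` is a hypothetical helper restating the formula, not part of the API):
//!
//! ```rust
//! fn section(index: u64, items_per_section: u64) -> u64 {
//!     // Integer division floors, so this rounds `index` down to a section boundary.
//!     (index / items_per_section) * items_per_section
//! }
//!
//! // With 1024 items per section, all indices in [1024, 2047] share one section.
//! assert_eq!(section(1500, 1024), 1024);
//! assert_eq!(section(1024, 1024), 1024);
//! assert_eq!(section(2048, 1024), 2048);
//! ```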
//!
//! _If the [Translator] provided by the caller does not uniformly distribute keys across the key
//! space, or uses a translated representation that causes keys to have many conflicts on average,
//! performance will degrade._
//!
//! ## Memory Overhead
//!
//! [Archive] uses two maps to enable lookups by both index and key. The memory used to track each
//! index item is `8 + 8` bytes (where `8` is the index and `8` is the position in the index journal).
//! The memory used to track each key item is `~translated(key).len() + 16` bytes (where `16` is the
//! size of the `Record` struct). This means that an [Archive] employing a [Translator] that uses
//! the first `8` bytes of a key will use `~40` bytes to index each key.
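//!
//! A quick sanity check of that `~40`-byte figure (illustrative only; the constants restate the
//! per-map overheads described above):
//!
//! ```rust
//! let index_entry = 8 + 8; // index (u64) + position in the index journal (u64)
//! let key_entry = 8 + 16;  // 8-byte translated key + 16-byte `Record`
//! assert_eq!(index_entry + key_entry, 40);
//! ```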
//!
//! ### MultiArchive Overhead
//!
//! [Archive] stores index positions in a dual-map layout:
//! - `indices: BTreeMap<u64, u64>` tracks the first position for each index.
//! - `extra_indices: BTreeMap<u64, Vec<u64>>` tracks additional positions for indices written via
//!   [crate::archive::MultiArchive::put_multi].
//!
//! This means the baseline overhead above remains unchanged for the first item at an index. For
//! indices with duplicates, the additional in-memory payload is:
//! - one `Vec<u64>` header (`24` bytes), and
//! - `n * 8` bytes for `n` additional positions.
//!
//! Equivalently, this is `24 + (n * 8)` bytes per duplicated index, excluding `BTreeMap` node
//! overhead for `extra_indices`.
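//!
//! Worked through, the duplicate-overhead formula looks like this (a sketch;
//! `duplicate_overhead` is a hypothetical helper, not part of the API):
//!
//! ```rust
//! fn duplicate_overhead(n: u64) -> u64 {
//!     // Vec<u64> header (pointer + length + capacity) plus 8 bytes per additional position.
//!     24 + n * 8
//! }
//!
//! // An index with 3 additional positions costs 48 extra bytes.
//! assert_eq!(duplicate_overhead(3), 48);
//! ```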
//!
//! # Pruning
//!
//! [Archive] supports pruning up to a minimum `index` using the `prune` method. After `prune` is
//! called on a `section`, any interaction with a `section` less than the pruned `section` will
//! return an error.
//!
//! ## Lazy Index Cleanup
//!
//! Instead of performing a full iteration of the in-memory index, storing an additional in-memory
//! index per `section`, or replaying a `section` of the value blob,
//! [Archive] lazily cleans up the [crate::index::unordered::Index] after pruning. When a new key is
//! stored that overlaps (same translated value) with a pruned key, the pruned key is removed from
//! the in-memory index.
//!
//! # Read Path
//!
//! All reads (by index or key) first read the index entry from the index journal to get the
//! value location (offset and size), then read the value from the value blob. The index journal
//! uses a page cache for caching, so hot entries are served from memory. Values are read directly
//! from disk without caching to avoid polluting the page cache with large values.
//!
//! # Compression
//!
//! [Archive] supports compressing data before storing it on disk. This can be enabled by setting
//! the `compression` field in the `Config` struct to a valid `zstd` compression level. This
//! setting can be changed between initializations of [Archive]; however, it must remain populated
//! if any data was written with compression enabled.
//!
//! # Querying for Gaps
//!
//! [Archive] tracks gaps in the index space to enable the caller to efficiently fetch unknown keys
//! using `next_gap`. This is a very common pattern when syncing blocks in a blockchain.
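//!
//! To illustrate the semantics, the following models `next_gap` over a plain set of stored
//! indices (a sketch of the observable behavior inferred from usage, not the actual
//! implementation):
//!
//! ```rust
//! use std::collections::BTreeSet;
//!
//! // Given the stored indices, return (end of the contiguous run containing `index`,
//! // start of the next stored run after it).
//! fn next_gap(indices: &BTreeSet<u64>, index: u64) -> (Option<u64>, Option<u64>) {
//!     let current_end = if indices.contains(&index) {
//!         // Walk forward to the end of the contiguous run.
//!         let mut end = index;
//!         while indices.contains(&(end + 1)) {
//!             end += 1;
//!         }
//!         Some(end)
//!     } else {
//!         None
//!     };
//!     // The next stored index after the current run (or after `index` if it is missing).
//!     let after = current_end.map_or(index, |end| end + 1);
//!     let start_next = indices.range(after..).next().copied();
//!     (current_end, start_next)
//! }
//!
//! let stored: BTreeSet<u64> = [0, 1, 2, 10].into_iter().collect();
//! assert_eq!(next_gap(&stored, 1), (Some(2), Some(10))); // run ends at 2, next starts at 10
//! assert_eq!(next_gap(&stored, 5), (None, Some(10)));    // 5 is missing; next run starts at 10
//! assert_eq!(next_gap(&stored, 10), (Some(10), None));   // last run, nothing after
//! ```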
//!
//! # Example
//!
//! ```rust
//! use commonware_runtime::{Spawner, Runner, deterministic, buffer::paged::CacheRef};
//! use commonware_cryptography::{Hasher as _, Sha256};
//! use commonware_storage::{
//!     translator::FourCap,
//!     archive::{
//!         Archive as _,
//!         prunable::{Archive, Config},
//!     },
//! };
//! use commonware_utils::{NZUsize, NZU16, NZU64};
//!
//! let executor = deterministic::Runner::default();
//! executor.start(|context| async move {
//!     // Create an archive
//!     let cfg = Config {
//!         translator: FourCap,
//!         key_partition: "demo-index".into(),
//!         key_page_cache: CacheRef::from_pooler(&context, NZU16!(1024), NZUsize!(10)),
//!         value_partition: "demo-value".into(),
//!         compression: Some(3),
//!         codec_config: (),
//!         items_per_section: NZU64!(1024),
//!         key_write_buffer: NZUsize!(1024 * 1024),
//!         value_write_buffer: NZUsize!(1024 * 1024),
//!         replay_buffer: NZUsize!(4096),
//!     };
//!     let mut archive = Archive::init(context, cfg).await.unwrap();
//!
//!     // Put a key
//!     archive.put(1, Sha256::hash(b"data"), 10).await.unwrap();
//!
//!     // Sync the archive
//!     archive.sync().await.unwrap();
//! });
//! ```

use crate::translator::Translator;
use commonware_runtime::buffer::paged::CacheRef;
use std::num::{NonZeroU64, NonZeroUsize};

mod storage;
pub use storage::Archive;

/// Configuration for [Archive] storage.
#[derive(Clone)]
pub struct Config<T: Translator, C> {
    /// Logic to transform keys into their index representation.
    ///
    /// [Archive] assumes that all internal keys are spread uniformly across the key space.
    /// If that is not the case, lookups may be O(n) instead of O(1).
    pub translator: T,

    /// The partition to use for the key journal (stores index+key metadata).
    pub key_partition: String,

    /// The page cache to use for the key journal.
    pub key_page_cache: CacheRef,

    /// The partition to use for the value blob (stores values).
    pub value_partition: String,

    /// The compression level to use for the value blob.
    pub compression: Option<u8>,

    /// The [commonware_codec::Codec] configuration to use for the value stored in the archive.
    pub codec_config: C,

    /// The number of items per section (the granularity of pruning).
    pub items_per_section: NonZeroU64,

    /// The number of bytes that can be buffered for the key journal before being written to a
    /// [commonware_runtime::Blob].
    pub key_write_buffer: NonZeroUsize,

    /// The number of bytes that can be buffered for the value journal before being written to a
    /// [commonware_runtime::Blob].
    pub value_write_buffer: NonZeroUsize,

    /// The buffer size to use when replaying a [commonware_runtime::Blob].
    pub replay_buffer: NonZeroUsize,
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        archive::{Archive as _, Error, Identifier, MultiArchive as _},
        journal::Error as JournalError,
        kv::tests::test_key,
        translator::{FourCap, TwoCap},
    };
    use commonware_codec::{DecodeExt, Error as CodecError};
    use commonware_macros::{test_group, test_traced};
    use commonware_runtime::{deterministic, Metrics, Runner};
    use commonware_utils::{sequence::FixedBytes, NZUsize, NZU16, NZU64};
    use rand::Rng;
    use std::{collections::BTreeMap, num::NonZeroU16};

    const DEFAULT_ITEMS_PER_SECTION: u64 = 65536;
    const DEFAULT_WRITE_BUFFER: usize = 1024;
    const DEFAULT_REPLAY_BUFFER: usize = 4096;
    const PAGE_SIZE: NonZeroU16 = NZU16!(1024);
    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);

    #[test_traced]
    fn test_archive_compression_then_none() {
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Initialize the archive
            let cfg = Config {
                translator: FourCap,
                key_partition: "test-index".into(),
                key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                value_partition: "test-value".into(),
                codec_config: (),
                compression: Some(3),
                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
                items_per_section: NZU64!(DEFAULT_ITEMS_PER_SECTION),
            };
            let mut archive = Archive::init(context.with_label("first"), cfg.clone())
                .await
                .expect("Failed to initialize archive");

            // Put the key-data pair
            let index = 1u64;
            let key = test_key("testkey");
            let data = 1;
            archive
                .put(index, key.clone(), data)
                .await
                .expect("Failed to put data");

            // Sync and drop the archive
            archive.sync().await.expect("Failed to sync archive");
            drop(archive);

            // Initialize the archive again without compression.
            // Index journal replay succeeds (no compression), but value reads will fail.
            let cfg = Config {
                translator: FourCap,
                key_partition: "test-index".into(),
                key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                value_partition: "test-value".into(),
                codec_config: (),
                compression: None,
                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
                items_per_section: NZU64!(DEFAULT_ITEMS_PER_SECTION),
            };
            let archive = Archive::<_, _, FixedBytes<64>, i32>::init(
                context.with_label("second"),
                cfg.clone(),
            )
            .await
            .unwrap();

            // Getting the value should fail because compression settings mismatch.
            // Without compression, the codec sees extra bytes after decoding the value
            // (because the compressed data doesn't match the expected format).
            let result: Result<Option<i32>, _> = archive.get(Identifier::Index(index)).await;
            assert!(matches!(
                result,
                Err(Error::Journal(JournalError::Codec(CodecError::ExtraData(
                    _
                ))))
            ));
        });
    }

    #[test_traced]
    fn test_archive_overlapping_key_basic() {
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Initialize the archive
            let cfg = Config {
                translator: FourCap,
                key_partition: "test-index".into(),
                key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                value_partition: "test-value".into(),
                codec_config: (),
                compression: None,
                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
                items_per_section: NZU64!(DEFAULT_ITEMS_PER_SECTION),
            };
            let mut archive = Archive::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to initialize archive");

            let index1 = 1u64;
            let key1 = test_key("keys1");
            let data1 = 1;
            let index2 = 2u64;
            let key2 = test_key("keys2");
            let data2 = 2;

            // Put the first key-data pair
            archive
                .put(index1, key1.clone(), data1)
                .await
                .expect("Failed to put data");

            // Put the second key-data pair
            archive
                .put(index2, key2.clone(), data2)
                .await
                .expect("Failed to put data");

            // Get the first value back
            let retrieved = archive
                .get(Identifier::Key(&key1))
                .await
                .expect("Failed to get data")
                .expect("Data not found");
            assert_eq!(retrieved, data1);

            // Get the second value back
            let retrieved = archive
                .get(Identifier::Key(&key2))
                .await
                .expect("Failed to get data")
                .expect("Data not found");
            assert_eq!(retrieved, data2);

            // Check metrics
            let buffer = context.encode();
            assert!(buffer.contains("items_tracked 2"));
            assert!(buffer.contains("unnecessary_reads_total 1"));
            assert!(buffer.contains("gets_total 2"));
        });
    }

    #[test_traced]
    fn test_archive_overlapping_key_multiple_sections() {
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Initialize the archive
            let cfg = Config {
                translator: FourCap,
                key_partition: "test-index".into(),
                key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                value_partition: "test-value".into(),
                codec_config: (),
                compression: None,
                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
                items_per_section: NZU64!(DEFAULT_ITEMS_PER_SECTION),
            };
            let mut archive = Archive::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to initialize archive");

            let index1 = 1u64;
            let key1 = test_key("keys1");
            let data1 = 1;
            let index2 = 2_000_000u64;
            let key2 = test_key("keys2");
            let data2 = 2;

            // Put the first key-data pair
            archive
                .put(index1, key1.clone(), data1)
                .await
                .expect("Failed to put data");

            // Put the second key-data pair
            archive
                .put(index2, key2.clone(), data2)
                .await
                .expect("Failed to put data");

            // Get the first value back
            let retrieved = archive
                .get(Identifier::Key(&key1))
                .await
                .expect("Failed to get data")
                .expect("Data not found");
            assert_eq!(retrieved, data1);

            // Get the second value back
            let retrieved = archive
                .get(Identifier::Key(&key2))
                .await
                .expect("Failed to get data")
                .expect("Data not found");
            assert_eq!(retrieved, data2);
        });
    }

    #[test_traced]
    fn test_archive_prune_keys() {
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Initialize the archive
            let cfg = Config {
                translator: FourCap,
                key_partition: "test-index".into(),
                key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                value_partition: "test-value".into(),
                codec_config: (),
                compression: None,
                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
                items_per_section: NZU64!(1), // no mask - each item is its own section
            };
            let mut archive = Archive::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to initialize archive");

            // Insert multiple keys across different sections
            let keys = vec![
                (1u64, test_key("key1-blah"), 1),
                (2u64, test_key("key2-blah"), 2),
                (3u64, test_key("key3-blah"), 3),
                (4u64, test_key("key3-bleh"), 3),
                (5u64, test_key("key4-blah"), 4),
            ];

            for (index, key, data) in &keys {
                archive
                    .put(*index, key.clone(), *data)
                    .await
                    .expect("Failed to put data");
            }

            // Check metrics
            let buffer = context.encode();
            assert!(buffer.contains("items_tracked 5"));

            // Prune sections less than 3
            archive.prune(3).await.expect("Failed to prune");

            // Ensure keys 1 and 2 are no longer present
            for (index, key, data) in keys {
                let retrieved = archive
                    .get(Identifier::Key(&key))
                    .await
                    .expect("Failed to get data");
                if index < 3 {
                    assert!(retrieved.is_none());
                } else {
                    assert_eq!(retrieved.expect("Data not found"), data);
                }
            }

            // Check metrics
            let buffer = context.encode();
            assert!(buffer.contains("items_tracked 3"));
            assert!(buffer.contains("indices_pruned_total 2"));
            assert!(buffer.contains("pruned_total 0")); // no lazy cleanup yet

            // Try to prune older section
            archive.prune(2).await.expect("Failed to prune");

            // Try to prune current section again
            archive.prune(3).await.expect("Failed to prune");

            // Try to put older index
            let result = archive.put(1, test_key("key1-blah"), 1).await;
            assert!(matches!(result, Err(Error::AlreadyPrunedTo(3))));

            // Trigger lazy removal of keys
            archive
                .put(6, test_key("key2-blfh"), 5)
                .await
                .expect("Failed to put data");

            // Check metrics
            let buffer = context.encode();
            assert!(buffer.contains("items_tracked 4")); // lazily remove one, add one
            assert!(buffer.contains("indices_pruned_total 2"));
            assert!(buffer.contains("pruned_total 1"));
        });
    }

    fn test_archive_keys_and_restart(num_keys: usize) -> String {
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();
        executor.start(|mut context| async move {
            // Initialize the archive
            let items_per_section = 256u64;
            let cfg = Config {
                translator: TwoCap,
                key_partition: "test-index".into(),
                key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                value_partition: "test-value".into(),
                codec_config: (),
                compression: None,
                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
                items_per_section: NZU64!(items_per_section),
            };
            let mut archive = Archive::init(context.with_label("init1"), cfg.clone())
                .await
                .expect("Failed to initialize archive");

            // Insert multiple keys across different sections
            let mut keys = BTreeMap::new();
            while keys.len() < num_keys {
                let index = keys.len() as u64;
                let mut key = [0u8; 64];
                context.fill(&mut key);
                let key = FixedBytes::<64>::decode(key.as_ref()).unwrap();
                let mut data = [0u8; 1024];
                context.fill(&mut data);
                let data = FixedBytes::<1024>::decode(data.as_ref()).unwrap();

                archive
                    .put(index, key.clone(), data.clone())
                    .await
                    .expect("Failed to put data");
                keys.insert(key, (index, data));
            }

            // Ensure all keys can be retrieved
            for (key, (index, data)) in &keys {
                let retrieved = archive
                    .get(Identifier::Index(*index))
                    .await
                    .expect("Failed to get data")
                    .expect("Data not found");
                assert_eq!(&retrieved, data);
                let retrieved = archive
                    .get(Identifier::Key(key))
                    .await
                    .expect("Failed to get data")
                    .expect("Data not found");
                assert_eq!(&retrieved, data);
            }

            // Check metrics
            let buffer = context.encode();
            let tracked = format!("items_tracked {num_keys:?}");
            assert!(buffer.contains(&tracked));
            assert!(buffer.contains("pruned_total 0"));

            // Sync and drop the archive
            archive.sync().await.expect("Failed to sync archive");
            drop(archive);

            // Reinitialize the archive
            let cfg = Config {
                translator: TwoCap,
                key_partition: "test-index".into(),
                key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                value_partition: "test-value".into(),
                codec_config: (),
                compression: None,
                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
                items_per_section: NZU64!(items_per_section),
            };
            let mut archive = Archive::<_, _, _, FixedBytes<1024>>::init(
                context.with_label("init2"),
                cfg.clone(),
            )
            .await
            .expect("Failed to initialize archive");

            // Ensure all keys can still be retrieved after restart
            for (key, (index, data)) in &keys {
                let retrieved = archive
                    .get(Identifier::Index(*index))
                    .await
                    .expect("Failed to get data")
                    .expect("Data not found");
                assert_eq!(&retrieved, data);
                let retrieved = archive
                    .get(Identifier::Key(key))
                    .await
                    .expect("Failed to get data")
                    .expect("Data not found");
                assert_eq!(&retrieved, data);
            }

            // Prune first half
            let min = (keys.len() / 2) as u64;
            archive.prune(min).await.expect("Failed to prune");

            // Ensure all keys that haven't been pruned can still be retrieved
            let min = (min / items_per_section) * items_per_section;
            let mut removed = 0;
            for (key, (index, data)) in keys {
                if index >= min {
                    let retrieved = archive
                        .get(Identifier::Key(&key))
                        .await
                        .expect("Failed to get data")
                        .expect("Data not found");
                    assert_eq!(retrieved, data);

                    // Check range
                    let (current_end, start_next) = archive.next_gap(index);
                    assert_eq!(current_end.unwrap(), num_keys as u64 - 1);
                    assert!(start_next.is_none());
                } else {
                    let retrieved = archive
                        .get(Identifier::Key(&key))
                        .await
                        .expect("Failed to get data");
                    assert!(retrieved.is_none());
                    removed += 1;

                    // Check range
                    let (current_end, start_next) = archive.next_gap(index);
                    assert!(current_end.is_none());
                    assert_eq!(start_next.unwrap(), min);
                }
            }

            // Check metrics
            let buffer = context.encode();
            let tracked = format!("items_tracked {:?}", num_keys - removed);
            assert!(buffer.contains(&tracked));
            let pruned = format!("indices_pruned_total {removed}");
            assert!(buffer.contains(&pruned));
            assert!(buffer.contains("pruned_total 0")); // have not lazily removed keys yet

            context.auditor().state()
        })
    }

    #[test_group("slow")]
    #[test_traced]
    fn test_archive_many_keys_and_restart() {
        test_archive_keys_and_restart(100_000);
    }

    #[test_group("slow")]
    #[test_traced]
    fn test_determinism() {
        let state1 = test_archive_keys_and_restart(5_000);
        let state2 = test_archive_keys_and_restart(5_000);
        assert_eq!(state1, state2);
    }

    #[test_traced]
    fn test_get_all_after_prune() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                translator: FourCap,
                key_partition: "test-index".into(),
                key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                value_partition: "test-value".into(),
                codec_config: (),
                compression: None,
                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
                items_per_section: NZU64!(1),
            };
            let mut archive = Archive::init(context.clone(), cfg)
                .await
                .expect("Failed to initialize archive");

            archive.put_multi(1, test_key("aaa"), 10).await.unwrap();
            archive.put_multi(1, test_key("bbb"), 20).await.unwrap();
            archive.put_multi(3, test_key("ccc"), 30).await.unwrap();

            // Prune below index 3
            archive.prune(3).await.unwrap();

            // Pruned index returns None
            let all = archive.get_all(1).await.unwrap();
            assert_eq!(all, None);

            // Surviving index still works
            let all = archive.get_all(3).await.unwrap();
            assert_eq!(all, Some(vec![30]));
        });
    }

    #[test_traced]
    fn test_put_multi_prune() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                translator: FourCap,
                key_partition: "test-index".into(),
                key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                value_partition: "test-value".into(),
                codec_config: (),
                compression: None,
                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
                items_per_section: NZU64!(1),
            };
            let mut archive = Archive::init(context.clone(), cfg)
                .await
                .expect("Failed to initialize archive");

            // Two items at index 1, one at index 3
            archive.put_multi(1, test_key("aaa"), 10).await.unwrap();
            archive.put_multi(1, test_key("bbb"), 20).await.unwrap();
            archive.put_multi(3, test_key("ccc"), 30).await.unwrap();

            let buffer = context.encode();
            assert!(buffer.contains("items_tracked 2"));

            // Prune below index 3
            archive.prune(3).await.unwrap();

            // Both items at index 1 are gone
            assert_eq!(
                archive
                    .get(Identifier::Key(&test_key("aaa")))
                    .await
                    .unwrap(),
                None
            );
            assert_eq!(
                archive
                    .get(Identifier::Key(&test_key("bbb")))
                    .await
                    .unwrap(),
                None
            );

            // Item at index 3 survives
            assert_eq!(
                archive
                    .get(Identifier::Key(&test_key("ccc")))
                    .await
                    .unwrap(),
                Some(30)
            );

            let buffer = context.encode();
            assert!(buffer.contains("items_tracked 1"));
            assert!(buffer.contains("indices_pruned_total 1"));

            // put_multi below pruned index is rejected
            let result = archive.put_multi(2, test_key("ddd"), 40).await;
            assert!(matches!(result, Err(Error::AlreadyPrunedTo(3))));
        });
    }
}
779}