commonware_storage/archive/prunable/mod.rs
//! A prunable key-value store for ordered data.
//!
//! Data is stored across two backends: [crate::journal::segmented::fixed] for fixed-size index entries and
//! [crate::journal::segmented::glob::Glob] for values (managed by [crate::journal::segmented::oversized]).
//! The location of written data is stored in-memory by both index and key (via [crate::index::unordered::Index])
//! to enable efficient lookups (on average).
//!
//! _Notably, [Archive] does not make use of compaction or on-disk indexes (and thus has no read
//! or write amplification during normal operation)._
//!
//! # Format
//!
//! [Archive] uses a two-journal structure for efficient page cache usage:
//!
//! **Index Journal (segmented/fixed)** - Fixed-size entries for fast startup replay:
//! ```text
//! +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
//! | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 |10 |11 |12 |13 |14 |15 |16 |17 |18 |19 |20 |21 |22 |23 |
//! +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
//! |          Index(u64)           |Key(Fixed Size)|        val_offset(u64)        | val_size(u32) |
//! +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
//! ```
//!
//! **Value Blob** - Raw values with CRC32 checksums (direct reads, no page cache):
//! ```text
//! +---+---+---+---+---+---+---+---+---+---+---+---+
//! |  Compressed Data (variable)   |     CRC32     |
//! +---+---+---+---+---+---+---+---+---+---+---+---+
//! ```
//!
//! # Uniqueness
//!
//! [Archive] assumes all stored indexes and keys are unique. If the same key is associated with
//! multiple `indices`, there is no guarantee which value will be returned. If the key is written to
//! an existing `index`, [Archive] will return an error.
//!
//! ## Conflicts
//!
//! Because a translated representation of a key is only ever stored in memory, it is possible (and
//! expected) that two keys will eventually be represented by the same translated key. To handle
//! this case, [Archive] must check the persisted form of all conflicting keys to ensure data from
//! the correct key is returned. To support efficient checks, [Archive] (via
//! [crate::index::unordered::Index]) keeps a linked list of all keys with the same translated
//! prefix:
//!
//! ```rust
//! struct Record {
//!     index: u64,
//!
//!     next: Option<Box<Record>>,
//! }
//! ```
//!
//! _To avoid random memory reads in the common case, the in-memory index directly stores the first
//! item in the linked list instead of a pointer to the first item._
//!
//! Lookups by `index` are served by a map keyed by `index`. Each value is the item's position in
//! the index journal, where the journal section is selected by
//! `section = index / items_per_section * items_per_section` (to minimize the number of open
//! blobs):
//!
//! ```text
//! // Maps index -> position in index journal
//! indices: BTreeMap<u64, u64>
//! ```
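//!
//! As a rough illustration of the section arithmetic above (a standalone sketch rather than the
//! code [Archive] runs; `section_of` is a hypothetical helper), integer division rounds an
//! `index` down to the first index of its section:
//!
//! ```rust
//! /// Hypothetical helper: round `index` down to the start of its section.
//! ///
//! /// Assumes `items_per_section` is non-zero (the config enforces this with `NonZeroU64`).
//! fn section_of(index: u64, items_per_section: u64) -> u64 {
//!     (index / items_per_section) * items_per_section
//! }
//! ```
//!
//! With `items_per_section = 256`, indexes `256..=511` all map to section `256`, so they share
//! one section.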
//!
//! _If the [Translator] provided by the caller does not uniformly distribute keys across the key
//! space or uses a translated representation that means keys on average have many conflicts,
//! performance will degrade._
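//!
//! The conflict check can be pictured with a small standalone model. This is only a sketch under
//! simplifying assumptions: `ToyArchive` and its methods are hypothetical, a `Vec` stands in for
//! the linked list, a `HashMap` stands in for the persisted journal, and the translator is
//! assumed to keep the first 4 bytes of a key.
//!
//! ```rust
//! use std::collections::HashMap;
//!
//! /// Hypothetical model: memory holds translated keys, "disk" holds full keys.
//! struct ToyArchive {
//!     /// Translated key (first 4 bytes) -> indexes whose keys share it.
//!     by_prefix: HashMap<[u8; 4], Vec<u64>>,
//!     /// Index -> (full key, value); stands in for the persisted journal.
//!     persisted: HashMap<u64, (Vec<u8>, u32)>,
//! }
//!
//! impl ToyArchive {
//!     fn new() -> Self {
//!         Self {
//!             by_prefix: HashMap::new(),
//!             persisted: HashMap::new(),
//!         }
//!     }
//!
//!     /// Keep at most the first 4 bytes of the key (zero-padded if shorter).
//!     fn translate(key: &[u8]) -> [u8; 4] {
//!         let mut out = [0u8; 4];
//!         let n = key.len().min(4);
//!         out[..n].copy_from_slice(&key[..n]);
//!         out
//!     }
//!
//!     fn put(&mut self, index: u64, key: &[u8], value: u32) {
//!         self.by_prefix
//!             .entry(Self::translate(key))
//!             .or_default()
//!             .push(index);
//!         self.persisted.insert(index, (key.to_vec(), value));
//!     }
//!
//!     /// Returns the value (if any) and how many candidates were read but belonged
//!     /// to a different key.
//!     fn get(&self, key: &[u8]) -> (Option<u32>, usize) {
//!         let mut wasted = 0;
//!         if let Some(candidates) = self.by_prefix.get(&Self::translate(key)) {
//!             for index in candidates {
//!                 // Compare against the full persisted key, not the translated one.
//!                 let (stored_key, value) = &self.persisted[index];
//!                 if stored_key.as_slice() == key {
//!                     return (Some(*value), wasted);
//!                 }
//!                 wasted += 1;
//!             }
//!         }
//!         (None, wasted)
//!     }
//! }
//! ```
//!
//! In this model, a lookup for the second of two keys sharing a translated prefix reads (and
//! rejects) the first key before finding its own entry, which is the cost each conflict adds.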
//!
//! ## Memory Overhead
//!
//! [Archive] uses two maps to enable lookups by both index and key. The memory used to track each
//! index item is `8 + 8` bytes (where the first `8` is the index and the second `8` is the
//! position in the index journal). The memory used to track each key item is
//! `~translated(key).len() + 16` bytes (where `16` is the size of the `Record` struct). This
//! means that an [Archive] employing a [Translator] that uses the first `8` bytes of a key will
//! use `~40` bytes to index each key (`16` for the index map plus `~24` for the key map).
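//!
//! The arithmetic above can be written out as a back-of-the-envelope estimator. This is a
//! sketch only: `bytes_per_item` and `total_bytes` are hypothetical helpers, and the constants
//! are the approximate figures quoted in this section, not measured allocator usage.
//!
//! ```rust
//! /// Approximate bytes of memory used to track one item.
//! fn bytes_per_item(translated_key_len: usize) -> usize {
//!     let index_map = 8 + 8; // index + position in the index journal
//!     let key_map = translated_key_len + 16; // translated key + `Record`
//!     index_map + key_map
//! }
//!
//! /// Approximate total memory for `items` tracked items.
//! fn total_bytes(items: usize, translated_key_len: usize) -> usize {
//!     items * bytes_per_item(translated_key_len)
//! }
//! ```
//!
//! Under these figures, one million items with an 8-byte translated key need roughly 40 MB.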
//!
//! # Pruning
//!
//! [Archive] supports pruning up to a minimum `index` using the `prune` method. After `prune` is
//! called on a `section`, writes to any `section` less than the pruned `section` will return an
//! error, and items in those sections can no longer be retrieved.
//!
//! ## Lazy Index Cleanup
//!
//! Instead of performing a full iteration of the in-memory index, storing an additional in-memory
//! index per `section`, or replaying a `section` of the value blob,
//! [Archive] lazily cleans up the [crate::index::unordered::Index] after pruning. When a new key is
//! stored that overlaps (same translated value) with a pruned key, the pruned key is removed from
//! the in-memory index.
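//!
//! A minimal model of this lazy cleanup, assuming a 4-byte translated key and a `Vec` per
//! translated key (`LazyIndex` and its methods are hypothetical and only illustrate the idea):
//!
//! ```rust
//! use std::collections::HashMap;
//!
//! /// Hypothetical in-memory index: translated key -> indexes stored under it.
//! struct LazyIndex {
//!     entries: HashMap<[u8; 4], Vec<u64>>,
//!     /// Smallest index still retained; anything below it has been pruned.
//!     oldest_allowed: u64,
//! }
//!
//! impl LazyIndex {
//!     fn new() -> Self {
//!         Self {
//!             entries: HashMap::new(),
//!             oldest_allowed: 0,
//!         }
//!     }
//!
//!     /// Pruning only raises the floor; stale entries are left in place.
//!     fn prune(&mut self, min: u64) {
//!         self.oldest_allowed = self.oldest_allowed.max(min);
//!     }
//!
//!     /// Inserting under a translated key drops stale entries that share it.
//!     /// Returns how many stale entries were removed.
//!     fn insert(&mut self, translated: [u8; 4], index: u64) -> usize {
//!         let floor = self.oldest_allowed;
//!         let bucket = self.entries.entry(translated).or_default();
//!         let before = bucket.len();
//!         bucket.retain(|&i| i >= floor);
//!         let removed = before - bucket.len();
//!         bucket.push(index);
//!         removed
//!     }
//!
//!     /// Number of entries currently held in memory (including stale ones).
//!     fn tracked(&self) -> usize {
//!         self.entries.values().map(Vec::len).sum()
//!     }
//! }
//! ```
//!
//! The trade-off in this model is that `prune` stays cheap while stale entries linger until a
//! later insert happens to touch the same translated key.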
//!
//! # Read Path
//!
//! All reads (by index or key) first read the index entry from the index journal to get the
//! value location (offset and size), then read the value from the value blob. The index journal
//! uses a page cache for caching, so hot entries are served from memory. Values are read directly
//! from disk without caching to avoid polluting the page cache with large values.
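//!
//! The two-step read can be sketched over in-memory slices. This is an illustration only:
//! `IndexEntry` and `read_value` are hypothetical, the field names merely echo the layout in the
//! Format section, and checksums and decompression are left out.
//!
//! ```rust
//! /// Hypothetical fixed-size index entry pointing into the value blob.
//! struct IndexEntry {
//!     index: u64,
//!     val_offset: u64,
//!     val_size: u32,
//! }
//!
//! /// Step 1: find the index entry. Step 2: slice the value out of the blob.
//! fn read_value<'a>(entries: &[IndexEntry], blob: &'a [u8], index: u64) -> Option<&'a [u8]> {
//!     let entry = entries.iter().find(|e| e.index == index)?;
//!     let start = usize::try_from(entry.val_offset).ok()?;
//!     let end = start.checked_add(entry.val_size as usize)?;
//!     // `get` returns `None` instead of panicking if the range is out of bounds.
//!     blob.get(start..end)
//! }
//! ```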
//!
//! # Compression
//!
//! [Archive] supports compressing data before storing it on disk. This can be enabled by setting
//! the `compression` field in the `Config` struct to a valid `zstd` compression level. This setting
//! can be changed between initializations of [Archive]; however, it must remain populated if any
//! data was written with compression enabled.
//!
//! # Querying for Gaps
//!
//! [Archive] tracks gaps in the index space to enable the caller to efficiently fetch unknown keys
//! using `next_gap`. This is a very common pattern when syncing blocks in a blockchain.
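//!
//! To make the shape of a gap query concrete, here is a sketch over a plain `BTreeSet` of stored
//! indexes. It is not [Archive]'s implementation: this free-standing `next_gap` is a hypothetical
//! helper that scans linearly, whereas tracking ranges directly would likely be cheaper. It
//! returns the end of the contiguous run containing `index` (if `index` is stored) and the start
//! of the next run after it (if any).
//!
//! ```rust
//! use std::collections::BTreeSet;
//!
//! /// Hypothetical gap query over a set of stored indexes.
//! fn next_gap(stored: &BTreeSet<u64>, index: u64) -> (Option<u64>, Option<u64>) {
//!     // End of the contiguous run containing `index`, if `index` is stored.
//!     let current_end = if stored.contains(&index) {
//!         let mut end = index;
//!         while end < u64::MAX && stored.contains(&(end + 1)) {
//!             end += 1;
//!         }
//!         Some(end)
//!     } else {
//!         None
//!     };
//!
//!     // First stored index after that run (or after `index` when it is not stored).
//!     let search_from = current_end.unwrap_or(index);
//!     let start_next = match search_from.checked_add(1) {
//!         Some(from) => stored.range(from..).next().copied(),
//!         None => None,
//!     };
//!     (current_end, start_next)
//! }
//! ```
//!
//! A syncing caller could request the missing indexes between `current_end + 1` and
//! `start_next - 1`, then repeat the query from `start_next`.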
//!
//! # Example
//!
//! ```rust
//! use commonware_runtime::{Spawner, Runner, deterministic, buffer::paged::CacheRef};
//! use commonware_cryptography::{Hasher as _, Sha256};
//! use commonware_storage::{
//!     translator::FourCap,
//!     archive::{
//!         Archive as _,
//!         prunable::{Archive, Config},
//!     },
//! };
//! use commonware_utils::{NZUsize, NZU16, NZU64};
//!
//! let executor = deterministic::Runner::default();
//! executor.start(|context| async move {
//!     // Create an archive
//!     let cfg = Config {
//!         translator: FourCap,
//!         key_partition: "demo_index".into(),
//!         key_page_cache: CacheRef::new(NZU16!(1024), NZUsize!(10)),
//!         value_partition: "demo_value".into(),
//!         compression: Some(3),
//!         codec_config: (),
//!         items_per_section: NZU64!(1024),
//!         key_write_buffer: NZUsize!(1024 * 1024),
//!         value_write_buffer: NZUsize!(1024 * 1024),
//!         replay_buffer: NZUsize!(4096),
//!     };
//!     let mut archive = Archive::init(context, cfg).await.unwrap();
//!
//!     // Put a key
//!     archive.put(1, Sha256::hash(b"data"), 10).await.unwrap();
//!
//!     // Sync the archive
//!     archive.sync().await.unwrap();
//! });
//! ```

use crate::translator::Translator;
use commonware_runtime::buffer::paged::CacheRef;
use std::num::{NonZeroU64, NonZeroUsize};

mod storage;
pub use storage::Archive;

/// Configuration for [Archive] storage.
#[derive(Clone)]
pub struct Config<T: Translator, C> {
    /// Logic to transform keys into their index representation.
    ///
    /// [Archive] assumes that all internal keys are spread uniformly across the key space.
    /// If that is not the case, lookups may be O(n) instead of O(1).
    pub translator: T,

    /// The partition to use for the key journal (stores index+key metadata).
    pub key_partition: String,

    /// The page cache to use for the key journal.
    pub key_page_cache: CacheRef,

    /// The partition to use for the value blob (stores values).
    pub value_partition: String,

    /// The `zstd` compression level to use for the value blob (`None` disables compression).
    pub compression: Option<u8>,

    /// The [commonware_codec::Codec] configuration to use for the value stored in the archive.
    pub codec_config: C,

    /// The number of items per section (the granularity of pruning).
    pub items_per_section: NonZeroU64,

    /// The number of bytes that can be buffered for the key journal before being written to a
    /// [commonware_runtime::Blob].
    pub key_write_buffer: NonZeroUsize,

    /// The number of bytes that can be buffered for the value journal before being written to a
    /// [commonware_runtime::Blob].
    pub value_write_buffer: NonZeroUsize,

    /// The buffer size to use when replaying a [commonware_runtime::Blob].
    pub replay_buffer: NonZeroUsize,
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        archive::{Archive as _, Error, Identifier},
        journal::Error as JournalError,
        kv::tests::test_key,
        translator::{FourCap, TwoCap},
    };
    use commonware_codec::{DecodeExt, Error as CodecError};
    use commonware_macros::{test_group, test_traced};
    use commonware_runtime::{deterministic, Metrics, Runner};
    use commonware_utils::{sequence::FixedBytes, NZUsize, NZU16, NZU64};
    use rand::Rng;
    use std::{collections::BTreeMap, num::NonZeroU16};

    const DEFAULT_ITEMS_PER_SECTION: u64 = 65536;
    const DEFAULT_WRITE_BUFFER: usize = 1024;
    const DEFAULT_REPLAY_BUFFER: usize = 4096;
    const PAGE_SIZE: NonZeroU16 = NZU16!(1024);
    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);

    #[test_traced]
    fn test_archive_compression_then_none() {
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Initialize the archive
            let cfg = Config {
                translator: FourCap,
                key_partition: "test_index".into(),
                key_page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                value_partition: "test_value".into(),
                codec_config: (),
                compression: Some(3),
                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
                items_per_section: NZU64!(DEFAULT_ITEMS_PER_SECTION),
            };
            let mut archive = Archive::init(context.with_label("first"), cfg.clone())
                .await
                .expect("Failed to initialize archive");

            // Put the key-data pair
            let index = 1u64;
            let key = test_key("testkey");
            let data = 1;
            archive
                .put(index, key.clone(), data)
                .await
                .expect("Failed to put data");

            // Sync and drop the archive
            archive.sync().await.expect("Failed to sync archive");
            drop(archive);

            // Initialize the archive again without compression.
            // Index journal replay succeeds (no compression), but value reads will fail.
            let cfg = Config {
                translator: FourCap,
                key_partition: "test_index".into(),
                key_page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                value_partition: "test_value".into(),
                codec_config: (),
                compression: None,
                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
                items_per_section: NZU64!(DEFAULT_ITEMS_PER_SECTION),
            };
            let archive = Archive::<_, _, FixedBytes<64>, i32>::init(
                context.with_label("second"),
                cfg.clone(),
            )
            .await
            .unwrap();

            // Getting the value should fail because compression settings mismatch.
            // Without compression, the codec sees extra bytes after decoding the value
            // (because the compressed data doesn't match the expected format).
            let result: Result<Option<i32>, _> = archive.get(Identifier::Index(index)).await;
            assert!(matches!(
                result,
                Err(Error::Journal(JournalError::Codec(CodecError::ExtraData(
                    _
                ))))
            ));
        });
    }

    #[test_traced]
    fn test_archive_overlapping_key_basic() {
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Initialize the archive
            let cfg = Config {
                translator: FourCap,
                key_partition: "test_index".into(),
                key_page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                value_partition: "test_value".into(),
                codec_config: (),
                compression: None,
                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
                items_per_section: NZU64!(DEFAULT_ITEMS_PER_SECTION),
            };
            let mut archive = Archive::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to initialize archive");

            let index1 = 1u64;
            let key1 = test_key("keys1");
            let data1 = 1;
            let index2 = 2u64;
            let key2 = test_key("keys2");
            let data2 = 2;

            // Put the first key-data pair
            archive
                .put(index1, key1.clone(), data1)
                .await
                .expect("Failed to put data");

            // Put the second key-data pair
            archive
                .put(index2, key2.clone(), data2)
                .await
                .expect("Failed to put data");

            // Get the first value back
            let retrieved = archive
                .get(Identifier::Key(&key1))
                .await
                .expect("Failed to get data")
                .expect("Data not found");
            assert_eq!(retrieved, data1);

            // Get the second value back
            let retrieved = archive
                .get(Identifier::Key(&key2))
                .await
                .expect("Failed to get data")
                .expect("Data not found");
            assert_eq!(retrieved, data2);

            // Check metrics
            let buffer = context.encode();
            assert!(buffer.contains("items_tracked 2"));
            assert!(buffer.contains("unnecessary_reads_total 1"));
            assert!(buffer.contains("gets_total 2"));
        });
    }

    #[test_traced]
    fn test_archive_overlapping_key_multiple_sections() {
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Initialize the archive
            let cfg = Config {
                translator: FourCap,
                key_partition: "test_index".into(),
                key_page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                value_partition: "test_value".into(),
                codec_config: (),
                compression: None,
                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
                items_per_section: NZU64!(DEFAULT_ITEMS_PER_SECTION),
            };
            let mut archive = Archive::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to initialize archive");

            let index1 = 1u64;
            let key1 = test_key("keys1");
            let data1 = 1;
            let index2 = 2_000_000u64;
            let key2 = test_key("keys2");
            let data2 = 2;

            // Put the first key-data pair
            archive
                .put(index1, key1.clone(), data1)
                .await
                .expect("Failed to put data");

            // Put the second key-data pair (lands in a different section)
            archive
                .put(index2, key2.clone(), data2)
                .await
                .expect("Failed to put data");

            // Get the first value back
            let retrieved = archive
                .get(Identifier::Key(&key1))
                .await
                .expect("Failed to get data")
                .expect("Data not found");
            assert_eq!(retrieved, data1);

            // Get the second value back
            let retrieved = archive
                .get(Identifier::Key(&key2))
                .await
                .expect("Failed to get data")
                .expect("Data not found");
            assert_eq!(retrieved, data2);
        });
    }

    #[test_traced]
    fn test_archive_prune_keys() {
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Initialize the archive
            let cfg = Config {
                translator: FourCap,
                key_partition: "test_index".into(),
                key_page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                value_partition: "test_value".into(),
                codec_config: (),
                compression: None,
                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
                items_per_section: NZU64!(1), // no mask - each item is its own section
            };
            let mut archive = Archive::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to initialize archive");

            // Insert multiple keys across different sections
            let keys = vec![
                (1u64, test_key("key1-blah"), 1),
                (2u64, test_key("key2-blah"), 2),
                (3u64, test_key("key3-blah"), 3),
                (4u64, test_key("key3-bleh"), 3),
                (5u64, test_key("key4-blah"), 4),
            ];

            for (index, key, data) in &keys {
                archive
                    .put(*index, key.clone(), *data)
                    .await
                    .expect("Failed to put data");
            }

            // Check metrics
            let buffer = context.encode();
            assert!(buffer.contains("items_tracked 5"));

            // Prune sections less than 3
            archive.prune(3).await.expect("Failed to prune");

            // Ensure keys 1 and 2 are no longer present
            for (index, key, data) in keys {
                let retrieved = archive
                    .get(Identifier::Key(&key))
                    .await
                    .expect("Failed to get data");
                if index < 3 {
                    assert!(retrieved.is_none());
                } else {
                    assert_eq!(retrieved.expect("Data not found"), data);
                }
            }

            // Check metrics
            let buffer = context.encode();
            assert!(buffer.contains("items_tracked 3"));
            assert!(buffer.contains("indices_pruned_total 2"));
            assert!(buffer.contains("pruned_total 0")); // no lazy cleanup yet

            // Try to prune older section
            archive.prune(2).await.expect("Failed to prune");

            // Try to prune current section again
            archive.prune(3).await.expect("Failed to prune");

            // Try to put older index
            let result = archive.put(1, test_key("key1-blah"), 1).await;
            assert!(matches!(result, Err(Error::AlreadyPrunedTo(3))));

            // Trigger lazy removal of keys
            archive
                .put(6, test_key("key2-blfh"), 5)
                .await
                .expect("Failed to put data");

            // Check metrics
            let buffer = context.encode();
            assert!(buffer.contains("items_tracked 4")); // lazily remove one, add one
            assert!(buffer.contains("indices_pruned_total 2"));
            assert!(buffer.contains("pruned_total 1"));
        });
    }

    fn test_archive_keys_and_restart(num_keys: usize) -> String {
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();
        executor.start(|mut context| async move {
            // Initialize the archive
            let items_per_section = 256u64;
            let cfg = Config {
                translator: TwoCap,
                key_partition: "test_index".into(),
                key_page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                value_partition: "test_value".into(),
                codec_config: (),
                compression: None,
                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
                items_per_section: NZU64!(items_per_section),
            };
            let mut archive = Archive::init(context.with_label("init1"), cfg.clone())
                .await
                .expect("Failed to initialize archive");

            // Insert multiple keys across different sections
            let mut keys = BTreeMap::new();
            while keys.len() < num_keys {
                let index = keys.len() as u64;
                let mut key = [0u8; 64];
                context.fill(&mut key);
                let key = FixedBytes::<64>::decode(key.as_ref()).unwrap();
                let mut data = [0u8; 1024];
                context.fill(&mut data);
                let data = FixedBytes::<1024>::decode(data.as_ref()).unwrap();

                archive
                    .put(index, key.clone(), data.clone())
                    .await
                    .expect("Failed to put data");
                keys.insert(key, (index, data));
            }

            // Ensure all keys can be retrieved
            for (key, (index, data)) in &keys {
                let retrieved = archive
                    .get(Identifier::Index(*index))
                    .await
                    .expect("Failed to get data")
                    .expect("Data not found");
                assert_eq!(&retrieved, data);
                let retrieved = archive
                    .get(Identifier::Key(key))
                    .await
                    .expect("Failed to get data")
                    .expect("Data not found");
                assert_eq!(&retrieved, data);
            }

            // Check metrics
            let buffer = context.encode();
            let tracked = format!("items_tracked {num_keys:?}");
            assert!(buffer.contains(&tracked));
            assert!(buffer.contains("pruned_total 0"));

            // Sync and drop the archive
            archive.sync().await.expect("Failed to sync archive");
            drop(archive);

            // Reinitialize the archive
            let cfg = Config {
                translator: TwoCap,
                key_partition: "test_index".into(),
                key_page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                value_partition: "test_value".into(),
                codec_config: (),
                compression: None,
                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
                items_per_section: NZU64!(items_per_section),
            };
            let mut archive = Archive::<_, _, _, FixedBytes<1024>>::init(
                context.with_label("init2"),
                cfg.clone(),
            )
            .await
            .expect("Failed to initialize archive");

            // Ensure all keys can be retrieved
            for (key, (index, data)) in &keys {
                let retrieved = archive
                    .get(Identifier::Index(*index))
                    .await
                    .expect("Failed to get data")
                    .expect("Data not found");
                assert_eq!(&retrieved, data);
                let retrieved = archive
                    .get(Identifier::Key(key))
                    .await
                    .expect("Failed to get data")
                    .expect("Data not found");
                assert_eq!(&retrieved, data);
            }

            // Prune first half
            let min = (keys.len() / 2) as u64;
            archive.prune(min).await.expect("Failed to prune");

            // Ensure all keys can be retrieved that haven't been pruned
            let min = (min / items_per_section) * items_per_section;
            let mut removed = 0;
            for (key, (index, data)) in keys {
                if index >= min {
                    let retrieved = archive
                        .get(Identifier::Key(&key))
                        .await
                        .expect("Failed to get data")
                        .expect("Data not found");
                    assert_eq!(retrieved, data);

                    // Check range
                    let (current_end, start_next) = archive.next_gap(index);
                    assert_eq!(current_end.unwrap(), num_keys as u64 - 1);
                    assert!(start_next.is_none());
                } else {
                    let retrieved = archive
                        .get(Identifier::Key(&key))
                        .await
                        .expect("Failed to get data");
                    assert!(retrieved.is_none());
                    removed += 1;

                    // Check range
                    let (current_end, start_next) = archive.next_gap(index);
                    assert!(current_end.is_none());
                    assert_eq!(start_next.unwrap(), min);
                }
            }

            // Check metrics
            let buffer = context.encode();
            let tracked = format!("items_tracked {:?}", num_keys - removed);
            assert!(buffer.contains(&tracked));
            let pruned = format!("indices_pruned_total {removed}");
            assert!(buffer.contains(&pruned));
            assert!(buffer.contains("pruned_total 0")); // have not lazily removed keys yet

            context.auditor().state()
        })
    }

    #[test_group("slow")]
    #[test_traced]
    fn test_archive_many_keys_and_restart() {
        test_archive_keys_and_restart(100_000);
    }

    #[test_group("slow")]
    #[test_traced]
    fn test_determinism() {
        let state1 = test_archive_keys_and_restart(5_000);
        let state2 = test_archive_keys_and_restart(5_000);
        assert_eq!(state1, state2);
    }
}
661}