commonware_storage/archive/immutable/
mod.rs

1//! An immutable key-value store for ordered data with a minimal memory footprint.
2//!
3//! Data is stored in a [crate::freezer::Freezer] and a [crate::ordinal::Ordinal] to enable
4//! lookups by both index and key with minimal memory overhead.
5//!
6//! # Uniqueness
7//!
8//! [Archive] assumes all stored indexes and keys are unique. If the same key is associated with
9//! multiple `indices`, there is no guarantee which value will be returned. If the key is written to
10//! an existing `index`, [Archive] will return an error.
11//!
12//! # Compression
13//!
14//! [Archive] supports compressing data before storing it on disk. This can be enabled by setting
15//! the `compression` field in the `Config` struct to a valid `zstd` compression level. This setting
16//! can be changed between initializations of [Archive], however, it must remain populated if any
17//! data was written with compression enabled.
18//!
19//! # Querying for Gaps
20//!
21//! [Archive] tracks gaps in the index space to enable the caller to efficiently fetch unknown keys
22//! using `next_gap`. This is a very common pattern when syncing blocks in a blockchain.
23//!
24//! # Example
25//!
26//! ```rust
27//! use commonware_runtime::{Spawner, Runner, deterministic, buffer::PoolRef};
28//! use commonware_cryptography::{Hasher as _, Sha256};
29//! use commonware_storage::{
30//!     archive::{
31//!         Archive as _,
32//!         immutable::{Archive, Config},
33//!     },
34//! };
35//! use commonware_utils::{NZUsize, NZU64};
36//!
37//! let executor = deterministic::Runner::default();
38//! executor.start(|context| async move {
39//!     // Create an archive
40//!     let cfg = Config {
41//!         metadata_partition: "metadata".into(),
42//!         freezer_table_partition: "table".into(),
43//!         freezer_table_initial_size: 65_536,
44//!         freezer_table_resize_frequency: 4,
45//!         freezer_table_resize_chunk_size: 16_384,
46//!         freezer_journal_partition: "journal".into(),
47//!         freezer_journal_target_size: 1024,
48//!         freezer_journal_compression: Some(3),
49//!         freezer_journal_buffer_pool: PoolRef::new(NZUsize!(1024), NZUsize!(10)),
50//!         ordinal_partition: "ordinal".into(),
51//!         items_per_section: NZU64!(1024),
52//!         write_buffer: NZUsize!(1024),
53//!         replay_buffer: NZUsize!(1024),
54//!         codec_config: (),
55//!     };
56//!     let mut archive = Archive::init(context, cfg).await.unwrap();
57//!
58//!     // Put a key
59//!     archive.put(1, Sha256::hash(b"data"), 10).await.unwrap();
60//!
61//!     // Close the archive (also closes the freezer and ordinal)
62//!     archive.close().await.unwrap();
63//! });
64
65mod storage;
66use commonware_runtime::buffer::PoolRef;
67use std::num::{NonZeroU64, NonZeroUsize};
68pub use storage::Archive;
69
70/// Configuration for [Archive] storage.
71#[derive(Clone)]
72pub struct Config<C> {
73    /// The partition to use for the archive's metadata.
74    pub metadata_partition: String,
75
76    /// The partition to use for the archive's freezer table.
77    pub freezer_table_partition: String,
78
79    /// The size of the archive's freezer table.
80    pub freezer_table_initial_size: u32,
81
82    /// The number of items added to the freezer table before it is resized.
83    pub freezer_table_resize_frequency: u8,
84
85    /// The number of items to move during each resize operation (many may be required to complete a resize).
86    pub freezer_table_resize_chunk_size: u32,
87
88    /// The partition to use for the archive's freezer journal.
89    pub freezer_journal_partition: String,
90
91    /// The target size of the archive's freezer journal.
92    pub freezer_journal_target_size: u64,
93
94    /// The compression level to use for the archive's freezer journal.
95    pub freezer_journal_compression: Option<u8>,
96
97    /// The buffer pool to use for the archive's freezer journal.
98    pub freezer_journal_buffer_pool: PoolRef,
99
100    /// The partition to use for the archive's ordinal.
101    pub ordinal_partition: String,
102
103    /// The number of items per section.
104    pub items_per_section: NonZeroU64,
105
106    /// The amount of bytes that can be buffered in a section before being written to a
107    /// [commonware_runtime::Blob].
108    pub write_buffer: NonZeroUsize,
109
110    /// The buffer size to use when replaying a [commonware_runtime::Blob].
111    pub replay_buffer: NonZeroUsize,
112
113    /// The [commonware_codec::Codec] configuration to use for the value stored in the archive.
114    pub codec_config: C,
115}
116
117#[cfg(test)]
118mod tests {
119    use super::*;
120    use crate::archive::Archive as ArchiveTrait;
121    use commonware_cryptography::{sha256::Digest, Hasher, Sha256};
122    use commonware_runtime::{buffer::PoolRef, deterministic, Runner};
123    use commonware_utils::{NZUsize, NZU64};
124
125    const PAGE_SIZE: NonZeroUsize = NZUsize!(1024);
126    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);
127
128    #[test]
129    fn test_unclean_shutdown() {
130        let executor = deterministic::Runner::default();
131        executor.start(|context| async move {
132            let cfg = Config {
133                metadata_partition: "test_metadata2".into(),
134                freezer_table_partition: "test_table2".into(),
135                freezer_table_initial_size: 8192, // Must be power of 2
136                freezer_table_resize_frequency: 4,
137                freezer_table_resize_chunk_size: 8192,
138                freezer_journal_partition: "test_journal2".into(),
139                freezer_journal_target_size: 1024 * 1024,
140                freezer_journal_compression: Some(3),
141                freezer_journal_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
142                ordinal_partition: "test_ordinal2".into(),
143                items_per_section: NZU64!(512),
144                write_buffer: NZUsize!(1024),
145                replay_buffer: NZUsize!(1024),
146                codec_config: (),
147            };
148
149            // First initialization
150            let archive: Archive<_, Digest, i32> =
151                Archive::init(context.clone(), cfg.clone()).await.unwrap();
152            drop(archive);
153
154            // Second initialization
155            let mut archive = Archive::init(context.clone(), cfg.clone()).await.unwrap();
156
157            // Add some data
158            let key1 = Sha256::hash(b"key1");
159            let key2 = Sha256::hash(b"key2");
160            archive.put(1, key1, 2000).await.unwrap();
161            archive.put(2, key2, 2001).await.unwrap();
162
163            // Close archive (this should save the checkpoint)
164            archive.close().await.unwrap();
165
166            // Re-initialize archive (should load from checkpoint)
167            let archive = Archive::init(context, cfg).await.unwrap();
168
169            // Verify data persisted
170            assert_eq!(
171                archive
172                    .get(crate::archive::Identifier::Key(&key1))
173                    .await
174                    .unwrap(),
175                Some(2000)
176            );
177            assert_eq!(
178                archive
179                    .get(crate::archive::Identifier::Key(&key2))
180                    .await
181                    .unwrap(),
182                Some(2001)
183            );
184
185            // Close the archive
186            archive.close().await.unwrap();
187        });
188    }
189}