Skip to main content

commonware_storage/archive/immutable/
mod.rs

1//! An immutable key-value store for ordered data with a minimal memory footprint.
2//!
3//! Data is stored in a [crate::freezer::Freezer] and a [crate::ordinal::Ordinal] to enable
4//! lookups by both index and key with minimal memory overhead.
5//!
6//! # Uniqueness
7//!
8//! [Archive] assumes all stored indices are unique. Writing to an occupied index is a no-op.
9//! If the same key is associated with multiple indices, there is no guarantee which value will
10//! be returned.
11//!
12//! # Compression
13//!
//! [Archive] supports compressing data before storing it on disk. This can be enabled by setting
//! the `freezer_value_compression` field in the [Config] struct to a valid `zstd` compression
//! level. This setting can be changed between initializations of [Archive]; however, it must
//! remain populated if any data was written with compression enabled.
18//!
19//! # Querying for Gaps
20//!
21//! [Archive] tracks gaps in the index space to enable the caller to efficiently fetch unknown keys
22//! using `next_gap`. This is a very common pattern when syncing blocks in a blockchain.
23//!
24//! # Example
25//!
26//! ```rust
27//! use commonware_runtime::{Spawner, Runner, deterministic, buffer::paged::CacheRef};
28//! use commonware_cryptography::{Hasher as _, Sha256};
29//! use commonware_storage::{
30//!     archive::{
31//!         Archive as _,
32//!         immutable::{Archive, Config},
33//!     },
34//! };
35//! use commonware_utils::{NZUsize, NZU16, NZU64};
36//!
37//! let executor = deterministic::Runner::default();
38//! executor.start(|context| async move {
39//!     // Create an archive
40//!     let cfg = Config {
41//!         metadata_partition: "metadata".into(),
42//!         freezer_table_partition: "table".into(),
43//!         freezer_table_initial_size: 65_536,
44//!         freezer_table_resize_frequency: 4,
45//!         freezer_table_resize_chunk_size: 16_384,
46//!         freezer_key_partition: "key".into(),
47//!         freezer_key_page_cache: CacheRef::from_pooler(&context, NZU16!(1024), NZUsize!(10)),
48//!         freezer_value_partition: "value".into(),
49//!         freezer_value_target_size: 1024,
50//!         freezer_value_compression: Some(3),
51//!         ordinal_partition: "ordinal".into(),
52//!         items_per_section: NZU64!(1024),
53//!         freezer_key_write_buffer: NZUsize!(1024),
54//!         freezer_value_write_buffer: NZUsize!(1024),
55//!         ordinal_write_buffer: NZUsize!(1024),
56//!         replay_buffer: NZUsize!(1024),
57//!         codec_config: (),
58//!     };
59//!     let mut archive = Archive::init(context, cfg).await.unwrap();
60//!
61//!     // Put a key
62//!     archive.put(1, Sha256::hash(b"data"), 10).await.unwrap();
63//!
64//!     // Sync the archive
65//!     archive.sync().await.unwrap();
//! });
//! ```
68mod storage;
69use commonware_runtime::buffer::paged::CacheRef;
70use std::num::{NonZeroU64, NonZeroUsize};
71pub use storage::Archive;
72
73/// Configuration for [Archive] storage.
74#[derive(Clone)]
75pub struct Config<C> {
76    /// The partition to use for the archive's metadata.
77    pub metadata_partition: String,
78
79    /// The partition to use for the archive's freezer table.
80    pub freezer_table_partition: String,
81
82    /// The size of the archive's freezer table.
83    pub freezer_table_initial_size: u32,
84
85    /// The number of items added to the freezer table before it is resized.
86    pub freezer_table_resize_frequency: u8,
87
88    /// The number of items to move during each resize operation (many may be required to complete a resize).
89    pub freezer_table_resize_chunk_size: u32,
90
91    /// The partition to use for the archive's freezer keys.
92    pub freezer_key_partition: String,
93
94    /// The page cache to use for the archive's freezer keys.
95    pub freezer_key_page_cache: CacheRef,
96
97    /// The partition to use for the archive's freezer values.
98    pub freezer_value_partition: String,
99
100    /// The target size of the archive's freezer value sections.
101    pub freezer_value_target_size: u64,
102
103    /// The compression level to use for the archive's freezer values.
104    pub freezer_value_compression: Option<u8>,
105
106    /// The partition to use for the archive's ordinal.
107    pub ordinal_partition: String,
108
109    /// The number of items per section.
110    pub items_per_section: NonZeroU64,
111
112    /// The amount of bytes that can be buffered for the freezer key journal before being
113    /// written to a [commonware_runtime::Blob].
114    pub freezer_key_write_buffer: NonZeroUsize,
115
116    /// The amount of bytes that can be buffered for the freezer value journal before being
117    /// written to a [commonware_runtime::Blob].
118    pub freezer_value_write_buffer: NonZeroUsize,
119
120    /// The amount of bytes that can be buffered for the ordinal journal before being
121    /// written to a [commonware_runtime::Blob].
122    pub ordinal_write_buffer: NonZeroUsize,
123
124    /// The buffer size to use when replaying a [commonware_runtime::Blob].
125    pub replay_buffer: NonZeroUsize,
126
127    /// The [commonware_codec::Codec] configuration to use for the value stored in the archive.
128    pub codec_config: C,
129}
130
#[cfg(test)]
mod tests {
    use super::*;
    use crate::archive::{Archive as _, Identifier};
    use commonware_cryptography::{sha256::Digest, Hasher, Sha256};
    use commonware_runtime::{buffer::paged::CacheRef, deterministic, Runner, Supervisor as _};
    use commonware_utils::{NZUsize, NZU16, NZU64};
    use std::num::NonZeroU16;

    const PAGE_SIZE: NonZeroU16 = NZU16!(1024);
    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);

    /// Builds the configuration shared by the tests in this module.
    ///
    /// Each partition is named `{prefix}-{component}{suffix}` (for example
    /// `test-metadata2` or `empty-metadata`), so every test keeps its own set of
    /// partition names while all other settings stay identical.
    fn test_config(page_cache: CacheRef, prefix: &str, suffix: &str) -> Config<()> {
        let partition = |component: &str| format!("{prefix}-{component}{suffix}");
        Config {
            metadata_partition: partition("metadata"),
            freezer_table_partition: partition("table"),
            freezer_table_initial_size: 8192, // Must be power of 2
            freezer_table_resize_frequency: 4,
            freezer_table_resize_chunk_size: 8192,
            freezer_key_partition: partition("key"),
            freezer_key_page_cache: page_cache,
            freezer_value_partition: partition("value"),
            freezer_value_target_size: 1024 * 1024,
            freezer_value_compression: Some(3),
            ordinal_partition: partition("ordinal"),
            items_per_section: NZU64!(512),
            freezer_key_write_buffer: NZUsize!(1024),
            freezer_value_write_buffer: NZUsize!(1024),
            ordinal_write_buffer: NZUsize!(1024),
            replay_buffer: NZUsize!(1024),
            codec_config: (),
        }
    }

    /// Drops an archive without syncing, then checks that a later archive can still
    /// write, sync, and serve its data after another restart.
    #[test]
    fn test_unclean_shutdown() {
        let runner = deterministic::Runner::default();
        runner.start(|context| async move {
            let page_cache = CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE);
            let cfg = test_config(page_cache, "test", "2");

            // Open the archive once and drop it straight away, without syncing.
            let first: Archive<_, Digest, i32> =
                Archive::init(context.child("first"), cfg.clone())
                    .await
                    .unwrap();
            drop(first);

            // Open it again and store two entries.
            let mut second: Archive<_, Digest, i32> =
                Archive::init(context.child("second"), cfg.clone())
                    .await
                    .unwrap();
            let key1 = Sha256::hash(b"key1");
            let key2 = Sha256::hash(b"key2");
            second.put(1, key1, 2000).await.unwrap();
            second.put(2, key2, 2001).await.unwrap();

            // Sync so the checkpoint is saved, then drop the archive.
            second.sync().await.unwrap();
            drop(second);

            // Open a third time (should load from the checkpoint) and look both keys up.
            let third: Archive<_, Digest, i32> =
                Archive::init(context.child("third"), cfg).await.unwrap();

            let value1 = third.get(Identifier::Key(&key1)).await.unwrap();
            assert_eq!(value1, Some(2000));

            let value2 = third.get(Identifier::Key(&key2)).await.unwrap();
            assert_eq!(value2, Some(2001));
        });
    }

    /// Syncs an archive that holds no data, restarts it, and checks that it still accepts
    /// and persists a write afterwards.
    #[test]
    fn test_sync_empty_archive_then_restart() {
        let runner = deterministic::Runner::default();
        runner.start(|context| async move {
            let page_cache = CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE);
            let cfg = test_config(page_cache, "empty", "");

            // Sync an archive that never received a write, then drop it.
            let mut first: Archive<_, Digest, i32> =
                Archive::init(context.child("first"), cfg.clone())
                    .await
                    .unwrap();
            first.sync().await.unwrap();
            drop(first);

            // Re-opening should not fail with SectionOutOfRange(0).
            let mut second: Archive<_, Digest, i32> =
                Archive::init(context.child("second"), cfg.clone())
                    .await
                    .unwrap();

            // A write after the restart confirms the archive is functional.
            let key = Sha256::hash(b"after-restart");
            second.put_sync(0, key, 42).await.unwrap();
            drop(second);

            // A third initialization verifies that the write persisted.
            let third: Archive<_, Digest, i32> =
                Archive::init(context.child("third"), cfg).await.unwrap();
            let value = third.get(Identifier::Key(&key)).await.unwrap();
            assert_eq!(value, Some(42));
        });
    }
}