commonware_storage/archive/
mod.rs

1//! A write-once key-value store for ordered data.
2//!
3//! [Archive] is a key-value store designed for workloads where all data is written only once and is
4//! uniquely associated with both an `index` and a `key`.
5
6use commonware_codec::Codec;
7use commonware_utils::Array;
8use std::future::Future;
9use thiserror::Error;
10
11pub mod immutable;
12pub mod prunable;
13
14/// Subject of a `get` or `has` operation.
15pub enum Identifier<'a, K: Array> {
16    Index(u64),
17    Key(&'a K),
18}
19
20/// Errors that can occur when interacting with the archive.
21#[derive(Debug, Error)]
22pub enum Error {
23    #[error("journal error: {0}")]
24    Journal(#[from] crate::journal::Error),
25    #[error("ordinal error: {0}")]
26    Ordinal(#[from] crate::ordinal::Error),
27    #[error("metadata error: {0}")]
28    Metadata(#[from] crate::metadata::Error),
29    #[error("freezer error: {0}")]
30    Freezer(#[from] crate::freezer::Error),
31    #[error("record corrupted")]
32    RecordCorrupted,
33    #[error("already pruned to: {0}")]
34    AlreadyPrunedTo(u64),
35    #[error("record too large")]
36    RecordTooLarge,
37}
38
39/// A write-once key-value store where each key is associated with a unique index.
40pub trait Archive {
41    /// The type of the key.
42    type Key: Array;
43
44    /// The type of the value.
45    type Value: Codec;
46
47    /// Store an item in [Archive]. Both indices and keys are assumed to both be globally unique.
48    ///
49    /// If the index already exists, put does nothing and returns. If the same key is stored multiple times
50    /// at different indices (not recommended), any value associated with the key may be returned.
51    fn put(
52        &mut self,
53        index: u64,
54        key: Self::Key,
55        value: Self::Value,
56    ) -> impl Future<Output = Result<(), Error>>;
57
58    /// Perform a [Archive::put] and [Archive::sync] in a single operation.
59    fn put_sync(
60        &mut self,
61        index: u64,
62        key: Self::Key,
63        value: Self::Value,
64    ) -> impl Future<Output = Result<(), Error>> {
65        async move {
66            self.put(index, key, value).await?;
67            self.sync().await
68        }
69    }
70
71    /// Retrieve an item from [Archive].
72    fn get(
73        &self,
74        identifier: Identifier<'_, Self::Key>,
75    ) -> impl Future<Output = Result<Option<Self::Value>, Error>>;
76
77    /// Check if an item exists in [Archive].
78    fn has(
79        &self,
80        identifier: Identifier<'_, Self::Key>,
81    ) -> impl Future<Output = Result<bool, Error>>;
82
83    /// Retrieve the end of the current range including `index` (inclusive) and
84    /// the start of the next range after `index` (if it exists).
85    ///
86    /// This is useful for driving backfill operations over the archive.
87    fn next_gap(&self, index: u64) -> (Option<u64>, Option<u64>);
88
89    /// Sync all pending writes.
90    fn sync(&mut self) -> impl Future<Output = Result<(), Error>>;
91
92    /// Close [Archive] (and underlying storage).
93    ///
94    /// Any pending writes are synced prior to closing.
95    fn close(self) -> impl Future<Output = Result<(), Error>>;
96
97    /// Remove all persistent data created by this [Archive].
98    fn destroy(self) -> impl Future<Output = Result<(), Error>>;
99}
100
101#[cfg(test)]
102mod tests {
103    use super::*;
104    use crate::translator::TwoCap;
105    use commonware_codec::DecodeExt;
106    use commonware_macros::test_traced;
107    use commonware_runtime::{
108        buffer::PoolRef,
109        deterministic::{self, Context},
110        Runner,
111    };
112    use commonware_utils::{sequence::FixedBytes, NZUsize, NZU64};
113    use rand::Rng;
114    use std::{collections::BTreeMap, num::NonZeroUsize};
115
116    const PAGE_SIZE: NonZeroUsize = NZUsize!(1024);
117    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);
118
119    fn test_key(key: &str) -> FixedBytes<64> {
120        let mut buf = [0u8; 64];
121        let key = key.as_bytes();
122        assert!(key.len() <= buf.len());
123        buf[..key.len()].copy_from_slice(key);
124        FixedBytes::decode(buf.as_ref()).unwrap()
125    }
126
127    async fn create_prunable(
128        context: Context,
129        compression: Option<u8>,
130    ) -> impl Archive<Key = FixedBytes<64>, Value = i32> {
131        let cfg = prunable::Config {
132            partition: "test".into(),
133            translator: TwoCap,
134            compression,
135            codec_config: (),
136            items_per_section: NZU64!(1024),
137            write_buffer: NZUsize!(1024),
138            replay_buffer: NZUsize!(1024),
139            buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
140        };
141        prunable::Archive::init(context, cfg).await.unwrap()
142    }
143
144    async fn create_immutable(
145        context: Context,
146        compression: Option<u8>,
147    ) -> impl Archive<Key = FixedBytes<64>, Value = i32> {
148        let cfg = immutable::Config {
149            metadata_partition: "test_metadata".into(),
150            freezer_table_partition: "test_table".into(),
151            freezer_table_initial_size: 64,
152            freezer_table_resize_frequency: 2,
153            freezer_table_resize_chunk_size: 32,
154            freezer_journal_partition: "test_journal".into(),
155            freezer_journal_target_size: 1024 * 1024,
156            freezer_journal_compression: compression,
157            freezer_journal_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
158            ordinal_partition: "test_ordinal".into(),
159            items_per_section: NZU64!(1024),
160            write_buffer: NZUsize!(1024 * 1024),
161            replay_buffer: NZUsize!(1024 * 1024),
162            codec_config: (),
163        };
164        immutable::Archive::init(context, cfg).await.unwrap()
165    }
166
167    async fn test_put_get_impl(mut archive: impl Archive<Key = FixedBytes<64>, Value = i32>) {
168        let index = 1u64;
169        let key = test_key("testkey");
170        let data = 1;
171
172        // Has the key before put
173        let has = archive
174            .has(Identifier::Index(index))
175            .await
176            .expect("Failed to check key");
177        assert!(!has);
178        let has = archive
179            .has(Identifier::Key(&key))
180            .await
181            .expect("Failed to check key");
182        assert!(!has);
183
184        // Put the key-data pair
185        archive
186            .put(index, key.clone(), data)
187            .await
188            .expect("Failed to put data");
189
190        // Has the key after put
191        let has = archive
192            .has(Identifier::Index(index))
193            .await
194            .expect("Failed to check key");
195        assert!(has);
196        let has = archive
197            .has(Identifier::Key(&key))
198            .await
199            .expect("Failed to check key");
200        assert!(has);
201
202        // Get the data by key
203        let retrieved = archive
204            .get(Identifier::Key(&key))
205            .await
206            .expect("Failed to get data");
207        assert_eq!(retrieved, Some(data));
208
209        // Get the data by index
210        let retrieved = archive
211            .get(Identifier::Index(index))
212            .await
213            .expect("Failed to get data");
214        assert_eq!(retrieved, Some(data));
215
216        // Force a sync
217        archive.sync().await.expect("Failed to sync data");
218
219        // Close the archive
220        archive.close().await.expect("Failed to close archive");
221    }
222
223    #[test_traced]
224    fn test_put_get_prunable_no_compression() {
225        let executor = deterministic::Runner::default();
226        executor.start(|context| async move {
227            let archive = create_prunable(context, None).await;
228            test_put_get_impl(archive).await;
229        });
230    }
231
232    #[test_traced]
233    fn test_put_get_prunable_compression() {
234        let executor = deterministic::Runner::default();
235        executor.start(|context| async move {
236            let archive = create_prunable(context, Some(3)).await;
237            test_put_get_impl(archive).await;
238        });
239    }
240
241    #[test_traced]
242    fn test_put_get_immutable_no_compression() {
243        let executor = deterministic::Runner::default();
244        executor.start(|context| async move {
245            let archive = create_immutable(context, None).await;
246            test_put_get_impl(archive).await;
247        });
248    }
249
250    #[test_traced]
251    fn test_put_get_immutable_compression() {
252        let executor = deterministic::Runner::default();
253        executor.start(|context| async move {
254            let archive = create_immutable(context, Some(3)).await;
255            test_put_get_impl(archive).await;
256        });
257    }
258
259    async fn test_duplicate_key_impl(mut archive: impl Archive<Key = FixedBytes<64>, Value = i32>) {
260        let index = 1u64;
261        let key = test_key("duplicate");
262        let data1 = 1;
263        let data2 = 2;
264
265        // Put the key-data pair
266        archive
267            .put(index, key.clone(), data1)
268            .await
269            .expect("Failed to put data");
270
271        // Put the key-data pair again (should be idempotent)
272        archive
273            .put(index, key.clone(), data2)
274            .await
275            .expect("Duplicate put should not fail");
276
277        // Get the data back - should still be the first value
278        let retrieved = archive
279            .get(Identifier::Index(index))
280            .await
281            .expect("Failed to get data")
282            .expect("Data not found");
283        assert_eq!(retrieved, data1);
284
285        let retrieved = archive
286            .get(Identifier::Key(&key))
287            .await
288            .expect("Failed to get data")
289            .expect("Data not found");
290        assert_eq!(retrieved, data1);
291
292        archive.close().await.expect("Failed to close archive");
293    }
294
295    #[test_traced]
296    fn test_duplicate_key_prunable_no_compression() {
297        let executor = deterministic::Runner::default();
298        executor.start(|context| async move {
299            let archive = create_prunable(context, None).await;
300            test_duplicate_key_impl(archive).await;
301        });
302    }
303
304    #[test_traced]
305    fn test_duplicate_key_prunable_compression() {
306        let executor = deterministic::Runner::default();
307        executor.start(|context| async move {
308            let archive = create_prunable(context, Some(3)).await;
309            test_duplicate_key_impl(archive).await;
310        });
311    }
312
313    #[test_traced]
314    fn test_duplicate_key_immutable_no_compression() {
315        let executor = deterministic::Runner::default();
316        executor.start(|context| async move {
317            let archive = create_immutable(context, None).await;
318            test_duplicate_key_impl(archive).await;
319        });
320    }
321
322    #[test_traced]
323    fn test_duplicate_key_immutable_compression() {
324        let executor = deterministic::Runner::default();
325        executor.start(|context| async move {
326            let archive = create_immutable(context, Some(3)).await;
327            test_duplicate_key_impl(archive).await;
328        });
329    }
330
331    async fn test_get_nonexistent_impl(archive: impl Archive<Key = FixedBytes<64>, Value = i32>) {
332        // Attempt to get an index that doesn't exist
333        let index = 1u64;
334        let retrieved: Option<i32> = archive
335            .get(Identifier::Index(index))
336            .await
337            .expect("Failed to get data");
338        assert!(retrieved.is_none());
339
340        // Attempt to get a key that doesn't exist
341        let key = test_key("nonexistent");
342        let retrieved = archive
343            .get(Identifier::Key(&key))
344            .await
345            .expect("Failed to get data");
346        assert!(retrieved.is_none());
347
348        archive.close().await.expect("Failed to close archive");
349    }
350
351    #[test_traced]
352    fn test_get_nonexistent_prunable_no_compression() {
353        let executor = deterministic::Runner::default();
354        executor.start(|context| async move {
355            let archive = create_prunable(context, None).await;
356            test_get_nonexistent_impl(archive).await;
357        });
358    }
359
360    #[test_traced]
361    fn test_get_nonexistent_prunable_compression() {
362        let executor = deterministic::Runner::default();
363        executor.start(|context| async move {
364            let archive = create_prunable(context, Some(3)).await;
365            test_get_nonexistent_impl(archive).await;
366        });
367    }
368
369    #[test_traced]
370    fn test_get_nonexistent_immutable_no_compression() {
371        let executor = deterministic::Runner::default();
372        executor.start(|context| async move {
373            let archive = create_immutable(context, None).await;
374            test_get_nonexistent_impl(archive).await;
375        });
376    }
377
378    #[test_traced]
379    fn test_get_nonexistent_immutable_compression() {
380        let executor = deterministic::Runner::default();
381        executor.start(|context| async move {
382            let archive = create_immutable(context, Some(3)).await;
383            test_get_nonexistent_impl(archive).await;
384        });
385    }
386
387    async fn test_persistence_impl<A, F, Fut>(context: Context, creator: F, compression: Option<u8>)
388    where
389        A: Archive<Key = FixedBytes<64>, Value = i32>,
390        F: Fn(Context, Option<u8>) -> Fut,
391        Fut: Future<Output = A>,
392    {
393        // Create and populate archive
394        {
395            let mut archive = creator(context.clone(), compression).await;
396
397            // Insert multiple keys
398            let keys = vec![
399                (1u64, test_key("key1"), 1),
400                (2u64, test_key("key2"), 2),
401                (3u64, test_key("key3"), 3),
402            ];
403
404            for (index, key, data) in &keys {
405                archive
406                    .put(*index, key.clone(), *data)
407                    .await
408                    .expect("Failed to put data");
409            }
410
411            // Close the archive
412            archive.close().await.expect("Failed to close archive");
413        }
414
415        // Reopen and verify data
416        {
417            let archive = creator(context, compression).await;
418
419            // Verify all keys are still present
420            let keys = vec![
421                (1u64, test_key("key1"), 1),
422                (2u64, test_key("key2"), 2),
423                (3u64, test_key("key3"), 3),
424            ];
425
426            for (index, key, expected_data) in &keys {
427                let retrieved = archive
428                    .get(Identifier::Index(*index))
429                    .await
430                    .expect("Failed to get data")
431                    .expect("Data not found");
432                assert_eq!(retrieved, *expected_data);
433
434                let retrieved = archive
435                    .get(Identifier::Key(key))
436                    .await
437                    .expect("Failed to get data")
438                    .expect("Data not found");
439                assert_eq!(retrieved, *expected_data);
440            }
441
442            archive.close().await.expect("Failed to close archive");
443        }
444    }
445
446    #[test_traced]
447    fn test_persistence_prunable_no_compression() {
448        let executor = deterministic::Runner::default();
449        executor.start(|context| async move {
450            test_persistence_impl(context, create_prunable, None).await;
451        });
452    }
453
454    #[test_traced]
455    fn test_persistence_prunable_compression() {
456        let executor = deterministic::Runner::default();
457        executor.start(|context| async move {
458            test_persistence_impl(context, create_prunable, Some(3)).await;
459        });
460    }
461
462    #[test_traced]
463    fn test_persistence_immutable_no_compression() {
464        let executor = deterministic::Runner::default();
465        executor.start(|context| async move {
466            test_persistence_impl(context, create_immutable, None).await;
467        });
468    }
469
470    #[test_traced]
471    fn test_persistence_immutable_compression() {
472        let executor = deterministic::Runner::default();
473        executor.start(|context| async move {
474            test_persistence_impl(context, create_immutable, Some(3)).await;
475        });
476    }
477
478    async fn test_ranges_impl<A, F, Fut>(mut context: Context, creator: F, compression: Option<u8>)
479    where
480        A: Archive<Key = FixedBytes<64>, Value = i32>,
481        F: Fn(Context, Option<u8>) -> Fut,
482        Fut: Future<Output = A>,
483    {
484        let mut keys = BTreeMap::new();
485        {
486            let mut archive = creator(context.clone(), compression).await;
487
488            // Insert 100 keys with gaps
489            let mut last_index = 0u64;
490            while keys.len() < 100 {
491                let gap: u64 = context.gen_range(1..=10);
492                let index = last_index + gap;
493                last_index = index;
494
495                let mut key_bytes = [0u8; 64];
496                context.fill(&mut key_bytes);
497                let key = FixedBytes::<64>::decode(key_bytes.as_ref()).unwrap();
498                let data: i32 = context.gen();
499
500                if keys.contains_key(&index) {
501                    continue;
502                }
503                keys.insert(index, (key.clone(), data));
504
505                archive
506                    .put(index, key, data)
507                    .await
508                    .expect("Failed to put data");
509            }
510
511            archive.close().await.expect("Failed to close archive");
512        }
513
514        {
515            let archive = creator(context, compression).await;
516            let sorted_indices: Vec<u64> = keys.keys().cloned().collect();
517
518            // Check gap before the first element
519            let (current_end, start_next) = archive.next_gap(0);
520            assert!(current_end.is_none());
521            assert_eq!(start_next, Some(sorted_indices[0]));
522
523            // Check gaps between elements
524            let mut i = 0;
525            while i < sorted_indices.len() {
526                let current_index = sorted_indices[i];
527
528                // Find the end of the current contiguous block
529                let mut j = i;
530                while j + 1 < sorted_indices.len() && sorted_indices[j + 1] == sorted_indices[j] + 1
531                {
532                    j += 1;
533                }
534                let block_end_index = sorted_indices[j];
535                let next_actual_index = if j + 1 < sorted_indices.len() {
536                    Some(sorted_indices[j + 1])
537                } else {
538                    None
539                };
540
541                let (current_end, start_next) = archive.next_gap(current_index);
542                assert_eq!(current_end, Some(block_end_index));
543                assert_eq!(start_next, next_actual_index);
544
545                // If there's a gap, check an index within the gap
546                if let Some(next_index) = next_actual_index {
547                    if next_index > block_end_index + 1 {
548                        let in_gap_index = block_end_index + 1;
549                        let (current_end, start_next) = archive.next_gap(in_gap_index);
550                        assert!(current_end.is_none());
551                        assert_eq!(start_next, Some(next_index));
552                    }
553                }
554                i = j + 1;
555            }
556
557            // Check the last element
558            let last_index = *sorted_indices.last().unwrap();
559            let (current_end, start_next) = archive.next_gap(last_index);
560            assert!(current_end.is_some());
561            assert!(start_next.is_none());
562
563            archive.close().await.expect("Failed to close archive");
564        }
565    }
566
567    #[test_traced]
568    fn test_ranges_prunable_no_compression() {
569        let executor = deterministic::Runner::default();
570        executor.start(|context| async move {
571            test_ranges_impl(context, create_prunable, None).await;
572        });
573    }
574
575    #[test_traced]
576    fn test_ranges_prunable_compression() {
577        let executor = deterministic::Runner::default();
578        executor.start(|context| async move {
579            test_ranges_impl(context, create_prunable, Some(3)).await;
580        });
581    }
582
583    #[test_traced]
584    fn test_ranges_immutable_no_compression() {
585        let executor = deterministic::Runner::default();
586        executor.start(|context| async move {
587            test_ranges_impl(context, create_immutable, None).await;
588        });
589    }
590
591    #[test_traced]
592    fn test_ranges_immutable_compression() {
593        let executor = deterministic::Runner::default();
594        executor.start(|context| async move {
595            test_ranges_impl(context, create_immutable, Some(3)).await;
596        });
597    }
598
599    async fn test_many_keys_impl<A, F, Fut>(
600        mut context: Context,
601        creator: F,
602        compression: Option<u8>,
603        num: usize,
604    ) where
605        A: Archive<Key = FixedBytes<64>, Value = i32>,
606        F: Fn(Context, Option<u8>) -> Fut,
607        Fut: Future<Output = A>,
608    {
609        // Insert many keys
610        let mut keys = BTreeMap::new();
611        {
612            let mut archive = creator(context.clone(), compression).await;
613            while keys.len() < num {
614                let index = keys.len() as u64;
615                let mut key = [0u8; 64];
616                context.fill(&mut key);
617                let key = FixedBytes::<64>::decode(key.as_ref()).unwrap();
618                let data: i32 = context.gen();
619
620                archive
621                    .put(index, key.clone(), data)
622                    .await
623                    .expect("Failed to put data");
624                keys.insert(key, (index, data));
625
626                // Randomly sync the archive
627                if context.gen_bool(0.1) {
628                    archive.sync().await.expect("Failed to sync archive");
629                }
630            }
631            archive.sync().await.expect("Failed to sync archive");
632
633            // Ensure all keys can be retrieved
634            for (key, (index, data)) in &keys {
635                let retrieved = archive
636                    .get(Identifier::Index(*index))
637                    .await
638                    .expect("Failed to get data")
639                    .expect("Data not found");
640                assert_eq!(&retrieved, data);
641                let retrieved = archive
642                    .get(Identifier::Key(key))
643                    .await
644                    .expect("Failed to get data")
645                    .expect("Data not found");
646                assert_eq!(&retrieved, data);
647            }
648
649            archive.close().await.expect("Failed to close archive");
650        }
651
652        // Reinitialize and verify
653        {
654            let archive = creator(context.clone(), compression).await;
655
656            // Ensure all keys can be retrieved
657            for (key, (index, data)) in &keys {
658                let retrieved = archive
659                    .get(Identifier::Index(*index))
660                    .await
661                    .expect("Failed to get data")
662                    .expect("Data not found");
663                assert_eq!(&retrieved, data);
664                let retrieved = archive
665                    .get(Identifier::Key(key))
666                    .await
667                    .expect("Failed to get data")
668                    .expect("Data not found");
669                assert_eq!(&retrieved, data);
670            }
671
672            archive.close().await.expect("Failed to close archive");
673        }
674    }
675
676    fn test_many_keys_determinism<F, Fut, A>(creator: F, compression: Option<u8>, num: usize)
677    where
678        A: Archive<Key = FixedBytes<64>, Value = i32>,
679        F: Fn(Context, Option<u8>) -> Fut + Copy + Send + 'static,
680        Fut: Future<Output = A> + Send,
681    {
682        let executor = deterministic::Runner::default();
683        let state1 = executor.start(|context| async move {
684            test_many_keys_impl(context.clone(), creator, compression, num).await;
685            context.auditor().state()
686        });
687        let executor = deterministic::Runner::default();
688        let state2 = executor.start(|context| async move {
689            test_many_keys_impl(context.clone(), creator, compression, num).await;
690            context.auditor().state()
691        });
692        assert_eq!(state1, state2);
693    }
694
695    #[test_traced]
696    fn test_many_keys_prunable_no_compression() {
697        test_many_keys_determinism(create_prunable, None, 1_000);
698    }
699
700    #[test_traced]
701    fn test_many_keys_prunable_compression() {
702        test_many_keys_determinism(create_prunable, Some(3), 1_000);
703    }
704
705    #[test_traced]
706    fn test_many_keys_immutable_no_compression() {
707        test_many_keys_determinism(create_immutable, None, 1_000);
708    }
709
710    #[test_traced]
711    fn test_many_keys_immutable_compression() {
712        test_many_keys_determinism(create_immutable, Some(3), 1_000);
713    }
714
715    #[test_traced]
716    #[ignore]
717    fn test_many_keys_prunable_large() {
718        test_many_keys_determinism(create_prunable, None, 50_000);
719    }
720
721    #[test_traced]
722    #[ignore]
723    fn test_many_keys_immutable_large() {
724        test_many_keys_determinism(create_immutable, None, 50_000);
725    }
726}