commonware_storage/archive/
mod.rs

1//! A write-once key-value store for ordered data.
2//!
3//! [Archive] is a key-value store designed for workloads where all data is written only once and is
4//! uniquely associated with both an `index` and a `key`.
5
6use commonware_codec::Codec;
7use commonware_utils::Array;
8use std::future::Future;
9use thiserror::Error;
10
11pub mod immutable;
12pub mod prunable;
13
/// Subject of a `get` or `has` operation.
///
/// An item stored in an archive can be addressed either by its `index` or by its `key`.
pub enum Identifier<'a, K: Array> {
    /// Address the item by its index.
    Index(u64),
    /// Address the item by a borrowed reference to its key.
    Key(&'a K),
}
19
/// Errors that can occur when interacting with the archive.
///
/// The `Journal`, `Ordinal`, `Metadata` and `Freezer` variants wrap errors from other modules
/// of this crate and can be produced via `From` conversions (e.g. with the `?` operator).
#[derive(Debug, Error)]
pub enum Error {
    /// Wraps a `crate::journal::Error`.
    #[error("journal error: {0}")]
    Journal(#[from] crate::journal::Error),
    /// Wraps a `crate::ordinal::Error`.
    #[error("ordinal error: {0}")]
    Ordinal(#[from] crate::ordinal::Error),
    /// Wraps a `crate::metadata::Error`.
    #[error("metadata error: {0}")]
    Metadata(#[from] crate::metadata::Error),
    /// Wraps a `crate::freezer::Error`.
    #[error("freezer error: {0}")]
    Freezer(#[from] crate::freezer::Error),
    /// A record was found to be corrupted.
    #[error("record corrupted")]
    RecordCorrupted,
    /// NOTE(review): presumably carries the index the archive has already been pruned to;
    /// confirm against the implementations that return it.
    #[error("already pruned to: {0}")]
    AlreadyPrunedTo(u64),
    /// A record is too large. NOTE(review): the applicable size limit is not defined in this
    /// module.
    #[error("record too large")]
    RecordTooLarge,
}
38
/// A write-once key-value store where each key is associated with a unique index.
///
/// Items can be looked up either by index or by key (see [Identifier]).
pub trait Archive {
    /// The type of the key.
    type Key: Array;

    /// The type of the value.
    type Value: Codec;

    /// Store an item in [Archive]. Indices and keys are both assumed to be globally unique.
    ///
    /// If the index already exists, put does nothing and returns. If the same key is stored multiple times
    /// at different indices (not recommended), any value associated with the key may be returned.
    fn put(
        &mut self,
        index: u64,
        key: Self::Key,
        value: Self::Value,
    ) -> impl Future<Output = Result<(), Error>>;

    /// Perform a [Archive::put] and [Archive::sync] in a single operation.
    ///
    /// The default implementation awaits [Archive::put] and, only if it succeeds, awaits
    /// [Archive::sync]. An error from either step is returned to the caller.
    fn put_sync(
        &mut self,
        index: u64,
        key: Self::Key,
        value: Self::Value,
    ) -> impl Future<Output = Result<(), Error>> {
        async move {
            self.put(index, key, value).await?;
            self.sync().await
        }
    }

    /// Retrieve an item from [Archive].
    ///
    /// Resolves to `Ok(None)` if no item matches `identifier`.
    fn get(
        &self,
        identifier: Identifier<'_, Self::Key>,
    ) -> impl Future<Output = Result<Option<Self::Value>, Error>>;

    /// Check if an item exists in [Archive].
    fn has(
        &self,
        identifier: Identifier<'_, Self::Key>,
    ) -> impl Future<Output = Result<bool, Error>>;

    /// Retrieve the end of the current range including `index` (inclusive) and
    /// the start of the next range after `index` (if it exists).
    ///
    /// The first element is `None` when `index` does not fall inside a populated range and the
    /// second element is `None` when no populated range starts after `index`.
    ///
    /// This is useful for driving backfill operations over the archive.
    fn next_gap(&self, index: u64) -> (Option<u64>, Option<u64>);

    /// Returns up to `max` missing items starting from `index`.
    ///
    /// This method iterates through gaps between existing ranges, collecting missing indices
    /// until either `max` items are found or there are no more gaps to fill.
    fn missing_items(&self, index: u64, max: usize) -> Vec<u64>;

    /// Retrieve an iterator over all populated ranges (inclusive) within the [Archive].
    fn ranges(&self) -> impl Iterator<Item = (u64, u64)>;

    /// Retrieve the first index in the [Archive].
    fn first_index(&self) -> Option<u64>;

    /// Retrieve the last index in the [Archive].
    fn last_index(&self) -> Option<u64>;

    /// Sync all pending writes.
    fn sync(&mut self) -> impl Future<Output = Result<(), Error>>;

    /// Close [Archive] (and underlying storage).
    ///
    /// Any pending writes are synced prior to closing.
    fn close(self) -> impl Future<Output = Result<(), Error>>;

    /// Remove all persistent data created by this [Archive].
    fn destroy(self) -> impl Future<Output = Result<(), Error>>;
}
115
116#[cfg(test)]
117mod tests {
118    use super::*;
119    use crate::translator::TwoCap;
120    use commonware_codec::DecodeExt;
121    use commonware_macros::{test_group, test_traced};
122    use commonware_runtime::{
123        buffer::PoolRef,
124        deterministic::{self, Context},
125        Runner,
126    };
127    use commonware_utils::{sequence::FixedBytes, NZUsize, NZU64};
128    use rand::Rng;
129    use std::{collections::BTreeMap, num::NonZeroUsize};
130
131    const PAGE_SIZE: NonZeroUsize = NZUsize!(1024);
132    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);
133
134    fn test_key(key: &str) -> FixedBytes<64> {
135        let mut buf = [0u8; 64];
136        let key = key.as_bytes();
137        assert!(key.len() <= buf.len());
138        buf[..key.len()].copy_from_slice(key);
139        FixedBytes::decode(buf.as_ref()).unwrap()
140    }
141
142    async fn create_prunable(
143        context: Context,
144        compression: Option<u8>,
145    ) -> impl Archive<Key = FixedBytes<64>, Value = i32> {
146        let cfg = prunable::Config {
147            partition: "test".into(),
148            translator: TwoCap,
149            compression,
150            codec_config: (),
151            items_per_section: NZU64!(1024),
152            write_buffer: NZUsize!(1024),
153            replay_buffer: NZUsize!(1024),
154            buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
155        };
156        prunable::Archive::init(context, cfg).await.unwrap()
157    }
158
159    async fn create_immutable(
160        context: Context,
161        compression: Option<u8>,
162    ) -> impl Archive<Key = FixedBytes<64>, Value = i32> {
163        let cfg = immutable::Config {
164            metadata_partition: "test_metadata".into(),
165            freezer_table_partition: "test_table".into(),
166            freezer_table_initial_size: 64,
167            freezer_table_resize_frequency: 2,
168            freezer_table_resize_chunk_size: 32,
169            freezer_journal_partition: "test_journal".into(),
170            freezer_journal_target_size: 1024 * 1024,
171            freezer_journal_compression: compression,
172            freezer_journal_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
173            ordinal_partition: "test_ordinal".into(),
174            items_per_section: NZU64!(1024),
175            write_buffer: NZUsize!(1024 * 1024),
176            replay_buffer: NZUsize!(1024 * 1024),
177            codec_config: (),
178        };
179        immutable::Archive::init(context, cfg).await.unwrap()
180    }
181
182    async fn test_put_get_impl(mut archive: impl Archive<Key = FixedBytes<64>, Value = i32>) {
183        let index = 1u64;
184        let key = test_key("testkey");
185        let data = 1;
186
187        // Has the key before put
188        let has = archive
189            .has(Identifier::Index(index))
190            .await
191            .expect("Failed to check key");
192        assert!(!has);
193        let has = archive
194            .has(Identifier::Key(&key))
195            .await
196            .expect("Failed to check key");
197        assert!(!has);
198
199        // Put the key-data pair
200        archive
201            .put(index, key.clone(), data)
202            .await
203            .expect("Failed to put data");
204
205        // Has the key after put
206        let has = archive
207            .has(Identifier::Index(index))
208            .await
209            .expect("Failed to check key");
210        assert!(has);
211        let has = archive
212            .has(Identifier::Key(&key))
213            .await
214            .expect("Failed to check key");
215        assert!(has);
216
217        // Get the data by key
218        let retrieved = archive
219            .get(Identifier::Key(&key))
220            .await
221            .expect("Failed to get data");
222        assert_eq!(retrieved, Some(data));
223
224        // Get the data by index
225        let retrieved = archive
226            .get(Identifier::Index(index))
227            .await
228            .expect("Failed to get data");
229        assert_eq!(retrieved, Some(data));
230
231        // Force a sync
232        archive.sync().await.expect("Failed to sync data");
233
234        // Close the archive
235        archive.close().await.expect("Failed to close archive");
236    }
237
238    #[test_traced]
239    fn test_put_get_prunable_no_compression() {
240        let executor = deterministic::Runner::default();
241        executor.start(|context| async move {
242            let archive = create_prunable(context, None).await;
243            test_put_get_impl(archive).await;
244        });
245    }
246
247    #[test_traced]
248    fn test_put_get_prunable_compression() {
249        let executor = deterministic::Runner::default();
250        executor.start(|context| async move {
251            let archive = create_prunable(context, Some(3)).await;
252            test_put_get_impl(archive).await;
253        });
254    }
255
256    #[test_traced]
257    fn test_put_get_immutable_no_compression() {
258        let executor = deterministic::Runner::default();
259        executor.start(|context| async move {
260            let archive = create_immutable(context, None).await;
261            test_put_get_impl(archive).await;
262        });
263    }
264
265    #[test_traced]
266    fn test_put_get_immutable_compression() {
267        let executor = deterministic::Runner::default();
268        executor.start(|context| async move {
269            let archive = create_immutable(context, Some(3)).await;
270            test_put_get_impl(archive).await;
271        });
272    }
273
274    async fn test_duplicate_key_impl(mut archive: impl Archive<Key = FixedBytes<64>, Value = i32>) {
275        let index = 1u64;
276        let key = test_key("duplicate");
277        let data1 = 1;
278        let data2 = 2;
279
280        // Put the key-data pair
281        archive
282            .put(index, key.clone(), data1)
283            .await
284            .expect("Failed to put data");
285
286        // Put the key-data pair again (should be idempotent)
287        archive
288            .put(index, key.clone(), data2)
289            .await
290            .expect("Duplicate put should not fail");
291
292        // Get the data back - should still be the first value
293        let retrieved = archive
294            .get(Identifier::Index(index))
295            .await
296            .expect("Failed to get data")
297            .expect("Data not found");
298        assert_eq!(retrieved, data1);
299
300        let retrieved = archive
301            .get(Identifier::Key(&key))
302            .await
303            .expect("Failed to get data")
304            .expect("Data not found");
305        assert_eq!(retrieved, data1);
306
307        archive.close().await.expect("Failed to close archive");
308    }
309
310    #[test_traced]
311    fn test_duplicate_key_prunable_no_compression() {
312        let executor = deterministic::Runner::default();
313        executor.start(|context| async move {
314            let archive = create_prunable(context, None).await;
315            test_duplicate_key_impl(archive).await;
316        });
317    }
318
319    #[test_traced]
320    fn test_duplicate_key_prunable_compression() {
321        let executor = deterministic::Runner::default();
322        executor.start(|context| async move {
323            let archive = create_prunable(context, Some(3)).await;
324            test_duplicate_key_impl(archive).await;
325        });
326    }
327
328    #[test_traced]
329    fn test_duplicate_key_immutable_no_compression() {
330        let executor = deterministic::Runner::default();
331        executor.start(|context| async move {
332            let archive = create_immutable(context, None).await;
333            test_duplicate_key_impl(archive).await;
334        });
335    }
336
337    #[test_traced]
338    fn test_duplicate_key_immutable_compression() {
339        let executor = deterministic::Runner::default();
340        executor.start(|context| async move {
341            let archive = create_immutable(context, Some(3)).await;
342            test_duplicate_key_impl(archive).await;
343        });
344    }
345
346    async fn test_get_nonexistent_impl(archive: impl Archive<Key = FixedBytes<64>, Value = i32>) {
347        // Attempt to get an index that doesn't exist
348        let index = 1u64;
349        let retrieved: Option<i32> = archive
350            .get(Identifier::Index(index))
351            .await
352            .expect("Failed to get data");
353        assert!(retrieved.is_none());
354
355        // Attempt to get a key that doesn't exist
356        let key = test_key("nonexistent");
357        let retrieved = archive
358            .get(Identifier::Key(&key))
359            .await
360            .expect("Failed to get data");
361        assert!(retrieved.is_none());
362
363        archive.close().await.expect("Failed to close archive");
364    }
365
366    #[test_traced]
367    fn test_get_nonexistent_prunable_no_compression() {
368        let executor = deterministic::Runner::default();
369        executor.start(|context| async move {
370            let archive = create_prunable(context, None).await;
371            test_get_nonexistent_impl(archive).await;
372        });
373    }
374
375    #[test_traced]
376    fn test_get_nonexistent_prunable_compression() {
377        let executor = deterministic::Runner::default();
378        executor.start(|context| async move {
379            let archive = create_prunable(context, Some(3)).await;
380            test_get_nonexistent_impl(archive).await;
381        });
382    }
383
384    #[test_traced]
385    fn test_get_nonexistent_immutable_no_compression() {
386        let executor = deterministic::Runner::default();
387        executor.start(|context| async move {
388            let archive = create_immutable(context, None).await;
389            test_get_nonexistent_impl(archive).await;
390        });
391    }
392
393    #[test_traced]
394    fn test_get_nonexistent_immutable_compression() {
395        let executor = deterministic::Runner::default();
396        executor.start(|context| async move {
397            let archive = create_immutable(context, Some(3)).await;
398            test_get_nonexistent_impl(archive).await;
399        });
400    }
401
402    async fn test_persistence_impl<A, F, Fut>(context: Context, creator: F, compression: Option<u8>)
403    where
404        A: Archive<Key = FixedBytes<64>, Value = i32>,
405        F: Fn(Context, Option<u8>) -> Fut,
406        Fut: Future<Output = A>,
407    {
408        // Create and populate archive
409        {
410            let mut archive = creator(context.clone(), compression).await;
411
412            // Insert multiple keys
413            let keys = vec![
414                (1u64, test_key("key1"), 1),
415                (2u64, test_key("key2"), 2),
416                (3u64, test_key("key3"), 3),
417            ];
418
419            for (index, key, data) in &keys {
420                archive
421                    .put(*index, key.clone(), *data)
422                    .await
423                    .expect("Failed to put data");
424            }
425
426            // Close the archive
427            archive.close().await.expect("Failed to close archive");
428        }
429
430        // Reopen and verify data
431        {
432            let archive = creator(context, compression).await;
433
434            // Verify all keys are still present
435            let keys = vec![
436                (1u64, test_key("key1"), 1),
437                (2u64, test_key("key2"), 2),
438                (3u64, test_key("key3"), 3),
439            ];
440
441            for (index, key, expected_data) in &keys {
442                let retrieved = archive
443                    .get(Identifier::Index(*index))
444                    .await
445                    .expect("Failed to get data")
446                    .expect("Data not found");
447                assert_eq!(retrieved, *expected_data);
448
449                let retrieved = archive
450                    .get(Identifier::Key(key))
451                    .await
452                    .expect("Failed to get data")
453                    .expect("Data not found");
454                assert_eq!(retrieved, *expected_data);
455            }
456
457            archive.close().await.expect("Failed to close archive");
458        }
459    }
460
461    #[test_traced]
462    fn test_persistence_prunable_no_compression() {
463        let executor = deterministic::Runner::default();
464        executor.start(|context| async move {
465            test_persistence_impl(context, create_prunable, None).await;
466        });
467    }
468
469    #[test_traced]
470    fn test_persistence_prunable_compression() {
471        let executor = deterministic::Runner::default();
472        executor.start(|context| async move {
473            test_persistence_impl(context, create_prunable, Some(3)).await;
474        });
475    }
476
477    #[test_traced]
478    fn test_persistence_immutable_no_compression() {
479        let executor = deterministic::Runner::default();
480        executor.start(|context| async move {
481            test_persistence_impl(context, create_immutable, None).await;
482        });
483    }
484
485    #[test_traced]
486    fn test_persistence_immutable_compression() {
487        let executor = deterministic::Runner::default();
488        executor.start(|context| async move {
489            test_persistence_impl(context, create_immutable, Some(3)).await;
490        });
491    }
492
493    async fn test_ranges_impl<A, F, Fut>(mut context: Context, creator: F, compression: Option<u8>)
494    where
495        A: Archive<Key = FixedBytes<64>, Value = i32>,
496        F: Fn(Context, Option<u8>) -> Fut,
497        Fut: Future<Output = A>,
498    {
499        let mut keys = BTreeMap::new();
500        {
501            let mut archive = creator(context.clone(), compression).await;
502
503            // Insert 100 keys with gaps
504            let mut last_index = 0u64;
505            while keys.len() < 100 {
506                let gap: u64 = context.gen_range(1..=10);
507                let index = last_index + gap;
508                last_index = index;
509
510                let mut key_bytes = [0u8; 64];
511                context.fill(&mut key_bytes);
512                let key = FixedBytes::<64>::decode(key_bytes.as_ref()).unwrap();
513                let data: i32 = context.gen();
514
515                if keys.contains_key(&index) {
516                    continue;
517                }
518                keys.insert(index, (key.clone(), data));
519
520                archive
521                    .put(index, key, data)
522                    .await
523                    .expect("Failed to put data");
524            }
525
526            archive.close().await.expect("Failed to close archive");
527        }
528
529        {
530            let archive = creator(context, compression).await;
531            let sorted_indices: Vec<u64> = keys.keys().cloned().collect();
532
533            // Check gap before the first element
534            let (current_end, start_next) = archive.next_gap(0);
535            assert!(current_end.is_none());
536            assert_eq!(start_next, Some(sorted_indices[0]));
537
538            // Check gaps between elements
539            let mut i = 0;
540            while i < sorted_indices.len() {
541                let current_index = sorted_indices[i];
542
543                // Find the end of the current contiguous block
544                let mut j = i;
545                while j + 1 < sorted_indices.len() && sorted_indices[j + 1] == sorted_indices[j] + 1
546                {
547                    j += 1;
548                }
549                let block_end_index = sorted_indices[j];
550                let next_actual_index = if j + 1 < sorted_indices.len() {
551                    Some(sorted_indices[j + 1])
552                } else {
553                    None
554                };
555
556                let (current_end, start_next) = archive.next_gap(current_index);
557                assert_eq!(current_end, Some(block_end_index));
558                assert_eq!(start_next, next_actual_index);
559
560                // If there's a gap, check an index within the gap
561                if let Some(next_index) = next_actual_index {
562                    if next_index > block_end_index + 1 {
563                        let in_gap_index = block_end_index + 1;
564                        let (current_end, start_next) = archive.next_gap(in_gap_index);
565                        assert!(current_end.is_none());
566                        assert_eq!(start_next, Some(next_index));
567                    }
568                }
569                i = j + 1;
570            }
571
572            // Check the last element
573            let last_index = *sorted_indices.last().unwrap();
574            let (current_end, start_next) = archive.next_gap(last_index);
575            assert!(current_end.is_some());
576            assert!(start_next.is_none());
577
578            archive.close().await.expect("Failed to close archive");
579        }
580    }
581
582    #[test_traced]
583    fn test_ranges_prunable_no_compression() {
584        let executor = deterministic::Runner::default();
585        executor.start(|context| async move {
586            test_ranges_impl(context, create_prunable, None).await;
587        });
588    }
589
590    #[test_traced]
591    fn test_ranges_prunable_compression() {
592        let executor = deterministic::Runner::default();
593        executor.start(|context| async move {
594            test_ranges_impl(context, create_prunable, Some(3)).await;
595        });
596    }
597
598    #[test_traced]
599    fn test_ranges_immutable_no_compression() {
600        let executor = deterministic::Runner::default();
601        executor.start(|context| async move {
602            test_ranges_impl(context, create_immutable, None).await;
603        });
604    }
605
606    #[test_traced]
607    fn test_ranges_immutable_compression() {
608        let executor = deterministic::Runner::default();
609        executor.start(|context| async move {
610            test_ranges_impl(context, create_immutable, Some(3)).await;
611        });
612    }
613
614    async fn test_many_keys_impl<A, F, Fut>(
615        mut context: Context,
616        creator: F,
617        compression: Option<u8>,
618        num: usize,
619    ) where
620        A: Archive<Key = FixedBytes<64>, Value = i32>,
621        F: Fn(Context, Option<u8>) -> Fut,
622        Fut: Future<Output = A>,
623    {
624        // Insert many keys
625        let mut keys = BTreeMap::new();
626        {
627            let mut archive = creator(context.clone(), compression).await;
628            while keys.len() < num {
629                let index = keys.len() as u64;
630                let mut key = [0u8; 64];
631                context.fill(&mut key);
632                let key = FixedBytes::<64>::decode(key.as_ref()).unwrap();
633                let data: i32 = context.gen();
634
635                archive
636                    .put(index, key.clone(), data)
637                    .await
638                    .expect("Failed to put data");
639                keys.insert(key, (index, data));
640
641                // Randomly sync the archive
642                if context.gen_bool(0.1) {
643                    archive.sync().await.expect("Failed to sync archive");
644                }
645            }
646            archive.sync().await.expect("Failed to sync archive");
647
648            // Ensure all keys can be retrieved
649            for (key, (index, data)) in &keys {
650                let retrieved = archive
651                    .get(Identifier::Index(*index))
652                    .await
653                    .expect("Failed to get data")
654                    .expect("Data not found");
655                assert_eq!(&retrieved, data);
656                let retrieved = archive
657                    .get(Identifier::Key(key))
658                    .await
659                    .expect("Failed to get data")
660                    .expect("Data not found");
661                assert_eq!(&retrieved, data);
662            }
663
664            archive.close().await.expect("Failed to close archive");
665        }
666
667        // Reinitialize and verify
668        {
669            let archive = creator(context.clone(), compression).await;
670
671            // Ensure all keys can be retrieved
672            for (key, (index, data)) in &keys {
673                let retrieved = archive
674                    .get(Identifier::Index(*index))
675                    .await
676                    .expect("Failed to get data")
677                    .expect("Data not found");
678                assert_eq!(&retrieved, data);
679                let retrieved = archive
680                    .get(Identifier::Key(key))
681                    .await
682                    .expect("Failed to get data")
683                    .expect("Data not found");
684                assert_eq!(&retrieved, data);
685            }
686
687            archive.close().await.expect("Failed to close archive");
688        }
689    }
690
691    fn test_many_keys_determinism<F, Fut, A>(creator: F, compression: Option<u8>, num: usize)
692    where
693        A: Archive<Key = FixedBytes<64>, Value = i32>,
694        F: Fn(Context, Option<u8>) -> Fut + Copy + Send + 'static,
695        Fut: Future<Output = A> + Send,
696    {
697        let executor = deterministic::Runner::default();
698        let state1 = executor.start(|context| async move {
699            test_many_keys_impl(context.clone(), creator, compression, num).await;
700            context.auditor().state()
701        });
702        let executor = deterministic::Runner::default();
703        let state2 = executor.start(|context| async move {
704            test_many_keys_impl(context.clone(), creator, compression, num).await;
705            context.auditor().state()
706        });
707        assert_eq!(state1, state2);
708    }
709
710    #[test_traced]
711    fn test_many_keys_prunable_no_compression() {
712        test_many_keys_determinism(create_prunable, None, 1_000);
713    }
714
715    #[test_traced]
716    fn test_many_keys_prunable_compression() {
717        test_many_keys_determinism(create_prunable, Some(3), 1_000);
718    }
719
720    #[test_traced]
721    fn test_many_keys_immutable_no_compression() {
722        test_many_keys_determinism(create_immutable, None, 1_000);
723    }
724
725    #[test_traced]
726    fn test_many_keys_immutable_compression() {
727        test_many_keys_determinism(create_immutable, Some(3), 1_000);
728    }
729
730    #[test_group("slow")]
731    #[test_traced]
732    fn test_many_keys_prunable_large() {
733        test_many_keys_determinism(create_prunable, None, 50_000);
734    }
735
736    #[test_group("slow")]
737    #[test_traced]
738    fn test_many_keys_immutable_large() {
739        test_many_keys_determinism(create_immutable, None, 50_000);
740    }
741}