commonware_storage/archive/
mod.rs

1//! A write-once key-value store for ordered data.
2//!
3//! [Archive] is a key-value store designed for workloads where all data is written only once and is
4//! uniquely associated with both an `index` and a `key`.
5
6use commonware_codec::Codec;
7use commonware_utils::Array;
8use std::future::Future;
9use thiserror::Error;
10
11pub mod immutable;
12pub mod prunable;
13
14/// Subject of a `get` or `has` operation.
15pub enum Identifier<'a, K: Array> {
16    Index(u64),
17    Key(&'a K),
18}
19
/// Errors that can occur when interacting with the archive.
///
/// The first four variants wrap errors from sibling storage modules; `#[from]`
/// lets `?` convert those errors into this type automatically.
#[derive(Debug, Error)]
pub enum Error {
    /// Wraps a [crate::journal::Error].
    #[error("journal error: {0}")]
    Journal(#[from] crate::journal::Error),
    /// Wraps a [crate::ordinal::Error].
    #[error("ordinal error: {0}")]
    Ordinal(#[from] crate::ordinal::Error),
    /// Wraps a [crate::metadata::Error].
    #[error("metadata error: {0}")]
    Metadata(#[from] crate::metadata::Error),
    /// Wraps a [crate::freezer::Error].
    #[error("freezer error: {0}")]
    Freezer(#[from] crate::freezer::Error),
    /// A record was found to be corrupted.
    #[error("record corrupted")]
    RecordCorrupted,
    /// Carries the `u64` position the archive has already been pruned to.
    // NOTE(review): presumably returned when an operation targets an index below
    // the pruning boundary; confirm against the implementations.
    #[error("already pruned to: {0}")]
    AlreadyPrunedTo(u64),
    /// A record is too large.
    // NOTE(review): the size limit is not defined in this file.
    #[error("record too large")]
    RecordTooLarge,
}
38
39/// A write-once key-value store where each key is associated with a unique index.
40pub trait Archive {
41    /// The type of the key.
42    type Key: Array;
43
44    /// The type of the value.
45    type Value: Codec;
46
47    /// Store an item in [Archive]. Both indices and keys are assumed to both be globally unique.
48    ///
49    /// If the index already exists, put does nothing and returns. If the same key is stored multiple times
50    /// at different indices (not recommended), any value associated with the key may be returned.
51    fn put(
52        &mut self,
53        index: u64,
54        key: Self::Key,
55        value: Self::Value,
56    ) -> impl Future<Output = Result<(), Error>>;
57
58    /// Perform a [Archive::put] and [Archive::sync] in a single operation.
59    fn put_sync(
60        &mut self,
61        index: u64,
62        key: Self::Key,
63        value: Self::Value,
64    ) -> impl Future<Output = Result<(), Error>> {
65        async move {
66            self.put(index, key, value).await?;
67            self.sync().await
68        }
69    }
70
71    /// Retrieve an item from [Archive].
72    fn get(
73        &self,
74        identifier: Identifier<'_, Self::Key>,
75    ) -> impl Future<Output = Result<Option<Self::Value>, Error>>;
76
77    /// Check if an item exists in [Archive].
78    fn has(
79        &self,
80        identifier: Identifier<'_, Self::Key>,
81    ) -> impl Future<Output = Result<bool, Error>>;
82
83    /// Retrieve the end of the current range including `index` (inclusive) and
84    /// the start of the next range after `index` (if it exists).
85    ///
86    /// This is useful for driving backfill operations over the archive.
87    fn next_gap(&self, index: u64) -> (Option<u64>, Option<u64>);
88
89    /// Retrieve the first index in the [Archive].
90    fn first_index(&self) -> Option<u64>;
91
92    /// Retrieve the last index in the [Archive].
93    fn last_index(&self) -> Option<u64>;
94
95    /// Sync all pending writes.
96    fn sync(&mut self) -> impl Future<Output = Result<(), Error>>;
97
98    /// Close [Archive] (and underlying storage).
99    ///
100    /// Any pending writes are synced prior to closing.
101    fn close(self) -> impl Future<Output = Result<(), Error>>;
102
103    /// Remove all persistent data created by this [Archive].
104    fn destroy(self) -> impl Future<Output = Result<(), Error>>;
105}
106
107#[cfg(test)]
108mod tests {
109    use super::*;
110    use crate::translator::TwoCap;
111    use commonware_codec::DecodeExt;
112    use commonware_macros::test_traced;
113    use commonware_runtime::{
114        buffer::PoolRef,
115        deterministic::{self, Context},
116        Runner,
117    };
118    use commonware_utils::{sequence::FixedBytes, NZUsize, NZU64};
119    use rand::Rng;
120    use std::{collections::BTreeMap, num::NonZeroUsize};
121
    /// First argument passed to `PoolRef::new` when the test helpers build a buffer pool.
    const PAGE_SIZE: NonZeroUsize = NZUsize!(1024);
    /// Second argument passed to `PoolRef::new` when the test helpers build a buffer pool.
    // NOTE(review): presumably the number of pages the pool caches; confirm against `PoolRef`.
    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);
124
125    fn test_key(key: &str) -> FixedBytes<64> {
126        let mut buf = [0u8; 64];
127        let key = key.as_bytes();
128        assert!(key.len() <= buf.len());
129        buf[..key.len()].copy_from_slice(key);
130        FixedBytes::decode(buf.as_ref()).unwrap()
131    }
132
133    async fn create_prunable(
134        context: Context,
135        compression: Option<u8>,
136    ) -> impl Archive<Key = FixedBytes<64>, Value = i32> {
137        let cfg = prunable::Config {
138            partition: "test".into(),
139            translator: TwoCap,
140            compression,
141            codec_config: (),
142            items_per_section: NZU64!(1024),
143            write_buffer: NZUsize!(1024),
144            replay_buffer: NZUsize!(1024),
145            buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
146        };
147        prunable::Archive::init(context, cfg).await.unwrap()
148    }
149
150    async fn create_immutable(
151        context: Context,
152        compression: Option<u8>,
153    ) -> impl Archive<Key = FixedBytes<64>, Value = i32> {
154        let cfg = immutable::Config {
155            metadata_partition: "test_metadata".into(),
156            freezer_table_partition: "test_table".into(),
157            freezer_table_initial_size: 64,
158            freezer_table_resize_frequency: 2,
159            freezer_table_resize_chunk_size: 32,
160            freezer_journal_partition: "test_journal".into(),
161            freezer_journal_target_size: 1024 * 1024,
162            freezer_journal_compression: compression,
163            freezer_journal_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
164            ordinal_partition: "test_ordinal".into(),
165            items_per_section: NZU64!(1024),
166            write_buffer: NZUsize!(1024 * 1024),
167            replay_buffer: NZUsize!(1024 * 1024),
168            codec_config: (),
169        };
170        immutable::Archive::init(context, cfg).await.unwrap()
171    }
172
173    async fn test_put_get_impl(mut archive: impl Archive<Key = FixedBytes<64>, Value = i32>) {
174        let index = 1u64;
175        let key = test_key("testkey");
176        let data = 1;
177
178        // Has the key before put
179        let has = archive
180            .has(Identifier::Index(index))
181            .await
182            .expect("Failed to check key");
183        assert!(!has);
184        let has = archive
185            .has(Identifier::Key(&key))
186            .await
187            .expect("Failed to check key");
188        assert!(!has);
189
190        // Put the key-data pair
191        archive
192            .put(index, key.clone(), data)
193            .await
194            .expect("Failed to put data");
195
196        // Has the key after put
197        let has = archive
198            .has(Identifier::Index(index))
199            .await
200            .expect("Failed to check key");
201        assert!(has);
202        let has = archive
203            .has(Identifier::Key(&key))
204            .await
205            .expect("Failed to check key");
206        assert!(has);
207
208        // Get the data by key
209        let retrieved = archive
210            .get(Identifier::Key(&key))
211            .await
212            .expect("Failed to get data");
213        assert_eq!(retrieved, Some(data));
214
215        // Get the data by index
216        let retrieved = archive
217            .get(Identifier::Index(index))
218            .await
219            .expect("Failed to get data");
220        assert_eq!(retrieved, Some(data));
221
222        // Force a sync
223        archive.sync().await.expect("Failed to sync data");
224
225        // Close the archive
226        archive.close().await.expect("Failed to close archive");
227    }
228
229    #[test_traced]
230    fn test_put_get_prunable_no_compression() {
231        let executor = deterministic::Runner::default();
232        executor.start(|context| async move {
233            let archive = create_prunable(context, None).await;
234            test_put_get_impl(archive).await;
235        });
236    }
237
238    #[test_traced]
239    fn test_put_get_prunable_compression() {
240        let executor = deterministic::Runner::default();
241        executor.start(|context| async move {
242            let archive = create_prunable(context, Some(3)).await;
243            test_put_get_impl(archive).await;
244        });
245    }
246
247    #[test_traced]
248    fn test_put_get_immutable_no_compression() {
249        let executor = deterministic::Runner::default();
250        executor.start(|context| async move {
251            let archive = create_immutable(context, None).await;
252            test_put_get_impl(archive).await;
253        });
254    }
255
256    #[test_traced]
257    fn test_put_get_immutable_compression() {
258        let executor = deterministic::Runner::default();
259        executor.start(|context| async move {
260            let archive = create_immutable(context, Some(3)).await;
261            test_put_get_impl(archive).await;
262        });
263    }
264
265    async fn test_duplicate_key_impl(mut archive: impl Archive<Key = FixedBytes<64>, Value = i32>) {
266        let index = 1u64;
267        let key = test_key("duplicate");
268        let data1 = 1;
269        let data2 = 2;
270
271        // Put the key-data pair
272        archive
273            .put(index, key.clone(), data1)
274            .await
275            .expect("Failed to put data");
276
277        // Put the key-data pair again (should be idempotent)
278        archive
279            .put(index, key.clone(), data2)
280            .await
281            .expect("Duplicate put should not fail");
282
283        // Get the data back - should still be the first value
284        let retrieved = archive
285            .get(Identifier::Index(index))
286            .await
287            .expect("Failed to get data")
288            .expect("Data not found");
289        assert_eq!(retrieved, data1);
290
291        let retrieved = archive
292            .get(Identifier::Key(&key))
293            .await
294            .expect("Failed to get data")
295            .expect("Data not found");
296        assert_eq!(retrieved, data1);
297
298        archive.close().await.expect("Failed to close archive");
299    }
300
301    #[test_traced]
302    fn test_duplicate_key_prunable_no_compression() {
303        let executor = deterministic::Runner::default();
304        executor.start(|context| async move {
305            let archive = create_prunable(context, None).await;
306            test_duplicate_key_impl(archive).await;
307        });
308    }
309
310    #[test_traced]
311    fn test_duplicate_key_prunable_compression() {
312        let executor = deterministic::Runner::default();
313        executor.start(|context| async move {
314            let archive = create_prunable(context, Some(3)).await;
315            test_duplicate_key_impl(archive).await;
316        });
317    }
318
319    #[test_traced]
320    fn test_duplicate_key_immutable_no_compression() {
321        let executor = deterministic::Runner::default();
322        executor.start(|context| async move {
323            let archive = create_immutable(context, None).await;
324            test_duplicate_key_impl(archive).await;
325        });
326    }
327
328    #[test_traced]
329    fn test_duplicate_key_immutable_compression() {
330        let executor = deterministic::Runner::default();
331        executor.start(|context| async move {
332            let archive = create_immutable(context, Some(3)).await;
333            test_duplicate_key_impl(archive).await;
334        });
335    }
336
337    async fn test_get_nonexistent_impl(archive: impl Archive<Key = FixedBytes<64>, Value = i32>) {
338        // Attempt to get an index that doesn't exist
339        let index = 1u64;
340        let retrieved: Option<i32> = archive
341            .get(Identifier::Index(index))
342            .await
343            .expect("Failed to get data");
344        assert!(retrieved.is_none());
345
346        // Attempt to get a key that doesn't exist
347        let key = test_key("nonexistent");
348        let retrieved = archive
349            .get(Identifier::Key(&key))
350            .await
351            .expect("Failed to get data");
352        assert!(retrieved.is_none());
353
354        archive.close().await.expect("Failed to close archive");
355    }
356
357    #[test_traced]
358    fn test_get_nonexistent_prunable_no_compression() {
359        let executor = deterministic::Runner::default();
360        executor.start(|context| async move {
361            let archive = create_prunable(context, None).await;
362            test_get_nonexistent_impl(archive).await;
363        });
364    }
365
366    #[test_traced]
367    fn test_get_nonexistent_prunable_compression() {
368        let executor = deterministic::Runner::default();
369        executor.start(|context| async move {
370            let archive = create_prunable(context, Some(3)).await;
371            test_get_nonexistent_impl(archive).await;
372        });
373    }
374
375    #[test_traced]
376    fn test_get_nonexistent_immutable_no_compression() {
377        let executor = deterministic::Runner::default();
378        executor.start(|context| async move {
379            let archive = create_immutable(context, None).await;
380            test_get_nonexistent_impl(archive).await;
381        });
382    }
383
384    #[test_traced]
385    fn test_get_nonexistent_immutable_compression() {
386        let executor = deterministic::Runner::default();
387        executor.start(|context| async move {
388            let archive = create_immutable(context, Some(3)).await;
389            test_get_nonexistent_impl(archive).await;
390        });
391    }
392
393    async fn test_persistence_impl<A, F, Fut>(context: Context, creator: F, compression: Option<u8>)
394    where
395        A: Archive<Key = FixedBytes<64>, Value = i32>,
396        F: Fn(Context, Option<u8>) -> Fut,
397        Fut: Future<Output = A>,
398    {
399        // Create and populate archive
400        {
401            let mut archive = creator(context.clone(), compression).await;
402
403            // Insert multiple keys
404            let keys = vec![
405                (1u64, test_key("key1"), 1),
406                (2u64, test_key("key2"), 2),
407                (3u64, test_key("key3"), 3),
408            ];
409
410            for (index, key, data) in &keys {
411                archive
412                    .put(*index, key.clone(), *data)
413                    .await
414                    .expect("Failed to put data");
415            }
416
417            // Close the archive
418            archive.close().await.expect("Failed to close archive");
419        }
420
421        // Reopen and verify data
422        {
423            let archive = creator(context, compression).await;
424
425            // Verify all keys are still present
426            let keys = vec![
427                (1u64, test_key("key1"), 1),
428                (2u64, test_key("key2"), 2),
429                (3u64, test_key("key3"), 3),
430            ];
431
432            for (index, key, expected_data) in &keys {
433                let retrieved = archive
434                    .get(Identifier::Index(*index))
435                    .await
436                    .expect("Failed to get data")
437                    .expect("Data not found");
438                assert_eq!(retrieved, *expected_data);
439
440                let retrieved = archive
441                    .get(Identifier::Key(key))
442                    .await
443                    .expect("Failed to get data")
444                    .expect("Data not found");
445                assert_eq!(retrieved, *expected_data);
446            }
447
448            archive.close().await.expect("Failed to close archive");
449        }
450    }
451
452    #[test_traced]
453    fn test_persistence_prunable_no_compression() {
454        let executor = deterministic::Runner::default();
455        executor.start(|context| async move {
456            test_persistence_impl(context, create_prunable, None).await;
457        });
458    }
459
460    #[test_traced]
461    fn test_persistence_prunable_compression() {
462        let executor = deterministic::Runner::default();
463        executor.start(|context| async move {
464            test_persistence_impl(context, create_prunable, Some(3)).await;
465        });
466    }
467
468    #[test_traced]
469    fn test_persistence_immutable_no_compression() {
470        let executor = deterministic::Runner::default();
471        executor.start(|context| async move {
472            test_persistence_impl(context, create_immutable, None).await;
473        });
474    }
475
476    #[test_traced]
477    fn test_persistence_immutable_compression() {
478        let executor = deterministic::Runner::default();
479        executor.start(|context| async move {
480            test_persistence_impl(context, create_immutable, Some(3)).await;
481        });
482    }
483
484    async fn test_ranges_impl<A, F, Fut>(mut context: Context, creator: F, compression: Option<u8>)
485    where
486        A: Archive<Key = FixedBytes<64>, Value = i32>,
487        F: Fn(Context, Option<u8>) -> Fut,
488        Fut: Future<Output = A>,
489    {
490        let mut keys = BTreeMap::new();
491        {
492            let mut archive = creator(context.clone(), compression).await;
493
494            // Insert 100 keys with gaps
495            let mut last_index = 0u64;
496            while keys.len() < 100 {
497                let gap: u64 = context.gen_range(1..=10);
498                let index = last_index + gap;
499                last_index = index;
500
501                let mut key_bytes = [0u8; 64];
502                context.fill(&mut key_bytes);
503                let key = FixedBytes::<64>::decode(key_bytes.as_ref()).unwrap();
504                let data: i32 = context.gen();
505
506                if keys.contains_key(&index) {
507                    continue;
508                }
509                keys.insert(index, (key.clone(), data));
510
511                archive
512                    .put(index, key, data)
513                    .await
514                    .expect("Failed to put data");
515            }
516
517            archive.close().await.expect("Failed to close archive");
518        }
519
520        {
521            let archive = creator(context, compression).await;
522            let sorted_indices: Vec<u64> = keys.keys().cloned().collect();
523
524            // Check gap before the first element
525            let (current_end, start_next) = archive.next_gap(0);
526            assert!(current_end.is_none());
527            assert_eq!(start_next, Some(sorted_indices[0]));
528
529            // Check gaps between elements
530            let mut i = 0;
531            while i < sorted_indices.len() {
532                let current_index = sorted_indices[i];
533
534                // Find the end of the current contiguous block
535                let mut j = i;
536                while j + 1 < sorted_indices.len() && sorted_indices[j + 1] == sorted_indices[j] + 1
537                {
538                    j += 1;
539                }
540                let block_end_index = sorted_indices[j];
541                let next_actual_index = if j + 1 < sorted_indices.len() {
542                    Some(sorted_indices[j + 1])
543                } else {
544                    None
545                };
546
547                let (current_end, start_next) = archive.next_gap(current_index);
548                assert_eq!(current_end, Some(block_end_index));
549                assert_eq!(start_next, next_actual_index);
550
551                // If there's a gap, check an index within the gap
552                if let Some(next_index) = next_actual_index {
553                    if next_index > block_end_index + 1 {
554                        let in_gap_index = block_end_index + 1;
555                        let (current_end, start_next) = archive.next_gap(in_gap_index);
556                        assert!(current_end.is_none());
557                        assert_eq!(start_next, Some(next_index));
558                    }
559                }
560                i = j + 1;
561            }
562
563            // Check the last element
564            let last_index = *sorted_indices.last().unwrap();
565            let (current_end, start_next) = archive.next_gap(last_index);
566            assert!(current_end.is_some());
567            assert!(start_next.is_none());
568
569            archive.close().await.expect("Failed to close archive");
570        }
571    }
572
573    #[test_traced]
574    fn test_ranges_prunable_no_compression() {
575        let executor = deterministic::Runner::default();
576        executor.start(|context| async move {
577            test_ranges_impl(context, create_prunable, None).await;
578        });
579    }
580
581    #[test_traced]
582    fn test_ranges_prunable_compression() {
583        let executor = deterministic::Runner::default();
584        executor.start(|context| async move {
585            test_ranges_impl(context, create_prunable, Some(3)).await;
586        });
587    }
588
589    #[test_traced]
590    fn test_ranges_immutable_no_compression() {
591        let executor = deterministic::Runner::default();
592        executor.start(|context| async move {
593            test_ranges_impl(context, create_immutable, None).await;
594        });
595    }
596
597    #[test_traced]
598    fn test_ranges_immutable_compression() {
599        let executor = deterministic::Runner::default();
600        executor.start(|context| async move {
601            test_ranges_impl(context, create_immutable, Some(3)).await;
602        });
603    }
604
605    async fn test_many_keys_impl<A, F, Fut>(
606        mut context: Context,
607        creator: F,
608        compression: Option<u8>,
609        num: usize,
610    ) where
611        A: Archive<Key = FixedBytes<64>, Value = i32>,
612        F: Fn(Context, Option<u8>) -> Fut,
613        Fut: Future<Output = A>,
614    {
615        // Insert many keys
616        let mut keys = BTreeMap::new();
617        {
618            let mut archive = creator(context.clone(), compression).await;
619            while keys.len() < num {
620                let index = keys.len() as u64;
621                let mut key = [0u8; 64];
622                context.fill(&mut key);
623                let key = FixedBytes::<64>::decode(key.as_ref()).unwrap();
624                let data: i32 = context.gen();
625
626                archive
627                    .put(index, key.clone(), data)
628                    .await
629                    .expect("Failed to put data");
630                keys.insert(key, (index, data));
631
632                // Randomly sync the archive
633                if context.gen_bool(0.1) {
634                    archive.sync().await.expect("Failed to sync archive");
635                }
636            }
637            archive.sync().await.expect("Failed to sync archive");
638
639            // Ensure all keys can be retrieved
640            for (key, (index, data)) in &keys {
641                let retrieved = archive
642                    .get(Identifier::Index(*index))
643                    .await
644                    .expect("Failed to get data")
645                    .expect("Data not found");
646                assert_eq!(&retrieved, data);
647                let retrieved = archive
648                    .get(Identifier::Key(key))
649                    .await
650                    .expect("Failed to get data")
651                    .expect("Data not found");
652                assert_eq!(&retrieved, data);
653            }
654
655            archive.close().await.expect("Failed to close archive");
656        }
657
658        // Reinitialize and verify
659        {
660            let archive = creator(context.clone(), compression).await;
661
662            // Ensure all keys can be retrieved
663            for (key, (index, data)) in &keys {
664                let retrieved = archive
665                    .get(Identifier::Index(*index))
666                    .await
667                    .expect("Failed to get data")
668                    .expect("Data not found");
669                assert_eq!(&retrieved, data);
670                let retrieved = archive
671                    .get(Identifier::Key(key))
672                    .await
673                    .expect("Failed to get data")
674                    .expect("Data not found");
675                assert_eq!(&retrieved, data);
676            }
677
678            archive.close().await.expect("Failed to close archive");
679        }
680    }
681
682    fn test_many_keys_determinism<F, Fut, A>(creator: F, compression: Option<u8>, num: usize)
683    where
684        A: Archive<Key = FixedBytes<64>, Value = i32>,
685        F: Fn(Context, Option<u8>) -> Fut + Copy + Send + 'static,
686        Fut: Future<Output = A> + Send,
687    {
688        let executor = deterministic::Runner::default();
689        let state1 = executor.start(|context| async move {
690            test_many_keys_impl(context.clone(), creator, compression, num).await;
691            context.auditor().state()
692        });
693        let executor = deterministic::Runner::default();
694        let state2 = executor.start(|context| async move {
695            test_many_keys_impl(context.clone(), creator, compression, num).await;
696            context.auditor().state()
697        });
698        assert_eq!(state1, state2);
699    }
700
701    #[test_traced]
702    fn test_many_keys_prunable_no_compression() {
703        test_many_keys_determinism(create_prunable, None, 1_000);
704    }
705
706    #[test_traced]
707    fn test_many_keys_prunable_compression() {
708        test_many_keys_determinism(create_prunable, Some(3), 1_000);
709    }
710
711    #[test_traced]
712    fn test_many_keys_immutable_no_compression() {
713        test_many_keys_determinism(create_immutable, None, 1_000);
714    }
715
716    #[test_traced]
717    fn test_many_keys_immutable_compression() {
718        test_many_keys_determinism(create_immutable, Some(3), 1_000);
719    }
720
721    #[test_traced]
722    #[ignore]
723    fn test_many_keys_prunable_large() {
724        test_many_keys_determinism(create_prunable, None, 50_000);
725    }
726
727    #[test_traced]
728    #[ignore]
729    fn test_many_keys_immutable_large() {
730        test_many_keys_determinism(create_immutable, None, 50_000);
731    }
732}