//! A write-once key-value store optimized for low-latency reads.
//!
//! `Archive` is a key-value store designed for workloads where all data is written only once and is
//! uniquely associated with both an `index` and a `key`.
//!
//! Data is stored in a `Journal` (an append-only log), and the location of written data is stored
//! in memory by both index and key (using a truncated representation of the key produced by a
//! caller-provided `Translator`) to enable **single-read lookups** for both query patterns over all
//! archived data.
//!
//! _Notably, `Archive` uses neither compaction nor on-disk indexes (and thus has no read or write
//! amplification during normal operation)._
//!
//! # Format
//!
//! `Archive` stores data in the following format:
//!
//! ```text
//! +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
//! | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 |10 |11 |12 |13 |14 |15 |16 |      ...      |
//! +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
//! |          Index(u64)           |  Key(Fixed Size)  |    C(u32)     |     Data      |
//! +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
//!
//! C = CRC32(Key)
//! ```
//!
//! _To ensure keys fetched using `Journal::get_prefix` are correctly read, the `Archive` includes a
//! checksum of each index + key. (The `Journal` additionally checksums each entry, but this
//! checksum is only verified if the entire entry is read via `Journal::get`.)_
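//!
//! To illustrate the layout above, the following is a minimal sketch of encoding and decoding a
//! single record. It assumes big-endian integers and an IEEE CRC-32 computed over the key bytes
//! (as in the diagram), and it omits the `Journal`'s own framing and checksum. The functions
//! `crc32`, `encode_record`, and `decode_record` are hypothetical and not part of this module.
//!
//! ```rust
//! /// Bitwise CRC-32 (IEEE, reflected polynomial `0xEDB88320`).
//! fn crc32(bytes: &[u8]) -> u32 {
//!     let mut crc = 0xFFFF_FFFFu32;
//!     for &b in bytes {
//!         crc ^= b as u32;
//!         for _ in 0..8 {
//!             // All ones if the low bit is set, otherwise zero.
//!             let mask = (crc & 1).wrapping_neg();
//!             crc = (crc >> 1) ^ (0xEDB8_8320 & mask);
//!         }
//!     }
//!     !crc
//! }
//!
//! /// Hypothetical encoder: `Index(u64) | Key(N bytes) | C(u32) | Data`.
//! fn encode_record<const N: usize>(index: u64, key: &[u8; N], data: &[u8]) -> Vec<u8> {
//!     let mut out = Vec::with_capacity(8 + N + 4 + data.len());
//!     out.extend_from_slice(&index.to_be_bytes());
//!     out.extend_from_slice(key);
//!     out.extend_from_slice(&crc32(key).to_be_bytes());
//!     out.extend_from_slice(data);
//!     out
//! }
//!
//! /// Hypothetical decoder: returns `None` if the record is too short or `C` does not match.
//! fn decode_record<const N: usize>(record: &[u8]) -> Option<(u64, [u8; N], &[u8])> {
//!     if record.len() < 8 + N + 4 {
//!         return None;
//!     }
//!     let index = u64::from_be_bytes(record[..8].try_into().ok()?);
//!     let key: [u8; N] = record[8..8 + N].try_into().ok()?;
//!     let checksum = u32::from_be_bytes(record[8 + N..12 + N].try_into().ok()?);
//!     if crc32(&key) != checksum {
//!         return None;
//!     }
//!     Some((index, key, &record[12 + N..]))
//! }
//! ```
//!
//! Flipping any bit of the key after encoding makes `decode_record` reject the record, which is the
//! property the checksum exists to provide for prefix reads.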
//!
//! # Uniqueness
//!
//! `Archive` assumes all stored indices and keys are unique. If the same key is associated with
//! multiple indices, there is no guarantee which value will be returned. If a key is written to an
//! `index` that already holds data, the write is ignored (no error is returned) and the originally
//! stored data is kept.
//!
//! ## Conflicts
//!
//! Because a truncated representation of a key is only ever stored in memory, it is possible (and
//! expected) that two keys will eventually be represented by the same truncated key. To handle this
//! case, `Archive` must check the persisted form of all conflicting keys to ensure data from the
//! correct key is returned. To support efficient checks, `Archive` keeps a linked list of all keys
//! with the same truncated prefix:
//!
//! ```rust
//! struct Record {
//!     index: u64,
//!
//!     next: Option<Box<Record>>,
//! }
//! ```
//!
//! _To avoid random memory reads in the common case, the in-memory index directly stores the first
//! item in the linked list instead of a pointer to the first item._
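//!
//! A lookup by key can then be sketched as a walk along this chain. In the minimal sketch below,
//! four-byte truncation stands in for the caller's `Translator`, and a `HashMap<u64, Vec<u8>>`
//! stands in for the persisted keys that `Archive` would read back from the `Journal`. The
//! `insert` and `lookup` helpers are hypothetical, and splicing new records in directly behind the
//! inline head is an assumption about ordering, not a description of `Archive` internals.
//!
//! ```rust
//! use std::collections::hash_map::Entry;
//! use std::collections::HashMap;
//!
//! /// Mirrors the `Record` struct above.
//! struct Record {
//!     index: u64,
//!     next: Option<Box<Record>>,
//! }
//!
//! /// Hypothetical insert: the head lives inline in the map; later records are boxed.
//! fn insert(keys: &mut HashMap<[u8; 4], Record>, truncated: [u8; 4], index: u64) {
//!     match keys.entry(truncated) {
//!         Entry::Occupied(mut occupied) => {
//!             // Splice the new record in directly after the inline head.
//!             let head = occupied.get_mut();
//!             let rest = head.next.take();
//!             head.next = Some(Box::new(Record { index, next: rest }));
//!         }
//!         Entry::Vacant(vacant) => {
//!             vacant.insert(Record { index, next: None });
//!         }
//!     }
//! }
//!
//! /// Hypothetical lookup: walks the conflict chain, comparing against the persisted full key.
//! fn lookup(
//!     keys: &HashMap<[u8; 4], Record>,
//!     persisted: &HashMap<u64, Vec<u8>>,
//!     key: &[u8],
//! ) -> Option<u64> {
//!     let truncated: [u8; 4] = key.get(..4)?.try_into().ok()?;
//!     let mut record = keys.get(&truncated);
//!     while let Some(r) = record {
//!         // A truncated match may be a conflict, so check the full key.
//!         if persisted.get(&r.index).map(|k| k.as_slice()) == Some(key) {
//!             return Some(r.index);
//!         }
//!         record = r.next.as_deref();
//!     }
//!     None
//! }
//! ```
//!
//! With `keys1` and `keys2` sharing the prefix `keys`, both land in one bucket, and finding `keys2`
//! requires checking (and rejecting) the persisted form of `keys1` first.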
//!
//! The `index` stored in each `Record` is the key of the map used to serve lookups by `index`.
//! That map stores the location of each item's data within a `Blob` (the `Blob` is selected by
//! `section = index & section_mask`, which groups many indices into one section to minimize the
//! number of open `Blob`s):
//!
//! ```rust
//! struct Location {
//!     offset: u32,
//!     len: u32,
//! }
//! ```
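//!
//! To make the section arithmetic concrete, the sketch below resolves an `index` to its section
//! and `Location`. The `HashMap<u64, Location>` and the `locate` helper are hypothetical stand-ins
//! for the in-memory index map; the mask is the `0xffff_ffff_ffff_0000` value used in this
//! module's example, which groups 65,536 consecutive indices into each section.
//!
//! ```rust
//! use std::collections::HashMap;
//!
//! /// Mirrors the `Location` struct above.
//! #[derive(Clone, Copy, Debug, PartialEq)]
//! struct Location {
//!     offset: u32,
//!     len: u32,
//! }
//!
//! const SECTION_MASK: u64 = 0xffff_ffff_ffff_0000;
//!
//! /// Hypothetical helper: returns the section (which selects the `Blob`) and the
//! /// location of the item's data within that `Blob`.
//! fn locate(indices: &HashMap<u64, Location>, index: u64) -> Option<(u64, Location)> {
//!     let location = *indices.get(&index)?;
//!     Some((index & SECTION_MASK, location))
//! }
//! ```
//!
//! For example, indices `1` and `2_000_000` fall into sections `0` and `1_966_080`, so their data
//! is stored in different `Blob`s.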
//!
//! _If the `Translator` provided by the caller does not distribute keys uniformly across the key
//! space, or truncates keys so aggressively that they have many conflicts on average, performance
//! will degrade._
//!
//! ## Memory Overhead
//!
//! `Archive` uses two maps to enable lookups by both index and key. The memory used to track each
//! index item is `8 + 4 + 4 = 16` bytes (where `8` is the index, `4` is the offset, and `4` is the
//! length). The memory used to track each key item is `~truncated(key).len() + 16` bytes (where
//! `16` is the size of the `Record` struct). This means that an `Archive` employing a `Translator`
//! that uses the first `8` bytes of a key will use `~40` bytes (`16` in the index map plus
//! `8 + 16` in the key map) to track each item.
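//!
//! The figures above can be checked with `std::mem::size_of`, as in the sketch below. It assumes a
//! 64-bit target (where `Option<Box<Record>>` is a single 8-byte pointer thanks to the null-pointer
//! optimization) and ignores hash-map bookkeeping; `per_item_bytes` is a hypothetical helper.
//!
//! ```rust
//! use std::mem::size_of;
//!
//! /// Mirrors the `Record` struct above.
//! struct Record {
//!     index: u64,
//!     next: Option<Box<Record>>,
//! }
//!
//! /// Mirrors the `Location` struct above.
//! struct Location {
//!     offset: u32,
//!     len: u32,
//! }
//!
//! /// Hypothetical estimate of the bytes used to track one item across both maps.
//! fn per_item_bytes(truncated_len: usize) -> usize {
//!     // Index map entry: the `u64` index plus its `Location`.
//!     let index_entry = size_of::<u64>() + size_of::<Location>();
//!     // Key map entry: the truncated key plus its `Record`.
//!     let key_entry = truncated_len + size_of::<Record>();
//!     index_entry + key_entry
//! }
//! ```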
//!
//! # Sync
//!
//! `Archive` flushes writes in a given `section` (computed by `index & section_mask`) to `Storage`
//! after `pending_writes` writes have been buffered in that `section`. If the caller requires
//! durability for a particular write, they can call `sync`.
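//!
//! The flushing policy can be modeled with a per-section counter, as in the minimal sketch below.
//! `PendingTracker` is hypothetical, and it assumes a sync is triggered once a section holds more
//! than `pending_writes` unsynced writes (so `pending_writes = 0` syncs on every write, consistent
//! with the `Config` documentation); the exact threshold `Archive` uses may differ.
//!
//! ```rust
//! use std::collections::HashMap;
//!
//! /// Hypothetical tracker of unsynced writes per section.
//! struct PendingTracker {
//!     pending_writes: usize,
//!     section_mask: u64,
//!     counts: HashMap<u64, usize>,
//! }
//!
//! impl PendingTracker {
//!     fn new(pending_writes: usize, section_mask: u64) -> Self {
//!         Self { pending_writes, section_mask, counts: HashMap::new() }
//!     }
//!
//!     /// Records a write and returns `Some(section)` when that section should be synced.
//!     fn record_write(&mut self, index: u64) -> Option<u64> {
//!         let section = index & self.section_mask;
//!         let count = self.counts.entry(section).or_insert(0);
//!         *count += 1;
//!         if *count > self.pending_writes {
//!             // Syncing clears the section's buffered writes.
//!             self.counts.remove(&section);
//!             return Some(section);
//!         }
//!         None
//!     }
//! }
//! ```
//!
//! Counting per section means a burst of writes to one section never forces a sync of another.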
//!
//! # Pruning
//!
//! `Archive` supports pruning up to a minimum `index` using the `prune` method. After `prune` is
//! called, attempts to `put` into a `section` less than the pruned `section` return an error
//! (`Error::AlreadyPrunedTo`), and data in pruned sections is no longer returned by reads.
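//!
//! The pruning rule can be sketched as a single "oldest allowed section" watermark. The `Pruner`
//! type and its local `Error` enum below are hypothetical (the real error is this module's
//! `Error::AlreadyPrunedTo`), and the sketch assumes that pruning to a point at or below the
//! current watermark leaves it unchanged.
//!
//! ```rust
//! #[derive(Debug, PartialEq)]
//! enum Error {
//!     AlreadyPrunedTo(u64),
//! }
//!
//! /// Hypothetical watermark tracking the oldest section that may still be written.
//! struct Pruner {
//!     section_mask: u64,
//!     oldest_allowed: Option<u64>,
//! }
//!
//! impl Pruner {
//!     fn prune(&mut self, min: u64) {
//!         let min_section = min & self.section_mask;
//!         if matches!(self.oldest_allowed, Some(oldest) if min_section <= oldest) {
//!             return; // Never move the watermark backwards.
//!         }
//!         self.oldest_allowed = Some(min_section);
//!     }
//!
//!     fn check_put(&self, index: u64) -> Result<(), Error> {
//!         let section = index & self.section_mask;
//!         match self.oldest_allowed {
//!             Some(oldest) if section < oldest => Err(Error::AlreadyPrunedTo(oldest)),
//!             _ => Ok(()),
//!         }
//!     }
//! }
//! ```
//!
//! With an all-ones mask every index is its own section, so pruning to `3` rejects writes to
//! indices `1` and `2`.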
//!
//! ## Lazy Index Cleanup
//!
//! Instead of performing a full iteration of the in-memory index, storing an additional in-memory
//! index per `section`, or replaying a `section` of the `Journal`, `Archive` lazily cleans up the
//! in-memory index after pruning. When a new key is stored that overlaps (has the same truncated
//! value) with a pruned key, the pruned key is removed from the in-memory index.
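//!
//! Lazy cleanup can be modeled as follows, with a `Vec<u64>` of indices standing in for each
//! `Record` chain and an all-ones `section_mask` (so each index is its own section). The `put_key`
//! helper is hypothetical. Only the bucket being written to is scanned, so pruned entries in other
//! buckets remain until a colliding key arrives.
//!
//! ```rust
//! use std::collections::HashMap;
//!
//! /// Hypothetical insert that drops pruned indices from the target bucket only.
//! /// Returns the number of pruned entries removed.
//! fn put_key(
//!     keys: &mut HashMap<[u8; 4], Vec<u64>>,
//!     truncated: [u8; 4],
//!     index: u64,
//!     oldest_allowed: u64,
//!     section_mask: u64,
//! ) -> usize {
//!     let bucket = keys.entry(truncated).or_default();
//!     let before = bucket.len();
//!     bucket.retain(|&i| (i & section_mask) >= oldest_allowed);
//!     let removed = before - bucket.len();
//!     bucket.push(index);
//!     removed
//! }
//! ```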
//!
//! # Single Operation Reads
//!
//! To enable single-operation reads (i.e. reading all of an item in a single call to `Blob`),
//! `Archive` caches the length of each item in its in-memory index. While this increases the
//! footprint per key stored, the benefit of only ever performing a single operation to read a key
//! (when there are no conflicts) is worth the tradeoff.
//!
//! # Compression
//!
//! `Archive` supports compressing data before storing it on disk. This can be enabled by setting
//! the `compression` field in the `Config` struct to a valid `zstd` compression level. This setting
//! can be changed between initializations of `Archive`; however, it must remain populated if any
//! data was written with compression enabled (otherwise, reads will not return the original data).
//!
//! # Querying for Gaps
//!
//! `Archive` tracks gaps in the index space to enable the caller to efficiently fetch unknown keys
//! using `next_gap`. This is a very common pattern when syncing blocks in a blockchain.
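//!
//! One way to track gaps is to keep contiguous runs of stored indices in an ordered map, as in the
//! minimal sketch below. `Ranges` and its `next_gap` method are hypothetical: the return
//! convention (end of the run containing `index`, start of the next run) is an assumption for
//! illustration and is not necessarily the signature of `Archive::next_gap`.
//!
//! ```rust
//! use std::collections::BTreeMap;
//!
//! /// Hypothetical tracker mapping the start of each run of stored indices to its (inclusive) end.
//! #[derive(Default)]
//! struct Ranges {
//!     runs: BTreeMap<u64, u64>,
//! }
//!
//! impl Ranges {
//!     fn insert(&mut self, index: u64) {
//!         let (mut start, mut end) = (index, index);
//!         // Extend the run that ends just before `index`, if any.
//!         if let Some((&s, &e)) = self.runs.range(..=index).next_back() {
//!             if e >= index {
//!                 return; // Already stored.
//!             }
//!             if e.checked_add(1) == Some(index) {
//!                 start = s;
//!             }
//!         }
//!         // Merge with the run that starts just after `index`, if any.
//!         if let Some(next) = index.checked_add(1) {
//!             if let Some(e) = self.runs.remove(&next) {
//!                 end = e;
//!             }
//!         }
//!         self.runs.insert(start, end);
//!     }
//!
//!     /// Returns the end of the run containing `index` (if stored) and the start of the next run.
//!     fn next_gap(&self, index: u64) -> (Option<u64>, Option<u64>) {
//!         let current_end = match self.runs.range(..=index).next_back() {
//!             Some((_, &e)) if e >= index => Some(e),
//!             _ => None,
//!         };
//!         let after = current_end.unwrap_or(index);
//!         let next_start = after
//!             .checked_add(1)
//!             .and_then(|n| self.runs.range(n..).next().map(|(&s, _)| s));
//!         (current_end, next_start)
//!     }
//! }
//! ```
//!
//! When syncing blocks, a caller could request every index strictly between the two returned values
//! to fill the gap; both lookups are `O(log n)` in the number of runs.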
//!
//! # Example
//!
//! ```rust
//! use commonware_runtime::{Spawner, Runner, deterministic::Executor};
//! use commonware_cryptography::hash;
//! use commonware_storage::archive::{Archive, Config, translator::FourCap};
//! use commonware_storage::journal::variable::{Config as JConfig, Journal};
//!
//! let (executor, context, _) = Executor::default();
//! executor.start(async move {
//!     // Create a journal
//!     let cfg = JConfig {
//!         partition: "partition".to_string(),
//!     };
//!     let journal = Journal::init(context.clone(), cfg).await.unwrap();
//!
//!     // Create an archive
//!     let cfg = Config {
//!         translator: FourCap,
//!         section_mask: 0xffff_ffff_ffff_0000u64,
//!         pending_writes: 10,
//!         replay_concurrency: 4,
//!         compression: Some(3),
//!     };
//!     let mut archive = Archive::init(context, journal, cfg).await.unwrap();
//!
//!     // Put a key
//!     archive.put(1, hash(b"data"), "data".into()).await.unwrap();
//!
//!     // Close the archive (also closes the journal)
//!     archive.close().await.unwrap();
//! });
//! ```

mod storage;
pub use storage::{Archive, Identifier};
pub mod translator;

use std::hash::Hash;
use thiserror::Error;

/// Errors that can occur when interacting with the archive.
#[derive(Debug, Error)]
pub enum Error {
    #[error("journal error: {0}")]
    Journal(#[from] crate::journal::Error),
    #[error("record corrupted")]
    RecordCorrupted,
    #[error("already pruned to: {0}")]
    AlreadyPrunedTo(u64),
    #[error("record too large")]
    RecordTooLarge,
    #[error("compression failed")]
    CompressionFailed,
    #[error("decompression failed")]
    DecompressionFailed,
}

/// Translate keys into an internal representation used in `Archive`'s
/// in-memory index.
///
/// If invoking `transform` on keys results in many conflicts, the performance
/// of `Archive` will degrade substantially.
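///
/// For illustration, a minimal sketch of a translator that keeps the first four bytes of each key
/// (similar in spirit to `translator::FourCap`, whose exact behavior is not reproduced here). It
/// repeats the trait definition locally so the example compiles on its own; `PrefixFour` is a
/// hypothetical name.
///
/// ```rust
/// use std::hash::Hash;
///
/// // Local copy of the `Translator` trait so the sketch is self-contained.
/// trait Translator: Clone {
///     type Key: Eq + Hash + Send + Sync + Clone;
///     fn transform(&self, key: &[u8]) -> Self::Key;
/// }
///
/// /// Hypothetical translator keeping the first four bytes of a key (zero-padded if shorter).
/// #[derive(Clone)]
/// struct PrefixFour;
///
/// impl Translator for PrefixFour {
///     type Key = [u8; 4];
///
///     fn transform(&self, key: &[u8]) -> [u8; 4] {
///         let mut out = [0u8; 4];
///         let n = key.len().min(4);
///         out[..n].copy_from_slice(&key[..n]);
///         out
///     }
/// }
/// ```
///
/// Keys that share a four-byte prefix (such as `keys1` and `keys2`) map to the same internal key,
/// which is exactly the kind of conflict `Archive` resolves by reading the persisted key.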
pub trait Translator: Clone {
    type Key: Eq + Hash + Send + Sync + Clone;

    /// Transform a key into its internal representation.
    fn transform(&self, key: &[u8]) -> Self::Key;
}

/// Configuration for `Archive` storage.
#[derive(Clone)]
pub struct Config<T: Translator> {
    /// Logic to transform keys into their index representation.
    ///
    /// `Archive` assumes that all internal keys are spread uniformly across the key space.
    /// If that is not the case, lookups may be O(n) instead of O(1).
    pub translator: T,

    /// Mask to apply to indices to determine section.
    ///
    /// The section of an index is computed as `index & section_mask`.
    pub section_mask: u64,

    /// The number of writes to buffer in a section before forcing a sync in the journal.
    ///
    /// If set to 0, the journal will be synced each time a new item is stored.
    pub pending_writes: usize,

    /// The number of blobs to replay concurrently on initialization.
    pub replay_concurrency: usize,

    /// Optional compression level (using `zstd`) to apply to data before storing.
    pub compression: Option<u8>,
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::journal::variable::{Config as JConfig, Journal};
    use crate::journal::Error as JournalError;
    use bytes::Bytes;
    use commonware_macros::test_traced;
    use commonware_runtime::Metrics;
    use commonware_runtime::{deterministic::Executor, Blob, Runner, Storage};
    use commonware_utils::array::FixedBytes;
    use rand::Rng;
    use std::collections::BTreeMap;
    use translator::{FourCap, TwoCap};

    const DEFAULT_SECTION_MASK: u64 = 0xffff_ffff_ffff_0000u64;

    fn test_key(key: &str) -> FixedBytes<64> {
        let mut buf = [0u8; 64];
        let key = key.as_bytes();
        assert!(key.len() <= buf.len());
        buf[..key.len()].copy_from_slice(key);
        FixedBytes::try_from(&buf[..]).unwrap()
    }

    fn test_archive_put_get(compression: Option<u8>) {
        // Initialize the deterministic context
        let (executor, context, _) = Executor::default();
        executor.start(async move {
            // Initialize an empty journal
            let journal = Journal::init(
                context.clone(),
                JConfig {
                    partition: "test_partition".into(),
                },
            )
            .await
            .expect("Failed to initialize journal");

            // Initialize the archive
            let cfg = Config {
                translator: FourCap,
                pending_writes: 10,
                replay_concurrency: 4,
                compression,
                section_mask: DEFAULT_SECTION_MASK,
            };
            let mut archive = Archive::init(context.clone(), journal, cfg.clone())
                .await
                .expect("Failed to initialize archive");

            let index = 1u64;
            let key = test_key("testkey");
            let data = Bytes::from("testdata");

            // Has the key
            let has = archive
                .has(Identifier::Index(index))
                .await
                .expect("Failed to check key");
            assert!(!has);
            let has = archive
                .has(Identifier::Key(&key))
                .await
                .expect("Failed to check key");
            assert!(!has);

            // Put the key-data pair
            archive
                .put(index, key.clone(), data.clone())
                .await
                .expect("Failed to put data");

            // Has the key
            let has = archive
                .has(Identifier::Index(index))
                .await
                .expect("Failed to check key");
            assert!(has);
            let has = archive
                .has(Identifier::Key(&key))
                .await
                .expect("Failed to check key");
            assert!(has);

            // Get the data back
            let retrieved = archive
                .get(Identifier::Index(index))
                .await
                .expect("Failed to get data")
                .expect("Data not found");
            assert_eq!(retrieved, data);
            let retrieved = archive
                .get(Identifier::Key(&key))
                .await
                .expect("Failed to get data")
                .expect("Data not found");
            assert_eq!(retrieved, data);

            // Check metrics
            let buffer = context.encode();
            assert!(buffer.contains("items_tracked 1"));
            assert!(buffer.contains("unnecessary_reads_total 0"));
            assert!(buffer.contains("gets_total 2"));
            assert!(buffer.contains("has_total 4"));
            assert!(buffer.contains("syncs_total 0"));

            // Force a sync
            archive.sync().await.expect("Failed to sync data");

            // Check metrics
            let buffer = context.encode();
            assert!(buffer.contains("items_tracked 1"));
            assert!(buffer.contains("unnecessary_reads_total 0"));
            assert!(buffer.contains("gets_total 2"));
            assert!(buffer.contains("has_total 4"));
            assert!(buffer.contains("syncs_total 1"));
        });
    }

    #[test_traced]
    fn test_archive_put_get_no_compression() {
        test_archive_put_get(None);
    }

    #[test_traced]
    fn test_archive_put_get_compression() {
        test_archive_put_get(Some(3));
    }

    #[test_traced]
    fn test_archive_compression_then_none() {
        // Initialize the deterministic context
        let (executor, context, _) = Executor::default();
        executor.start(async move {
            // Initialize an empty journal
            let journal = Journal::init(
                context.clone(),
                JConfig {
                    partition: "test_partition".into(),
                },
            )
            .await
            .expect("Failed to initialize journal");

            // Initialize the archive
            let cfg = Config {
                translator: FourCap,
                pending_writes: 10,
                replay_concurrency: 4,
                compression: Some(3),
                section_mask: DEFAULT_SECTION_MASK,
            };
            let mut archive = Archive::init(context.clone(), journal, cfg.clone())
                .await
                .expect("Failed to initialize archive");

            // Put the key-data pair
            let index = 1u64;
            let key = test_key("testkey");
            let data = Bytes::from("testdata");
            archive
                .put(index, key.clone(), data.clone())
                .await
                .expect("Failed to put data");

            // Close the archive
            archive.close().await.expect("Failed to close archive");

            // Initialize the archive again without compression
            let journal = Journal::init(
                context.clone(),
                JConfig {
                    partition: "test_partition".into(),
                },
            )
            .await
            .expect("Failed to initialize journal");
            let cfg = Config {
                translator: FourCap,
                pending_writes: 10,
                replay_concurrency: 4,
                compression: None,
                section_mask: DEFAULT_SECTION_MASK,
            };
            let archive = Archive::init(context, journal, cfg.clone())
                .await
                .expect("Failed to initialize archive");

            // Get the data back
            let retrieved = archive
                .get(Identifier::Index(index))
                .await
                .expect("Failed to get data")
                .expect("Data not found");
            assert_ne!(retrieved, data);
            let retrieved = archive
                .get(Identifier::Key(&key))
                .await
                .expect("Failed to get data")
                .expect("Data not found");
            assert_ne!(retrieved, data);
        });
    }

    #[test_traced]
    fn test_archive_record_corruption() {
        // Initialize the deterministic context
        let (executor, context, _) = Executor::default();
        executor.start(async move {
            // Initialize an empty journal
            let journal = Journal::init(
                context.clone(),
                JConfig {
                    partition: "test_partition".into(),
                },
            )
            .await
            .expect("Failed to initialize journal");

            // Initialize the archive
            let cfg = Config {
                translator: FourCap,
                pending_writes: 10,
                replay_concurrency: 4,
                compression: None,
                section_mask: DEFAULT_SECTION_MASK,
            };
            let mut archive = Archive::init(context.clone(), journal, cfg.clone())
                .await
                .expect("Failed to initialize archive");

            let index = 1u64;
            let key = test_key("testkey");
            let data = Bytes::from("testdata");

            // Put the key-data pair
            archive
                .put(index, key.clone(), data.clone())
                .await
                .expect("Failed to put data");

            // Close the archive
            archive.close().await.expect("Failed to close archive");

            // Corrupt the value
            let section = index & DEFAULT_SECTION_MASK;
            let blob = context
                .open("test_partition", &section.to_be_bytes())
                .await
                .unwrap();
            let value_location = 4 + 8 + 64 + 4;
            blob.write_at(b"testdaty", value_location).await.unwrap();
            blob.close().await.unwrap();

            // Initialize the archive again
            let journal = Journal::init(
                context.clone(),
                JConfig {
                    partition: "test_partition".into(),
                },
            )
            .await
            .expect("Failed to initialize journal");
            let archive = Archive::init(
                context,
                journal,
                Config {
                    translator: FourCap,
                    pending_writes: 10,
                    replay_concurrency: 4,
                    compression: None,
                    section_mask: DEFAULT_SECTION_MASK,
                },
            )
            .await
            .expect("Failed to initialize archive");

            // Attempt to get the key
            let result = archive.get(Identifier::Key(&key)).await;
            assert!(matches!(
                result,
                Err(Error::Journal(JournalError::ChecksumMismatch(_, _)))
            ));
        });
    }

    #[test_traced]
    fn test_archive_duplicate_key() {
        // Initialize the deterministic context
        let (executor, context, _) = Executor::default();
        executor.start(async move {
            // Initialize an empty journal
            let journal = Journal::init(
                context.clone(),
                JConfig {
                    partition: "test_partition".into(),
                },
            )
            .await
            .expect("Failed to initialize journal");

            // Initialize the archive
            let cfg = Config {
                translator: FourCap,
                pending_writes: 10,
                replay_concurrency: 4,
                compression: None,
                section_mask: DEFAULT_SECTION_MASK,
            };
            let mut archive = Archive::init(context.clone(), journal, cfg.clone())
                .await
                .expect("Failed to initialize archive");

            let index = 1u64;
            let key = test_key("duplicate");
            let data1 = Bytes::from("data1");
            let data2 = Bytes::from("data2");

            // Put the key-data pair
            archive
                .put(index, key.clone(), data1.clone())
                .await
                .expect("Failed to put data");

            // Put the key-data pair again
            archive
                .put(index, key.clone(), data2.clone())
                .await
                .expect("Duplicate put should not fail");

            // Get the data back
            let retrieved = archive
                .get(Identifier::Index(index))
                .await
                .expect("Failed to get data")
                .expect("Data not found");
            assert_eq!(retrieved, data1);
            let retrieved = archive
                .get(Identifier::Key(&key))
                .await
                .expect("Failed to get data")
                .expect("Data not found");
            assert_eq!(retrieved, data1);

            // Check metrics
            let buffer = context.encode();
            assert!(buffer.contains("items_tracked 1"));
            assert!(buffer.contains("unnecessary_reads_total 0"));
            assert!(buffer.contains("gets_total 2"));
        });
    }

    #[test_traced]
    fn test_archive_get_nonexistent() {
        // Initialize the deterministic context
        let (executor, context, _) = Executor::default();
        executor.start(async move {
            // Initialize an empty journal
            let journal = Journal::init(
                context.clone(),
                JConfig {
                    partition: "test_partition".into(),
                },
            )
            .await
            .expect("Failed to initialize journal");

            // Initialize the archive
            let cfg = Config {
                translator: FourCap,
                pending_writes: 10,
                replay_concurrency: 4,
                compression: None,
                section_mask: DEFAULT_SECTION_MASK,
            };
            let archive = Archive::init(context.clone(), journal, cfg.clone())
                .await
                .expect("Failed to initialize archive");

            // Attempt to get an index that doesn't exist
            let index = 1u64;
            let retrieved = archive
                .get(Identifier::Index(index))
                .await
                .expect("Failed to get data");
            assert!(retrieved.is_none());

            // Attempt to get a key that doesn't exist
            let key = test_key("nonexistent");
            let retrieved = archive
                .get(Identifier::Key(&key))
                .await
                .expect("Failed to get data");
            assert!(retrieved.is_none());

            // Check metrics
            let buffer = context.encode();
            assert!(buffer.contains("items_tracked 0"));
            assert!(buffer.contains("unnecessary_reads_total 0"));
            assert!(buffer.contains("gets_total 2"));
        });
    }

    #[test_traced]
    fn test_archive_overlapping_key() {
        // Initialize the deterministic context
        let (executor, context, _) = Executor::default();
        executor.start(async move {
            // Initialize an empty journal
            let journal = Journal::init(
                context.clone(),
                JConfig {
                    partition: "test_partition".into(),
                },
            )
            .await
            .expect("Failed to initialize journal");

            // Initialize the archive
            let cfg = Config {
                translator: FourCap,
                pending_writes: 10,
                replay_concurrency: 4,
                compression: None,
                section_mask: DEFAULT_SECTION_MASK,
            };
            let mut archive = Archive::init(context.clone(), journal, cfg.clone())
                .await
                .expect("Failed to initialize archive");

            let index1 = 1u64;
            let key1 = test_key("keys1");
            let data1 = Bytes::from("data1");
            let index2 = 2u64;
            let key2 = test_key("keys2");
            let data2 = Bytes::from("data2");

            // Put the key-data pair
            archive
                .put(index1, key1.clone(), data1.clone())
                .await
                .expect("Failed to put data");

            // Put the key-data pair
            archive
                .put(index2, key2.clone(), data2.clone())
                .await
                .expect("Failed to put data");

            // Get the data back
            let retrieved = archive
                .get(Identifier::Key(&key1))
                .await
                .expect("Failed to get data")
                .expect("Data not found");
            assert_eq!(retrieved, data1);

            // Get the data back
            let retrieved = archive
                .get(Identifier::Key(&key2))
                .await
                .expect("Failed to get data")
                .expect("Data not found");
            assert_eq!(retrieved, data2);

            // Check metrics
            let buffer = context.encode();
            assert!(buffer.contains("items_tracked 2"));
            assert!(buffer.contains("unnecessary_reads_total 1"));
            assert!(buffer.contains("gets_total 2"));
        });
    }

    #[test_traced]
    fn test_archive_overlapping_key_multiple_sections() {
        // Initialize the deterministic context
        let (executor, context, _) = Executor::default();
        executor.start(async move {
            // Initialize an empty journal
            let journal = Journal::init(
                context.clone(),
                JConfig {
                    partition: "test_partition".into(),
                },
            )
            .await
            .expect("Failed to initialize journal");

            // Initialize the archive
            let cfg = Config {
                translator: FourCap,
                pending_writes: 10,
                replay_concurrency: 4,
                compression: None,
                section_mask: DEFAULT_SECTION_MASK,
            };
            let mut archive = Archive::init(context.clone(), journal, cfg.clone())
                .await
                .expect("Failed to initialize archive");

            let index1 = 1u64;
            let key1 = test_key("keys1");
            let data1 = Bytes::from("data1");
            let index2 = 2_000_000u64;
            let key2 = test_key("keys2");
            let data2 = Bytes::from("data2");

            // Put the key-data pair
            archive
                .put(index1, key1.clone(), data1.clone())
                .await
                .expect("Failed to put data");

            // Put the key-data pair
            archive
                .put(index2, key2.clone(), data2.clone())
                .await
                .expect("Failed to put data");

            // Get the data back
            let retrieved = archive
                .get(Identifier::Key(&key1))
                .await
                .expect("Failed to get data")
                .expect("Data not found");
            assert_eq!(retrieved, data1);

            // Get the data back
            let retrieved = archive
                .get(Identifier::Key(&key2))
                .await
                .expect("Failed to get data")
                .expect("Data not found");
            assert_eq!(retrieved, data2);
        });
    }

750    #[test_traced]
751    fn test_archive_prune_keys() {
752        // Initialize the deterministic context
753        let (executor, context, _) = Executor::default();
754        executor.start(async move {
755            // Initialize an empty journal
756            let journal = Journal::init(
757                context.clone(),
758                JConfig {
759                    partition: "test_partition".into(),
760                },
761            )
762            .await
763            .expect("Failed to initialize journal");
764
765            // Initialize the archive
766            let cfg = Config {
767                translator: FourCap,
768                pending_writes: 10,
769                replay_concurrency: 4,
770                compression: None,
771                section_mask: 0xffff_ffff_ffff_ffffu64, // no mask
772            };
773            let mut archive = Archive::init(context.clone(), journal, cfg.clone())
774                .await
775                .expect("Failed to initialize archive");
776
777            // Insert multiple keys across different sections
778            let keys = vec![
779                (1u64, test_key("key1-blah"), Bytes::from("data1")),
780                (2u64, test_key("key2-blah"), Bytes::from("data2")),
781                (3u64, test_key("key3-blah"), Bytes::from("data3")),
782                (4u64, test_key("key3-bleh"), Bytes::from("data3-again")),
783                (5u64, test_key("key4-blah"), Bytes::from("data4")),
784            ];
785
786            for (index, key, data) in &keys {
787                archive
788                    .put(*index, key.clone(), data.clone())
789                    .await
790                    .expect("Failed to put data");
791            }
792
793            // Check metrics
794            let buffer = context.encode();
795            assert!(buffer.contains("items_tracked 5"));
796
797            // Prune sections less than 3
798            archive.prune(3).await.expect("Failed to prune");
799
800            // Ensure keys 1 and 2 are no longer present
801            for (index, key, data) in keys {
802                let retrieved = archive
803                    .get(Identifier::Key(&key))
804                    .await
805                    .expect("Failed to get data");
806                if index < 3 {
807                    assert!(retrieved.is_none());
808                } else {
809                    assert_eq!(retrieved.expect("Data not found"), data);
810                }
811            }
812
813            // Check metrics
814            let buffer = context.encode();
815            assert!(buffer.contains("items_tracked 3"));
816            assert!(buffer.contains("indices_pruned_total 2"));
817            assert!(buffer.contains("keys_pruned_total 0")); // no lazy cleanup yet
818
            // Pruning to an older (already pruned) section should not fail
            archive.prune(2).await.expect("Failed to prune");

            // Pruning to the current boundary again should not fail
            archive.prune(3).await.expect("Failed to prune");

            // Putting an index below the prune boundary should fail
            let result = archive
                .put(1, test_key("key1-blah"), Bytes::from("data1"))
                .await;
            assert!(matches!(result, Err(Error::AlreadyPrunedTo(3))));

            // Trigger lazy removal of keys
            archive
                .put(6, test_key("key2-blfh"), Bytes::from("data2-2"))
                .await
                .expect("Failed to put data");

            // Check metrics
            let buffer = context.encode();
            assert!(buffer.contains("items_tracked 4")); // lazily remove one, add one
            assert!(buffer.contains("indices_pruned_total 2"));
            assert!(buffer.contains("keys_pruned_total 1"));
        });
    }

    fn test_archive_keys_and_restart(num_keys: usize) -> String {
        // Initialize the deterministic context
        let (executor, mut context, auditor) = Executor::default();
        executor.start(async move {
            // Initialize an empty journal
            let journal = Journal::init(
                context.clone(),
                JConfig {
                    partition: "test_partition".into(),
                },
            )
            .await
            .expect("Failed to initialize journal");

            // Initialize the archive
            let section_mask = 0xffff_ffff_ffff_ff00u64;
            let cfg = Config {
                translator: TwoCap,
                pending_writes: 10,
                replay_concurrency: 4,
                compression: None,
                section_mask,
            };
            let mut archive = Archive::init(context.clone(), journal, cfg.clone())
                .await
                .expect("Failed to initialize archive");

            // Insert multiple keys across different sections
            let mut keys = BTreeMap::new();
            while keys.len() < num_keys {
                let index = keys.len() as u64;
                let mut key = [0u8; 64];
                context.fill(&mut key);
                let key = FixedBytes::<64>::try_from(&key[..]).unwrap();
                let mut data = [0u8; 1024];
                context.fill(&mut data);
                let data = Bytes::from(data.to_vec());
                archive
                    .put(index, key.clone(), data.clone())
                    .await
                    .expect("Failed to put data");
                keys.insert(key, (index, data));
            }

            // Ensure all keys can be retrieved
            for (key, (index, data)) in &keys {
                let retrieved = archive
                    .get(Identifier::Index(*index))
                    .await
                    .expect("Failed to get data")
                    .expect("Data not found");
                assert_eq!(retrieved, data);
                let retrieved = archive
                    .get(Identifier::Key(key))
                    .await
                    .expect("Failed to get data")
                    .expect("Data not found");
                assert_eq!(retrieved, data);
            }

            // Check metrics
            let buffer = context.encode();
            let tracked = format!("items_tracked {}", num_keys);
            assert!(buffer.contains(&tracked));
            assert!(buffer.contains("keys_pruned_total 0"));

            // Close the archive
            archive.close().await.expect("Failed to close archive");

            // Reinitialize the archive
            let journal = Journal::init(
                context.clone(),
                JConfig {
                    partition: "test_partition".into(),
                },
            )
            .await
            .expect("Failed to initialize journal");
            let cfg = Config {
                translator: TwoCap,
                pending_writes: 10,
                replay_concurrency: 4,
                compression: None,
                section_mask,
            };
            let mut archive = Archive::init(context.clone(), journal, cfg.clone())
                .await
                .expect("Failed to initialize archive");

            // Ensure all keys can be retrieved
            for (key, (index, data)) in &keys {
                let retrieved = archive
                    .get(Identifier::Index(*index))
                    .await
                    .expect("Failed to get data")
                    .expect("Data not found");
                assert_eq!(retrieved, data);
                let retrieved = archive
                    .get(Identifier::Key(key))
                    .await
                    .expect("Failed to get data")
                    .expect("Data not found");
                assert_eq!(retrieved, data);
            }

            // Prune first half
            let min = (keys.len() / 2) as u64;
            archive.prune(min).await.expect("Failed to prune");

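            // Ensure all keys that haven't been pruned can still be retrieved
            // (pruning removes whole sections, so the boundary is `min` rounded down
            // to the start of its section)
            let min = min & section_mask;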
            let mut removed = 0;
            for (key, (index, data)) in keys {
                if index >= min {
                    let retrieved = archive
                        .get(Identifier::Key(&key))
                        .await
                        .expect("Failed to get data")
                        .expect("Data not found");
                    assert_eq!(retrieved, data);

                    // Check range
                    let (current_end, start_next) = archive.next_gap(index);
                    assert_eq!(current_end.unwrap(), num_keys as u64 - 1);
                    assert!(start_next.is_none());
                } else {
                    let retrieved = archive
                        .get(Identifier::Key(&key))
                        .await
                        .expect("Failed to get data");
                    assert!(retrieved.is_none());
                    removed += 1;

                    // Check range
                    let (current_end, start_next) = archive.next_gap(index);
                    assert!(current_end.is_none());
                    assert_eq!(start_next.unwrap(), min);
                }
            }

            // Check metrics
            let buffer = context.encode();
            let tracked = format!("items_tracked {}", num_keys - removed);
            assert!(buffer.contains(&tracked));
            let pruned = format!("indices_pruned_total {}", removed);
            assert!(buffer.contains(&pruned));
            assert!(buffer.contains("keys_pruned_total 0")); // have not lazily removed keys yet
        });
        auditor.state()
    }

    #[test_traced]
    fn test_archive_many_keys_and_restart() {
        test_archive_keys_and_restart(100_000); // 391 sections
    }

    #[test_traced]
    fn test_determinism() {
        let state1 = test_archive_keys_and_restart(5_000); // 20 sections
        let state2 = test_archive_keys_and_restart(5_000);
        assert_eq!(state1, state2);
    }

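    // A minimal, std-only sketch of the section arithmetic the restart tests rely on.
    // With `section_mask = 0xffff_ffff_ffff_ff00`, an index's section is `index & mask`,
    // so each section spans 256 consecutive indices and pruning to `min` can only drop
    // whole sections below `min & mask`. `expected_section_count` is a hypothetical helper
    // (not part of the `Archive` API) used only to check the "391 sections" and
    // "20 sections" comments and the number of indices removed by "Prune first half".
    fn expected_section_count(num_keys: u64, section_mask: u64) -> u64 {
        // Assumes indices are assigned densely from 0 (as in the restart test).
        assert!(num_keys > 0, "need at least one index");
        // Number of indices per section (256 for the mask above, 1 for `u64::MAX`).
        let span = !section_mask + 1;
        // The last index lives in section `(num_keys - 1) & mask`; count sections up to it.
        ((num_keys - 1) & section_mask) / span + 1
    }

    #[test]
    fn test_section_mask_arithmetic() {
        let section_mask = 0xffff_ffff_ffff_ff00u64;
        // Each section covers 256 indices.
        assert_eq!(!section_mask + 1, 256);
        // Section counts quoted in `test_archive_many_keys_and_restart` and `test_determinism`.
        assert_eq!(expected_section_count(100_000, section_mask), 391);
        assert_eq!(expected_section_count(5_000, section_mask), 20);
        // "Prune first half" rounds the boundary down to its section start, so slightly
        // fewer than half of the indices are removed.
        assert_eq!((5_000u64 / 2) & section_mask, 2_304);
        assert_eq!((100_000u64 / 2) & section_mask, 49_920);
    }
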
    #[test_traced]
    fn test_ranges() {
        // Initialize the deterministic context
        let (executor, context, _) = Executor::default();
        executor.start(async move {
            // Initialize an empty journal
            let journal = Journal::init(
                context.clone(),
                JConfig {
                    partition: "test_partition".into(),
                },
            )
            .await
            .expect("Failed to initialize journal");

            // Initialize the archive
            let cfg = Config {
                translator: FourCap,
                pending_writes: 10,
                replay_concurrency: 4,
                compression: None,
                section_mask: DEFAULT_SECTION_MASK,
            };
            let mut archive = Archive::init(context.clone(), journal, cfg.clone())
                .await
                .expect("Failed to initialize archive");

            // Insert multiple keys across different indices
            let keys = vec![
                (1u64, test_key("key1-blah"), Bytes::from("data1")),
                (10u64, test_key("key2-blah"), Bytes::from("data2")),
                (11u64, test_key("key3-blah"), Bytes::from("data3")),
                (14u64, test_key("key3-bleh"), Bytes::from("data3-again")),
            ];
            for (index, key, data) in &keys {
                archive
                    .put(*index, key.clone(), data.clone())
                    .await
                    .expect("Failed to put data");
            }

            // Check ranges
            let (current_end, start_next) = archive.next_gap(0);
            assert!(current_end.is_none());
            assert_eq!(start_next.unwrap(), 1);

            let (current_end, start_next) = archive.next_gap(1);
            assert_eq!(current_end.unwrap(), 1);
            assert_eq!(start_next.unwrap(), 10);

            let (current_end, start_next) = archive.next_gap(10);
            assert_eq!(current_end.unwrap(), 11);
            assert_eq!(start_next.unwrap(), 14);

            let (current_end, start_next) = archive.next_gap(11);
            assert_eq!(current_end.unwrap(), 11);
            assert_eq!(start_next.unwrap(), 14);

            let (current_end, start_next) = archive.next_gap(12);
            assert!(current_end.is_none());
            assert_eq!(start_next.unwrap(), 14);

            let (current_end, start_next) = archive.next_gap(14);
            assert_eq!(current_end.unwrap(), 14);
            assert!(start_next.is_none());

            // Close and reopen the archive, then check the ranges again
            archive.close().await.expect("Failed to close archive");

            let journal = Journal::init(
                context.clone(),
                JConfig {
                    partition: "test_partition".into(),
                },
            )
            .await
            .expect("Failed to initialize journal");
            let archive = Archive::<_, FixedBytes<64>, _, _>::init(context, journal, cfg.clone())
                .await
                .expect("Failed to initialize archive");

            // Check ranges again
            let (current_end, start_next) = archive.next_gap(0);
            assert!(current_end.is_none());
            assert_eq!(start_next.unwrap(), 1);

            let (current_end, start_next) = archive.next_gap(1);
            assert_eq!(current_end.unwrap(), 1);
            assert_eq!(start_next.unwrap(), 10);

            let (current_end, start_next) = archive.next_gap(10);
            assert_eq!(current_end.unwrap(), 11);
            assert_eq!(start_next.unwrap(), 14);

            let (current_end, start_next) = archive.next_gap(11);
            assert_eq!(current_end.unwrap(), 11);
            assert_eq!(start_next.unwrap(), 14);

            let (current_end, start_next) = archive.next_gap(12);
            assert!(current_end.is_none());
            assert_eq!(start_next.unwrap(), 14);

            let (current_end, start_next) = archive.next_gap(14);
            assert_eq!(current_end.unwrap(), 14);
            assert!(start_next.is_none());
        });
    }
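
    // A hedged, std-only reference model of the `next_gap` behavior asserted in
    // `test_ranges`, written over a `BTreeSet` of stored indices. `model_next_gap` is a
    // hypothetical helper, not the `Archive` implementation: for a stored `index` it
    // returns the end of the contiguous run containing it plus the first stored index after
    // that run; for a missing `index` it returns `None` plus the next stored index.
    fn model_next_gap(
        indices: &std::collections::BTreeSet<u64>,
        index: u64,
    ) -> (Option<u64>, Option<u64>) {
        if !indices.contains(&index) {
            // Not stored: no current run; the next run starts at the next stored index.
            return (None, indices.range(index..).next().copied());
        }
        // Walk forward to the end of the contiguous run containing `index`.
        let mut end = index;
        while let Some(next) = end.checked_add(1) {
            if !indices.contains(&next) {
                break;
            }
            end = next;
        }
        // The next run (if any) starts at the first stored index past `end`.
        let start_next = end
            .checked_add(1)
            .and_then(|after| indices.range(after..).next().copied());
        (Some(end), start_next)
    }

    #[test]
    fn test_next_gap_reference_model() {
        // Same indices as `test_ranges`: 1, 10, 11, 14.
        let indices = std::collections::BTreeSet::from([1u64, 10, 11, 14]);
        assert_eq!(model_next_gap(&indices, 0), (None, Some(1)));
        assert_eq!(model_next_gap(&indices, 1), (Some(1), Some(10)));
        assert_eq!(model_next_gap(&indices, 10), (Some(11), Some(14)));
        assert_eq!(model_next_gap(&indices, 11), (Some(11), Some(14)));
        assert_eq!(model_next_gap(&indices, 12), (None, Some(14)));
        assert_eq!(model_next_gap(&indices, 14), (Some(14), None));
    }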
}