//! A write-once key-value store optimized for low-latency reads.
//!
//! `Archive` is a key-value store designed for workloads where all data is written only once and is
//! uniquely associated with both an `index` and a `key`.
//!
//! Data is stored in `Journal` (an append-only log) and the location of written data is stored in-memory
//! by both index and key (truncated representation using a caller-provided `Translator`) to
//! enable **single-read lookups** for both query patterns over all archived data.
//!
//! _Notably, `Archive` uses neither compaction nor on-disk indexes (and thus has neither read nor
//! write amplification during normal operation)._
//!
//! # Format
//!
//! `Archive` stores data in the following format:
//!
//! ```text
//! +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
//! | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 |10 |11 |12 |13 |14 |15 |16 |      ...      |
//! +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
//! |          Index(u64)           |  Key(Fixed Size)  |    C(u32)     |     Data      |
//! +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
//!
//! C = CRC32(Key)
//! ```
//!
//! _To ensure keys fetched using `Journal::get_prefix` are correctly read, the index and key are
//! checksummed within a `Journal` entry (although the entire entry is also checksummed by `Journal`)._
//!
//! # Uniqueness
//!
//! `Archive` assumes all stored indexes and keys are unique. If the same key is associated with multiple
//! indices, there is no guarantee which value will be returned. If a key is written to an existing `index`,
//! `Archive` will return an error.
//!
//! ## Conflicts
//!
//! Because only a truncated representation of each key is stored in memory, it is possible (and expected)
//! that two keys will eventually be represented by the same truncated key. To handle this case, `Archive`
//! must check the persisted form of all conflicting keys to ensure data from the correct key is returned.
//! To support efficient checks, `Archive` keeps a linked list of all keys with the same truncated prefix:
//!
//! ```rust
//! struct Record {
//!     index: u64,
//!
//!     next: Option<Box<Record>>,
//! }
//! ```
//!
//! _To avoid random memory reads in the common case, the in-memory index directly stores the first item
//! in the linked list instead of a pointer to the first item._
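//!
//! As a rough illustration of the conflict check described above, the sketch below walks such a chain and
//! compares each candidate against its persisted key. The `persisted` map and the `resolve` function are
//! hypothetical stand-ins (assumptions made for this example) for reading the full key back from `Journal`;
//! this is not a description of how `Archive` is implemented internally.
//!
//! ```rust
//! use std::collections::HashMap;
//!
//! struct Record {
//!     index: u64,
//!     next: Option<Box<Record>>,
//! }
//!
//! /// Returns the index of the first chained record whose persisted key equals `key`.
//! ///
//! /// `persisted` is a hypothetical stand-in for reading the full key from disk.
//! fn resolve(head: &Record, key: &[u8], persisted: &HashMap<u64, Vec<u8>>) -> Option<u64> {
//!     let mut cursor = Some(head);
//!     while let Some(record) = cursor {
//!         // Records that only share the truncated key fail this comparison and are skipped.
//!         if persisted.get(&record.index).map(Vec::as_slice) == Some(key) {
//!             return Some(record.index);
//!         }
//!         cursor = record.next.as_deref();
//!     }
//!     None
//! }
//! ```
//!
//! In this sketch, every skipped record stands for one extra read of persisted data, which is why long
//! chains slow lookups down.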
//!
//! `index` is the key to the map used to serve lookups by `index`. Each entry in that map stores the location
//! of data in a given `Blob` (selected by `section = index & section_mask` to minimize the number of open `Journals`):
//!
//! ```rust
//! struct Location {
//!     offset: u32,
//!     len: u32,
//! }
//! ```
//!
//! _If the `Translator` provided by the caller does not uniformly distribute keys across the key space or
//! truncates keys such that they have many conflicts on average, performance will degrade._
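//!
//! To make the role of the `Translator` concrete, here is a minimal sketch of one that keeps only the first
//! 4 bytes of a key. The trait is restated so the example stands alone, and `FirstFour` is a hypothetical
//! name used for illustration (it is not claimed to match any translator shipped in `translator`).
//!
//! ```rust
//! use std::hash::Hash;
//!
//! // Restated from this module so the example is self-contained.
//! trait Translator: Clone {
//!     type Key: Eq + Hash + Send + Sync + Clone;
//!
//!     fn transform(&self, key: &[u8]) -> Self::Key;
//! }
//!
//! #[derive(Clone)]
//! struct FirstFour;
//!
//! impl Translator for FirstFour {
//!     type Key = [u8; 4];
//!
//!     fn transform(&self, key: &[u8]) -> Self::Key {
//!         // Copy up to 4 leading bytes; shorter keys are zero-padded.
//!         let mut out = [0u8; 4];
//!         let len = key.len().min(4);
//!         out[..len].copy_from_slice(&key[..len]);
//!         out
//!     }
//! }
//! ```
//!
//! Under this sketch, `keys1` and `keys2` both translate to `keys` and therefore conflict, while keys that
//! differ within their first 4 bytes do not.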
//!
//! ## Memory Overhead
//!
//! `Archive` uses two maps to enable lookups by both index and key. The memory used to track each index
//! item is `8 + 4 + 4` bytes (where `8` is the index, `4` is the offset, and `4` is the length). The memory used to track
//! each key item is `~truncated(key).len() + 16` bytes (where `16` is the size of the `Record` struct).
//! This means that an `Archive` employing a `Translator` that uses the first `8` bytes of a key will use `~40` bytes
//! to index each key (`16` for the index item plus `~24` for the key item).
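//!
//! The arithmetic above can be reproduced with a short sketch. It assumes a 64-bit target (where
//! `Option<Box<Record>>` occupies 8 bytes) and ignores any overhead of the maps themselves;
//! `per_item_overhead` is a hypothetical helper, not part of `Archive`.
//!
//! ```rust
//! use std::mem::size_of;
//!
//! #[allow(dead_code)]
//! struct Record {
//!     index: u64,
//!     next: Option<Box<Record>>,
//! }
//!
//! #[allow(dead_code)]
//! struct Location {
//!     offset: u32,
//!     len: u32,
//! }
//!
//! /// Approximate bytes of in-memory index state per stored item.
//! fn per_item_overhead(truncated_key_len: usize) -> usize {
//!     // Index map entry: 8-byte index plus 4-byte offset and 4-byte length.
//!     let index_entry = size_of::<u64>() + size_of::<Location>();
//!     // Key map entry: truncated key plus the `Record` struct (16 bytes on 64-bit targets).
//!     let key_entry = truncated_key_len + size_of::<Record>();
//!     index_entry + key_entry
//! }
//! ```
//!
//! For an 8-byte truncated key this gives `16 + 24 = 40` bytes, in line with the estimate above.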
//!
//! # Sync
//!
//! `Archive` flushes writes in a given `section` (computed by `index & section_mask`) to `Storage` after
//! `pending_writes` writes have been buffered in that `section`. If the caller requires durability on a
//! particular write, they can call `sync`.
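//!
//! As a small sketch of how indices group into sections, the function below applies a mask that clears
//! the low 16 bits of an index. `section_of` and `SECTION_MASK` are hypothetical names used only for
//! illustration.
//!
//! ```rust
//! /// Mask that clears the low 16 bits of an index.
//! const SECTION_MASK: u64 = 0xffff_ffff_ffff_0000;
//!
//! /// Computes the section an index belongs to.
//! fn section_of(index: u64, section_mask: u64) -> u64 {
//!     index & section_mask
//! }
//! ```
//!
//! With this mask, indices `0` through `65_535` share section `0`, while index `2_000_000` falls in
//! section `1_966_080`.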
//!
//! # Pruning
//!
//! `Archive` supports pruning up to a minimum `index` using the `prune` method. After `prune` is called
//! on a `section`, all interaction with a `section` less than the pruned `section` will return an error.
//!
//! ## Lazy Index Cleanup
//!
//! Instead of performing a full iteration of the in-memory index, storing an additional in-memory index
//! per `section`, or replaying a `section` of `Journal`, `Archive` lazily cleans up the in-memory index
//! after pruning. When a new key is stored that overlaps (same truncated value) with a pruned key, the
//! pruned key is removed from the in-memory index.
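//!
//! A simplified sketch of this lazy cleanup follows. For brevity it keeps a `Vec` of indices per truncated
//! key rather than the linked list described earlier, and `insert_with_cleanup` and `oldest_allowed` are
//! hypothetical names chosen for this example rather than parts of `Archive`.
//!
//! ```rust
//! use std::collections::HashMap;
//!
//! /// Inserts `index` under `truncated`, first dropping entries that were already pruned.
//! fn insert_with_cleanup(
//!     keys: &mut HashMap<[u8; 4], Vec<u64>>,
//!     truncated: [u8; 4],
//!     index: u64,
//!     oldest_allowed: u64,
//! ) {
//!     let chain = keys.entry(truncated).or_default();
//!     // Stale entries are only removed when a new key touches the same truncated key.
//!     chain.retain(|&existing| existing >= oldest_allowed);
//!     chain.push(index);
//! }
//! ```
//!
//! In this sketch, pruned entries under truncated keys that never receive another insert stay in memory,
//! which is the price paid for avoiding a full scan.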
//!
//! # Single Operation Reads
//!
//! To enable single operation reads (i.e. reading all of an item in a single call to `Blob`), `Archive`
//! caches the length of each item in its in-memory index. While this increases the footprint per key stored, the
//! benefit of only ever performing a single operation to read a key (when there are no conflicts) is worth the
//! tradeoff.
//!
//! # Compression
//!
//! `Archive` supports compressing data before storing it on disk. This can be enabled by setting the `compression`
//! field in the `Config` struct to a valid `zstd` compression level. This setting can be changed between initializations
//! of `Archive`; however, it must remain populated if any data was written with compression enabled.
//!
//! # Querying for Gaps
//!
//! `Archive` tracks gaps in the index space to enable the caller to efficiently fetch unknown keys using `next_gap`.
//! This is a very common pattern when syncing blocks in a blockchain.
//!
//! # Example
//!
//! ```rust
//! use commonware_runtime::{Spawner, Runner, deterministic::Executor};
//! use commonware_storage::archive::{Archive, Config, translator::FourCap};
//! use commonware_storage::journal::{Error, variable::{Config as JConfig, Journal}};
//! use prometheus_client::registry::Registry;
//! use std::sync::{Arc, Mutex};
//!
//! let (executor, context, _) = Executor::default();
//! executor.start(async move {
//!     // Create a journal
//!     let cfg = JConfig {
//!         registry: Arc::new(Mutex::new(Registry::default())),
//!         partition: "partition".to_string()
//!     };
//!     let journal = Journal::init(context, cfg).await.unwrap();
//!
//!     // Create an archive
//!     let cfg = Config {
//!         registry: Arc::new(Mutex::new(Registry::default())),
//!         key_len: 8,
//!         translator: FourCap,
//!         section_mask: 0xffff_ffff_ffff_0000u64,
//!         pending_writes: 10,
//!         replay_concurrency: 4,
//!         compression: Some(3),
//!     };
//!     let mut archive = Archive::init(journal, cfg).await.unwrap();
//!
//!     // Put a key
//!     archive.put(1, b"test-key", "data".into()).await.unwrap();
//!
//!     // Close the archive (also closes the journal)
//!     archive.close().await.unwrap();
//! });
//! ```

mod storage;
pub use storage::{Archive, Identifier};
pub mod translator;

use prometheus_client::registry::Registry;
use std::{
    hash::Hash,
    sync::{Arc, Mutex},
};
use thiserror::Error;

/// Errors that can occur when interacting with the archive.
#[derive(Debug, Error)]
pub enum Error {
    #[error("journal error: {0}")]
    Journal(#[from] crate::journal::Error),
    #[error("record corrupted")]
    RecordCorrupted,
    #[error("duplicate index")]
    DuplicateIndex,
    #[error("already pruned to: {0}")]
    AlreadyPrunedTo(u64),
    #[error("invalid key length")]
    InvalidKeyLength,
    #[error("record too large")]
    RecordTooLarge,
    #[error("compression failed")]
    CompressionFailed,
    #[error("decompression failed")]
    DecompressionFailed,
}

/// Translate keys into an internal representation used in `Archive`'s
/// in-memory index.
///
/// If invoking `transform` on keys results in many conflicts, the performance
/// of `Archive` will degrade substantially.
pub trait Translator: Clone {
    type Key: Eq + Hash + Send + Sync + Clone;

    /// Transform a key into its internal representation.
    fn transform(&self, key: &[u8]) -> Self::Key;
}

/// Configuration for `Archive` storage.
#[derive(Clone)]
pub struct Config<T: Translator> {
    /// Registry for metrics.
    pub registry: Arc<Mutex<Registry>>,

    /// Mask to apply to indices to determine section.
    ///
    /// The section of an index is computed as `index & section_mask`.
    pub section_mask: u64,

    /// Length of each key in bytes.
    ///
    /// `Archive` assumes that all keys are of the same length. This
    /// trick is used to store data more efficiently on disk and to substantially
    /// reduce the number of IO operations during initialization. If a key is provided
    /// that is not of the correct length, an error will be returned.
    pub key_len: u32,

    /// Logic to transform keys into their index representation.
    ///
    /// `Archive` assumes that all internal keys are spread uniformly across the key space.
    /// If that is not the case, lookups may be O(n) instead of O(1).
    pub translator: T,

    /// The number of writes to buffer in a section before forcing a sync in the journal.
    ///
    /// If set to 0, the journal will be synced each time a new item is stored.
    pub pending_writes: usize,

    /// The number of blobs to replay concurrently on initialization.
    pub replay_concurrency: usize,

    /// Optional compression level (using `zstd`) to apply to data before storing.
    pub compression: Option<u8>,
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::journal::variable::{Config as JConfig, Journal};
    use crate::journal::Error as JournalError;
    use bytes::Bytes;
    use commonware_macros::test_traced;
    use commonware_runtime::{deterministic::Executor, Blob, Runner, Storage};
    use prometheus_client::{encoding::text::encode, registry::Registry};
    use rand::Rng;
    use std::{
        collections::BTreeMap,
        sync::{Arc, Mutex},
    };
    use translator::{FourCap, TwoCap};

    const DEFAULT_SECTION_MASK: u64 = 0xffff_ffff_ffff_0000u64;

    fn test_archive_put_get(compression: Option<u8>) {
        // Initialize the deterministic runtime
        let (executor, context, _) = Executor::default();
        executor.start(async move {
            // Create a registry for metrics
            let registry = Arc::new(Mutex::new(Registry::default()));

            // Initialize an empty journal
            let journal = Journal::init(
                context,
                JConfig {
                    registry: registry.clone(),
                    partition: "test_partition".into(),
                },
            )
            .await
            .expect("Failed to initialize journal");

            // Initialize the archive
            let cfg = Config {
                registry,
                key_len: 7,
                translator: FourCap,
                pending_writes: 10,
                replay_concurrency: 4,
                compression,
                section_mask: DEFAULT_SECTION_MASK,
            };
            let mut archive = Archive::init(journal, cfg.clone())
                .await
                .expect("Failed to initialize archive");

            let index = 1u64;
            let key = b"testkey";
            let data = Bytes::from("testdata");

            // Check that neither the index nor the key is present yet
            let has = archive
                .has(Identifier::Index(index))
                .await
                .expect("Failed to check key");
            assert!(!has);
            let has = archive
                .has(Identifier::Key(key))
                .await
                .expect("Failed to check key");
            assert!(!has);

            // Put the key-data pair
            archive
                .put(index, key, data.clone())
                .await
                .expect("Failed to put data");

            // Check that both the index and the key are now present
            let has = archive
                .has(Identifier::Index(index))
                .await
                .expect("Failed to check key");
            assert!(has);
            let has = archive
                .has(Identifier::Key(key))
                .await
                .expect("Failed to check key");
            assert!(has);

            // Get the data back
            let retrieved = archive
                .get(Identifier::Index(index))
                .await
                .expect("Failed to get data")
                .expect("Data not found");
            assert_eq!(retrieved, data);
            let retrieved = archive
                .get(Identifier::Key(key))
                .await
                .expect("Failed to get data")
                .expect("Data not found");
            assert_eq!(retrieved, data);

            // Check metrics
            let mut buffer = String::new();
            encode(&mut buffer, &cfg.registry.lock().unwrap()).unwrap();
            assert!(buffer.contains("items_tracked 1"));
            assert!(buffer.contains("unnecessary_reads_total 0"));
            assert!(buffer.contains("gets_total 2"));
            assert!(buffer.contains("has_total 4"));
            assert!(buffer.contains("syncs_total 0"));

            // Force a sync
            archive.sync().await.expect("Failed to sync data");

            // Check metrics
            let mut buffer = String::new();
            encode(&mut buffer, &cfg.registry.lock().unwrap()).unwrap();
            assert!(buffer.contains("items_tracked 1"));
            assert!(buffer.contains("unnecessary_reads_total 0"));
            assert!(buffer.contains("gets_total 2"));
            assert!(buffer.contains("has_total 4"));
            assert!(buffer.contains("syncs_total 1"));
        });
    }

    #[test_traced]
    fn test_archive_put_get_no_compression() {
        test_archive_put_get(None);
    }

    #[test_traced]
    fn test_archive_put_get_compression() {
        test_archive_put_get(Some(3));
    }

    #[test_traced]
    fn test_archive_compression_then_none() {
        // Initialize the deterministic runtime
        let (executor, context, _) = Executor::default();
        executor.start(async move {
            // Initialize an empty journal
            let journal = Journal::init(
                context.clone(),
                JConfig {
                    registry: Arc::new(Mutex::new(Registry::default())),
                    partition: "test_partition".into(),
                },
            )
            .await
            .expect("Failed to initialize journal");

            // Initialize the archive
            let cfg = Config {
                registry: Arc::new(Mutex::new(Registry::default())),
                key_len: 7,
                translator: FourCap,
                pending_writes: 10,
                replay_concurrency: 4,
                compression: Some(3),
                section_mask: DEFAULT_SECTION_MASK,
            };
            let mut archive = Archive::init(journal, cfg.clone())
                .await
                .expect("Failed to initialize archive");

            // Put the key-data pair
            let index = 1u64;
            let key = b"testkey";
            let data = Bytes::from("testdata");
            archive
                .put(index, key, data.clone())
                .await
                .expect("Failed to put data");

            // Close the archive
            archive.close().await.expect("Failed to close archive");

            // Initialize the archive again without compression
            let journal = Journal::init(
                context,
                JConfig {
                    registry: Arc::new(Mutex::new(Registry::default())),
                    partition: "test_partition".into(),
                },
            )
            .await
            .expect("Failed to initialize journal");
            let cfg = Config {
                registry: Arc::new(Mutex::new(Registry::default())),
                key_len: 7,
                translator: FourCap,
                pending_writes: 10,
                replay_concurrency: 4,
                compression: None,
                section_mask: DEFAULT_SECTION_MASK,
            };
            let archive = Archive::init(journal, cfg.clone())
                .await
                .expect("Failed to initialize archive");

            // Get the data back
            let retrieved = archive
                .get(Identifier::Index(index))
                .await
                .expect("Failed to get data")
                .expect("Data not found");
            assert_ne!(retrieved, data);
            let retrieved = archive
                .get(Identifier::Key(key))
                .await
                .expect("Failed to get data")
                .expect("Data not found");
            assert_ne!(retrieved, data);
        });
    }

    #[test_traced]
    fn test_archive_invalid_key_length() {
        // Initialize the deterministic runtime
        let (executor, context, _) = Executor::default();
        executor.start(async move {
            // Create a registry for metrics
            let registry = Arc::new(Mutex::new(Registry::default()));

            // Initialize an empty journal
            let journal = Journal::init(
                context,
                JConfig {
                    registry: registry.clone(),
                    partition: "test_partition".into(),
                },
            )
            .await
            .expect("Failed to initialize journal");

            // Initialize the archive
            let cfg = Config {
                registry,
                key_len: 8,
                translator: FourCap,
                pending_writes: 10,
                replay_concurrency: 4,
                compression: None,
                section_mask: DEFAULT_SECTION_MASK,
            };
            let mut archive = Archive::init(journal, cfg.clone())
                .await
                .expect("Failed to initialize archive");

            let index = 1u64;
            let key = b"invalidkey";
            let data = Bytes::from("invaliddata");

            // Put the key-data pair
            let result = archive.put(index, key, data).await;
            assert!(matches!(result, Err(Error::InvalidKeyLength)));

            // Get the data back
            let result = archive.get(Identifier::Key(key)).await;
            assert!(matches!(result, Err(Error::InvalidKeyLength)));

            // Has the key
            let result = archive.has(Identifier::Key(key)).await;
            assert!(matches!(result, Err(Error::InvalidKeyLength)));

            // Check metrics
            let mut buffer = String::new();
            encode(&mut buffer, &cfg.registry.lock().unwrap()).unwrap();
            assert!(buffer.contains("items_tracked 0"));
            assert!(buffer.contains("unnecessary_reads_total 0"));
            assert!(buffer.contains("gets_total 0"));
        });
    }

    #[test_traced]
    fn test_archive_record_corruption() {
        // Initialize the deterministic runtime
        let (executor, context, _) = Executor::default();
        executor.start(async move {
            // Initialize an empty journal
            let journal = Journal::init(
                context.clone(),
                JConfig {
                    registry: Arc::new(Mutex::new(Registry::default())),
                    partition: "test_partition".into(),
                },
            )
            .await
            .expect("Failed to initialize journal");

            // Initialize the archive
            let cfg = Config {
                registry: Arc::new(Mutex::new(Registry::default())),
                key_len: 7,
                translator: FourCap,
                pending_writes: 10,
                replay_concurrency: 4,
                compression: None,
                section_mask: DEFAULT_SECTION_MASK,
            };
            let mut archive = Archive::init(journal, cfg.clone())
                .await
                .expect("Failed to initialize archive");

            let index = 1u64;
            let key = b"testkey";
            let data = Bytes::from("testdata");

            // Put the key-data pair
            archive
                .put(index, key, data.clone())
                .await
                .expect("Failed to put data");

            // Close the archive
            archive.close().await.expect("Failed to close archive");

            // Corrupt the value
            let section = index & DEFAULT_SECTION_MASK;
            let blob = context
                .open("test_partition", &section.to_be_bytes())
                .await
                .unwrap();
            let value_location = 4 + 8 + cfg.key_len as u64 + 4;
            blob.write_at(b"testdaty", value_location).await.unwrap();
            blob.close().await.unwrap();

            // Initialize the archive again
            let journal = Journal::init(
                context,
                JConfig {
                    registry: Arc::new(Mutex::new(Registry::default())),
                    partition: "test_partition".into(),
                },
            )
            .await
            .expect("Failed to initialize journal");
            let archive = Archive::init(
                journal,
                Config {
                    registry: Arc::new(Mutex::new(Registry::default())),
                    key_len: 7,
                    translator: FourCap,
                    pending_writes: 10,
                    replay_concurrency: 4,
                    compression: None,
                    section_mask: DEFAULT_SECTION_MASK,
                },
            )
            .await
            .expect("Failed to initialize archive");

            // Attempt to get the key
            let result = archive.get(Identifier::Key(key)).await;
            assert!(matches!(
                result,
                Err(Error::Journal(JournalError::ChecksumMismatch(_, _)))
            ));
        });
    }

    #[test_traced]
    fn test_archive_duplicate_key() {
        // Initialize the deterministic runtime
        let (executor, context, _) = Executor::default();
        executor.start(async move {
            // Create a registry for metrics
            let registry = Arc::new(Mutex::new(Registry::default()));

            // Initialize an empty journal
            let journal = Journal::init(
                context,
                JConfig {
                    registry: registry.clone(),
                    partition: "test_partition".into(),
                },
            )
            .await
            .expect("Failed to initialize journal");

            // Initialize the archive
            let cfg = Config {
                registry,
                key_len: 9,
                translator: FourCap,
                pending_writes: 10,
                replay_concurrency: 4,
                compression: None,
                section_mask: DEFAULT_SECTION_MASK,
            };
            let mut archive = Archive::init(journal, cfg.clone())
                .await
                .expect("Failed to initialize archive");

            let index = 1u64;
            let key = b"duplicate";
            let data1 = Bytes::from("data1");
            let data2 = Bytes::from("data2");

            // Put the key-data pair
            archive
                .put(index, key, data1.clone())
                .await
                .expect("Failed to put data");

            // Put different data at the same index and key (must be rejected)
            let result = archive.put(index, key, data2.clone()).await;
            assert!(matches!(result, Err(Error::DuplicateIndex)));

            // Get the data back (the original value must be unchanged)
            let retrieved = archive
                .get(Identifier::Index(index))
                .await
                .expect("Failed to get data")
                .expect("Data not found");
            assert_eq!(retrieved, data1);
            let retrieved = archive
                .get(Identifier::Key(key))
                .await
                .expect("Failed to get data")
                .expect("Data not found");
            assert_eq!(retrieved, data1);

            // Check metrics
            let mut buffer = String::new();
            encode(&mut buffer, &cfg.registry.lock().unwrap()).unwrap();
            assert!(buffer.contains("items_tracked 1"));
            assert!(buffer.contains("unnecessary_reads_total 0"));
            assert!(buffer.contains("gets_total 2"));
        });
    }

    #[test_traced]
    fn test_archive_get_nonexistent() {
        // Initialize the deterministic runtime
        let (executor, context, _) = Executor::default();
        executor.start(async move {
            // Create a registry for metrics
            let registry = Arc::new(Mutex::new(Registry::default()));

            // Initialize an empty journal
            let journal = Journal::init(
                context,
                JConfig {
                    registry: registry.clone(),
                    partition: "test_partition".into(),
                },
            )
            .await
            .expect("Failed to initialize journal");

            // Initialize the archive
            let cfg = Config {
                registry,
                key_len: 11,
                translator: FourCap,
                pending_writes: 10,
                replay_concurrency: 4,
                compression: None,
                section_mask: DEFAULT_SECTION_MASK,
            };
            let archive = Archive::init(journal, cfg.clone())
                .await
                .expect("Failed to initialize archive");

            // Attempt to get an index that doesn't exist
            let index = 1u64;
            let retrieved = archive
                .get(Identifier::Index(index))
                .await
                .expect("Failed to get data");
            assert!(retrieved.is_none());

            // Attempt to get a key that doesn't exist
            let key = b"nonexistent";
            let retrieved = archive
                .get(Identifier::Key(key))
                .await
                .expect("Failed to get data");
            assert!(retrieved.is_none());

            // Check metrics
            let mut buffer = String::new();
            encode(&mut buffer, &cfg.registry.lock().unwrap()).unwrap();
            assert!(buffer.contains("items_tracked 0"));
            assert!(buffer.contains("unnecessary_reads_total 0"));
            assert!(buffer.contains("gets_total 2"));
        });
    }

    #[test_traced]
    fn test_archive_overlapping_key() {
        // Initialize the deterministic runtime
        let (executor, context, _) = Executor::default();
        executor.start(async move {
            // Create a registry for metrics
            let registry = Arc::new(Mutex::new(Registry::default()));

            // Initialize an empty journal
            let journal = Journal::init(
                context,
                JConfig {
                    registry: registry.clone(),
                    partition: "test_partition".into(),
                },
            )
            .await
            .expect("Failed to initialize journal");

            // Initialize the archive
            let cfg = Config {
                registry,
                key_len: 5,
                translator: FourCap,
                pending_writes: 10,
                replay_concurrency: 4,
                compression: None,
                section_mask: DEFAULT_SECTION_MASK,
            };
            let mut archive = Archive::init(journal, cfg.clone())
                .await
                .expect("Failed to initialize archive");

            // Both keys share the same first four bytes
            let index1 = 1u64;
            let key1 = b"keys1";
            let data1 = Bytes::from("data1");
            let index2 = 2u64;
            let key2 = b"keys2";
            let data2 = Bytes::from("data2");

            // Put the first key-data pair
            archive
                .put(index1, key1, data1.clone())
                .await
                .expect("Failed to put data");

            // Put the second key-data pair
            archive
                .put(index2, key2, data2.clone())
                .await
                .expect("Failed to put data");

            // Get the first value back
            let retrieved = archive
                .get(Identifier::Key(key1))
                .await
                .expect("Failed to get data")
                .expect("Data not found");
            assert_eq!(retrieved, data1);

            // Get the second value back
            let retrieved = archive
                .get(Identifier::Key(key2))
                .await
                .expect("Failed to get data")
                .expect("Data not found");
            assert_eq!(retrieved, data2);

            // Check metrics
            let mut buffer = String::new();
            encode(&mut buffer, &cfg.registry.lock().unwrap()).unwrap();
            assert!(buffer.contains("items_tracked 2"));
            assert!(buffer.contains("unnecessary_reads_total 1"));
            assert!(buffer.contains("gets_total 2"));
        });
    }
791
792    #[test_traced]
793    fn test_archive_overlapping_key_multiple_sections() {
794        // Initialize the deterministic runtime
795        let (executor, context, _) = Executor::default();
796        executor.start(async move {
797            // Create a registry for metrics
798            let registry = Arc::new(Mutex::new(Registry::default()));
799
800            // Initialize an empty journal
801            let journal = Journal::init(
802                context,
803                JConfig {
804                    registry: registry.clone(),
805                    partition: "test_partition".into(),
806                },
807            )
808            .await
809            .expect("Failed to initialize journal");
810
811            // Initialize the archive
812            let cfg = Config {
813                registry,
814                key_len: 5,
815                translator: FourCap,
816                pending_writes: 10,
817                replay_concurrency: 4,
818                compression: None,
819                section_mask: DEFAULT_SECTION_MASK,
820            };
821            let mut archive = Archive::init(journal, cfg.clone())
822                .await
823                .expect("Failed to initialize archive");
824
825            let index1 = 1u64;
826            let key1 = b"keys1";
827            let data1 = Bytes::from("data1");
828            let index2 = 2_000_000u64;
829            let key2 = b"keys2";
830            let data2 = Bytes::from("data2");
831
832            // Put the first key-data pair
833            archive
834                .put(index1, key1, data1.clone())
835                .await
836                .expect("Failed to put data");
837
838            // Put the second key-data pair (index chosen to land in a different section)
839            archive
840                .put(index2, key2, data2.clone())
841                .await
842                .expect("Failed to put data");
843
844            // Get the first value back by key
845            let retrieved = archive
846                .get(Identifier::Key(key1))
847                .await
848                .expect("Failed to get data")
849                .expect("Data not found");
850            assert_eq!(retrieved, data1);
851
852            // Get the second value back by key
853            let retrieved = archive
854                .get(Identifier::Key(key2))
855                .await
856                .expect("Failed to get data")
857                .expect("Data not found");
858            assert_eq!(retrieved, data2);
859        });
860    }
861
862    #[test_traced]
863    fn test_archive_prune_keys() {
864        // Initialize the deterministic runtime
865        let (executor, context, _) = Executor::default();
866        executor.start(async move {
867            // Create a registry for metrics
868            let registry = Arc::new(Mutex::new(Registry::default()));
869
870            // Initialize an empty journal
871            let journal = Journal::init(
872                context.clone(),
873                JConfig {
874                    registry: registry.clone(),
875                    partition: "test_partition".into(),
876                },
877            )
878            .await
879            .expect("Failed to initialize journal");
880
881            // Initialize the archive
882            let cfg = Config {
883                registry: registry.clone(),
884                key_len: 9,
885                translator: FourCap,
886                pending_writes: 10,
887                replay_concurrency: 4,
888                compression: None,
889                section_mask: 0xffff_ffff_ffff_ffffu64, // no mask
890            };
891            let mut archive = Archive::init(journal, cfg.clone())
892                .await
893                .expect("Failed to initialize archive");
894
895            // Insert multiple keys across different sections
896            let keys = vec![
897                (1u64, "key1-blah", Bytes::from("data1")),
898                (2u64, "key2-blah", Bytes::from("data2")),
899                (3u64, "key3-blah", Bytes::from("data3")),
900                (4u64, "key3-bleh", Bytes::from("data3-again")),
901                (5u64, "key4-blah", Bytes::from("data4")),
902            ];
903
904            for (index, key, data) in &keys {
905                archive
906                    .put(*index, key.as_bytes(), data.clone())
907                    .await
908                    .expect("Failed to put data");
909            }
910
911            // Check metrics
912            let mut buffer = String::new();
913            encode(&mut buffer, &cfg.registry.lock().unwrap()).unwrap();
914            assert!(buffer.contains("items_tracked 5"));
915
916            // Prune sections less than 3
917            archive.prune(3).await.expect("Failed to prune");
918
919            // Ensure the keys at indices 1 and 2 are gone and the rest are still readable
920            for (index, key, data) in keys {
921                let retrieved = archive
922                    .get(Identifier::Key(key.as_bytes()))
923                    .await
924                    .expect("Failed to get data");
925                if index < 3 {
926                    assert!(retrieved.is_none());
927                } else {
928                    assert_eq!(retrieved.expect("Data not found"), data);
929                }
930            }
931
932            // Check metrics
933            let mut buffer = String::new();
934            encode(&mut buffer, &cfg.registry.lock().unwrap()).unwrap();
935            assert!(buffer.contains("items_tracked 3"));
936            assert!(buffer.contains("indices_pruned_total 2"));
937            assert!(buffer.contains("keys_pruned_total 0")); // no lazy cleanup yet
938
939            // Pruning to an older section should succeed without changing anything
940            archive.prune(2).await.expect("Failed to prune");
941
942            // Try to prune current section again
943            archive.prune(3).await.expect("Failed to prune");
944
945            // Try to put older index
946            let result = archive
947                .put(1, "key1-blah".as_bytes(), Bytes::from("data1"))
948                .await;
949            assert!(matches!(result, Err(Error::AlreadyPrunedTo(3))));
950
951            // Trigger lazy removal of keys (the new key shares the `key2` prefix with a pruned key)
952            archive
953                .put(6, "key2-blfh".as_bytes(), Bytes::from("data2-2"))
954                .await
955                .expect("Failed to put data");
956
957            // Check metrics
958            let mut buffer = String::new();
959            encode(&mut buffer, &cfg.registry.lock().unwrap()).unwrap();
960            assert!(buffer.contains("items_tracked 4")); // 3 remaining + 1 newly added
961            assert!(buffer.contains("indices_pruned_total 2"));
962            assert!(buffer.contains("keys_pruned_total 1"));
963        });
964    }
965
966    fn test_archive_keys_and_restart(num_keys: usize) -> String {
967        // Initialize the deterministic runtime
968        let (executor, mut context, auditor) = Executor::default();
969        executor.start(async move {
970            // Create a registry for metrics
971            let registry = Arc::new(Mutex::new(Registry::default()));
972
973            // Initialize an empty journal
974            let journal = Journal::init(
975                context.clone(),
976                JConfig {
977                    registry: registry.clone(),
978                    partition: "test_partition".into(),
979                },
980            )
981            .await
982            .expect("Failed to initialize journal");
983
984            // Initialize the archive
985            let section_mask = 0xffff_ffff_ffff_ff00u64;
986            let cfg = Config {
987                registry: registry.clone(),
988                key_len: 32,
989                translator: TwoCap,
990                pending_writes: 10,
991                replay_concurrency: 4,
992                compression: None,
993                section_mask,
994            };
995            let mut archive = Archive::init(journal, cfg.clone())
996                .await
997                .expect("Failed to initialize archive");
998
999            // Insert multiple keys across different sections
1000            let mut keys = BTreeMap::new();
1001            while keys.len() < num_keys {
1002                let index = keys.len() as u64;
1003                let mut key = [0u8; 32];
1004                context.fill(&mut key);
1005                let mut data = [0u8; 1024];
1006                context.fill(&mut data);
1007                let data = Bytes::from(data.to_vec());
1008                archive
1009                    .put(index, &key, data.clone())
1010                    .await
1011                    .expect("Failed to put data");
1012                keys.insert(key, (index, data));
1013            }
1014
1015            // Ensure all keys can be retrieved
1016            for (key, (index, data)) in &keys {
1017                let retrieved = archive
1018                    .get(Identifier::Index(*index))
1019                    .await
1020                    .expect("Failed to get data")
1021                    .expect("Data not found");
1022                assert_eq!(retrieved, data);
1023                let retrieved = archive
1024                    .get(Identifier::Key(key))
1025                    .await
1026                    .expect("Failed to get data")
1027                    .expect("Data not found");
1028                assert_eq!(retrieved, data);
1029            }
1030
1031            // Check metrics
1032            let mut buffer = String::new();
1033            encode(&mut buffer, &cfg.registry.lock().unwrap()).unwrap();
1034            let tracked = format!("items_tracked {}", num_keys);
1035            assert!(buffer.contains(&tracked));
1036            assert!(buffer.contains("keys_pruned_total 0"));
1037
1038            // Close the archive
1039            archive.close().await.expect("Failed to close archive");
1040
1041            // Reinitialize the archive
1042            let registry = Arc::new(Mutex::new(Registry::default()));
1043            let journal = Journal::init(
1044                context.clone(),
1045                JConfig {
1046                    registry: registry.clone(),
1047                    partition: "test_partition".into(),
1048                },
1049            )
1050            .await
1051            .expect("Failed to initialize journal");
1052            let cfg = Config {
1053                registry: registry.clone(),
1054                key_len: 32,
1055                translator: TwoCap,
1056                pending_writes: 10,
1057                replay_concurrency: 4,
1058                compression: None,
1059                section_mask,
1060            };
1061            let mut archive = Archive::init(journal, cfg.clone())
1062                .await
1063                .expect("Failed to initialize archive");
1064
1065            // Ensure all keys can be retrieved
1066            for (key, (index, data)) in &keys {
1067                let retrieved = archive
1068                    .get(Identifier::Index(*index))
1069                    .await
1070                    .expect("Failed to get data")
1071                    .expect("Data not found");
1072                assert_eq!(retrieved, data);
1073                let retrieved = archive
1074                    .get(Identifier::Key(key))
1075                    .await
1076                    .expect("Failed to get data")
1077                    .expect("Data not found");
1078                assert_eq!(retrieved, data);
1079            }
1080
1081            // Prune first half
1082            let min = (keys.len() / 2) as u64;
1083            archive.prune(min).await.expect("Failed to prune");
1084
1085            // Ensure all keys that haven't been pruned can still be retrieved
1086            let min = min & section_mask;
1087            let mut removed = 0;
1088            for (key, (index, data)) in keys {
1089                if index >= min {
1090                    let retrieved = archive
1091                        .get(Identifier::Key(&key))
1092                        .await
1093                        .expect("Failed to get data")
1094                        .expect("Data not found");
1095                    assert_eq!(retrieved, data);
1096
1097                    // Check range
1098                    let (current_end, start_next) = archive.next_gap(index);
1099                    assert_eq!(current_end.unwrap(), num_keys as u64 - 1);
1100                    assert!(start_next.is_none());
1101                } else {
1102                    let retrieved = archive
1103                        .get(Identifier::Key(&key))
1104                        .await
1105                        .expect("Failed to get data");
1106                    assert!(retrieved.is_none());
1107                    removed += 1;
1108
1109                    // Check range
1110                    let (current_end, start_next) = archive.next_gap(index);
1111                    assert!(current_end.is_none());
1112                    assert_eq!(start_next.unwrap(), min);
1113                }
1114            }
1115
1116            // Check metrics
1117            let mut buffer = String::new();
1118            encode(&mut buffer, &cfg.registry.lock().unwrap()).unwrap();
1119            let tracked = format!("items_tracked {}", num_keys - removed);
1120            assert!(buffer.contains(&tracked));
1121            let pruned = format!("indices_pruned_total {}", removed);
1122            assert!(buffer.contains(&pruned));
1123            assert!(buffer.contains("keys_pruned_total 0")); // have not lazily removed keys yet
1124        });
1125        auditor.state()
1126    }
1127
1128    #[test_traced]
1129    fn test_archive_many_keys_and_restart() {
1130        test_archive_keys_and_restart(100_000); // 391 sections
1131    }
1132
1133    #[test_traced]
1134    fn test_determinism() {
1135        let state1 = test_archive_keys_and_restart(5_000); // 20 sections
1136        let state2 = test_archive_keys_and_restart(5_000);
1137        assert_eq!(state1, state2);
1138    }
1139
1140    #[test_traced]
1141    fn test_ranges() {
1142        // Initialize the deterministic runtime
1143        let (executor, context, _) = Executor::default();
1144        executor.start(async move {
1145            // Create a registry for metrics
1146            let registry = Arc::new(Mutex::new(Registry::default()));
1147
1148            // Initialize an empty journal
1149            let journal = Journal::init(
1150                context.clone(),
1151                JConfig {
1152                    registry: registry.clone(),
1153                    partition: "test_partition".into(),
1154                },
1155            )
1156            .await
1157            .expect("Failed to initialize journal");
1158
1159            // Initialize the archive
1160            let cfg = Config {
1161                registry,
1162                key_len: 9,
1163                translator: FourCap,
1164                pending_writes: 10,
1165                replay_concurrency: 4,
1166                compression: None,
1167                section_mask: DEFAULT_SECTION_MASK,
1168            };
1169            let mut archive = Archive::init(journal, cfg.clone())
1170                .await
1171                .expect("Failed to initialize archive");
1172
1173            // Insert multiple keys across different indices
1174            let keys = vec![
1175                (1u64, "key1-blah", Bytes::from("data1")),
1176                (10u64, "key2-blah", Bytes::from("data2")),
1177                (11u64, "key3-blah", Bytes::from("data3")),
1178                (14u64, "key3-bleh", Bytes::from("data3-again")),
1179            ];
1180            for (index, key, data) in &keys {
1181                archive
1182                    .put(*index, key.as_bytes(), data.clone())
1183                    .await
1184                    .expect("Failed to put data");
1185            }
1186
1187            // Check ranges
1188            let (current_end, start_next) = archive.next_gap(0);
1189            assert!(current_end.is_none());
1190            assert_eq!(start_next.unwrap(), 1);
1191
1192            let (current_end, start_next) = archive.next_gap(1);
1193            assert_eq!(current_end.unwrap(), 1);
1194            assert_eq!(start_next.unwrap(), 10);
1195
1196            let (current_end, start_next) = archive.next_gap(10);
1197            assert_eq!(current_end.unwrap(), 11);
1198            assert_eq!(start_next.unwrap(), 14);
1199
1200            let (current_end, start_next) = archive.next_gap(11);
1201            assert_eq!(current_end.unwrap(), 11);
1202            assert_eq!(start_next.unwrap(), 14);
1203
1204            let (current_end, start_next) = archive.next_gap(12);
1205            assert!(current_end.is_none());
1206            assert_eq!(start_next.unwrap(), 14);
1207
1208            let (current_end, start_next) = archive.next_gap(14);
1209            assert_eq!(current_end.unwrap(), 14);
1210            assert!(start_next.is_none());
1211
1212            // Close and reopen the archive to check that ranges survive a restart
1213            archive.close().await.expect("Failed to close archive");
1214
1215            let journal = Journal::init(
1216                context,
1217                JConfig {
1218                    registry: Arc::new(Mutex::new(Registry::default())),
1219                    partition: "test_partition".into(),
1220                },
1221            )
1222            .await
1223            .expect("Failed to initialize journal");
1224            let archive = Archive::init(journal, cfg.clone())
1225                .await
1226                .expect("Failed to initialize archive");
1227
1228            // Check ranges again
1229            let (current_end, start_next) = archive.next_gap(0);
1230            assert!(current_end.is_none());
1231            assert_eq!(start_next.unwrap(), 1);
1232
1233            let (current_end, start_next) = archive.next_gap(1);
1234            assert_eq!(current_end.unwrap(), 1);
1235            assert_eq!(start_next.unwrap(), 10);
1236
1237            let (current_end, start_next) = archive.next_gap(10);
1238            assert_eq!(current_end.unwrap(), 11);
1239            assert_eq!(start_next.unwrap(), 14);
1240
1241            let (current_end, start_next) = archive.next_gap(11);
1242            assert_eq!(current_end.unwrap(), 11);
1243            assert_eq!(start_next.unwrap(), 14);
1244
1245            let (current_end, start_next) = archive.next_gap(12);
1246            assert!(current_end.is_none());
1247            assert_eq!(start_next.unwrap(), 14);
1248
1249            let (current_end, start_next) = archive.next_gap(14);
1250            assert_eq!(current_end.unwrap(), 14);
1251            assert!(start_next.is_none());
1252        });
1253    }
1254}