commonware_storage/archive/prunable/mod.rs

//! A prunable key-value store for ordered data.
//!
//! Data is stored across two backends: [crate::journal::segmented::fixed] for fixed-size index entries and
//! [crate::journal::segmented::glob::Glob] for values (managed by [crate::journal::segmented::oversized]).
//! The location of written data is stored in-memory by both index and key (via [crate::index::unordered::Index])
//! to enable efficient lookups (on average).
//!
//! _Notably, [Archive] does not use compaction or on-disk indexes (and thus has no read or
//! write amplification during normal operation)._
//!
//! # Format
//!
//! [Archive] uses a two-journal structure for efficient page cache usage:
//!
//! **Index Journal (segmented/fixed)** - Fixed-size entries for fast startup replay:
//! ```text
//! +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
//! | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 |10 |11 |12 |13 |14 |15 |16 |17 |18 |19 |20 |21 |22 |23 |
//! +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
//! |          Index(u64)           |Key(Fixed Size)|        val_offset(u64)        | val_size(u32) |
//! +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
//! ```
//!
//! **Value Blob** - Raw values with CRC32 checksums (direct reads, no page cache):
//! ```text
//! +---+---+---+---+---+---+---+---+---+---+---+---+
//! |     Compressed Data (variable)    |   CRC32   |
//! +---+---+---+---+---+---+---+---+---+---+---+---+
//! ```
//!
//! # Uniqueness
//!
//! [Archive] assumes all stored indexes and keys are unique. If the same key is associated with
//! multiple `indices`, there is no guarantee which value will be returned. If a key is written to
//! an existing `index`, [Archive] will return an error.
//!
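//! For example (a minimal sketch of the duplicate-`index` case; the specific error is not shown
//! because it depends on the implementation):
//!
//! ```text
//! archive.put(1, key_a, value_a).await?;                  // ok: index 1 is empty
//! assert!(archive.put(1, key_b, value_b).await.is_err()); // error: index 1 already written
//! ```
//!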
//! ## Conflicts
//!
//! Because a translated representation of a key is only ever stored in memory, it is possible (and
//! expected) that two keys will eventually be represented by the same translated key. To handle
//! this case, [Archive] must check the persisted form of all conflicting keys to ensure data from
//! the correct key is returned. To support efficient checks, [Archive] (via
//! [crate::index::unordered::Index]) keeps a linked list of all keys with the same translated
//! prefix:
//!
//! ```rust
//! struct Record {
//!     index: u64,
//!
//!     next: Option<Box<Record>>,
//! }
//! ```
//!
//! _To avoid random memory reads in the common case, the in-memory index directly stores the first
//! item in the linked list instead of a pointer to the first item._
//!
//! Lookups by `index` are served by a second map, keyed by `index`, that stores the entry's
//! position in the index journal (the journal section is selected by
//! `section = index / items_per_section * items_per_section` to minimize the number of open blobs):
//!
//! ```text
//! // Maps index -> position in index journal
//! indices: BTreeMap<u64, u64>
//! ```
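//!
//! For example (integer division):
//!
//! ```text
//! items_per_section = 1024
//! index             = 2500
//! section           = 2500 / 1024 * 1024 = 2048   // indices 2048..=3071 share this section
//! ```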
//!
//! _If the [Translator] provided by the caller does not uniformly distribute keys across the key
//! space or uses a translated representation that causes keys, on average, to have many conflicts,
//! performance will degrade._
//!
//! ## Memory Overhead
//!
//! [Archive] uses two maps to enable lookups by both index and key. The memory used to track each
//! index item is `8 + 8` bytes (where `8` is the index and `8` is the position in the index journal).
//! The memory used to track each key item is `~translated(key).len() + 16` bytes (where `16` is the
//! size of the `Record` struct). This means that an [Archive] employing a [Translator] that uses
//! the first `8` bytes of a key will use `~40` bytes to index each key.
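//!
//! For example, with a [Translator] that keeps the first `8` bytes of each key:
//!
//! ```text
//! index map entry: 8 (index) + 8 (position in index journal)  = 16 bytes
//! key map entry:   8 (translated key) + 16 (Record)           = 24 bytes
//! total per key:                                              ~ 40 bytes
//! ```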
//!
//! # Pruning
//!
//! [Archive] supports pruning up to a minimum `index` using the `prune` method. After `prune` is
//! called on a `section`, all interaction with a `section` less than the pruned `section` will
//! return an error.
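//!
//! A minimal sketch (assuming `items_per_section = 1`, as in the pruning test below; the error
//! variant shown is the one asserted there):
//!
//! ```text
//! archive.prune(3).await?;             // prune all sections below 3
//! archive.put(1, key, value).await;    // => Err(Error::AlreadyPrunedTo(3))
//! ```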
//!
//! ## Lazy Index Cleanup
//!
//! Instead of performing a full iteration of the in-memory index, storing an additional in-memory
//! index per `section`, or replaying a `section` of the value blob, [Archive] lazily cleans up the
//! [crate::index::unordered::Index] after pruning. When a new key is stored that overlaps (same
//! translated value) with a pruned key, the pruned key is removed from the in-memory index.
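//!
//! A minimal sketch of the sequence (key names mirror the pruning test below):
//!
//! ```text
//! put(2, "key2-blah", ..)   // tracked in the in-memory index
//! prune(3)                  // index 2 is pruned, but its in-memory entry remains
//! put(6, "key2-blfh", ..)   // same translated prefix: the stale entry is removed lazily
//! ```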
//!
//! # Read Path
//!
//! All reads (by index or key) first read the index entry from the index journal to get the
//! value location (offset and size), then read the value from the value blob. The index journal
//! is backed by a page cache, so hot entries are served from memory. Values are read directly
//! from disk without caching to avoid polluting the page cache with large values.
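//!
//! A sketch of the lookup flow (pseudocode, not the exact internal API):
//!
//! ```text
//! entry = index_journal.read(position)                        // served via the page cache
//! bytes = value_blob.read(entry.val_offset, entry.val_size)   // direct, uncached read
//! value = decode(decompress_if_enabled(verify_crc32(bytes)))
//! ```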
//!
//! # Compression
//!
//! [Archive] supports compressing data before storing it on disk. This can be enabled by setting
//! the `compression` field in the `Config` struct to a valid `zstd` compression level. This setting
//! can be changed between initializations of [Archive]; however, it must remain populated if any
//! data was written with compression enabled.
//!
//! # Querying for Gaps
//!
//! [Archive] tracks gaps in the index space to enable the caller to efficiently fetch unknown keys
//! using `next_gap`. This is a very common pattern when syncing blocks in a blockchain.
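//!
//! A minimal sketch of `next_gap` (the tuple interpretation is inferred from the tests below):
//!
//! ```text
//! let (current_end, start_next) = archive.next_gap(index);
//! // current_end: end of the contiguous populated range containing `index` (None if `index` is missing)
//! // start_next:  start of the next populated range after `index` (None if there is none)
//! ```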
//!
//! # Example
//!
//! ```rust
//! use commonware_runtime::{Spawner, Runner, deterministic, buffer::paged::CacheRef};
//! use commonware_cryptography::{Hasher as _, Sha256};
//! use commonware_storage::{
//!     translator::FourCap,
//!     archive::{
//!         Archive as _,
//!         prunable::{Archive, Config},
//!     },
//! };
//! use commonware_utils::{NZUsize, NZU16, NZU64};
//!
//! let executor = deterministic::Runner::default();
//! executor.start(|context| async move {
//!     // Create an archive
//!     let cfg = Config {
//!         translator: FourCap,
//!         key_partition: "demo_index".into(),
//!         key_page_cache: CacheRef::new(NZU16!(1024), NZUsize!(10)),
//!         value_partition: "demo_value".into(),
//!         compression: Some(3),
//!         codec_config: (),
//!         items_per_section: NZU64!(1024),
//!         key_write_buffer: NZUsize!(1024 * 1024),
//!         value_write_buffer: NZUsize!(1024 * 1024),
//!         replay_buffer: NZUsize!(4096),
//!     };
//!     let mut archive = Archive::init(context, cfg).await.unwrap();
//!
//!     // Put a key
//!     archive.put(1, Sha256::hash(b"data"), 10).await.unwrap();
//!
//!     // Sync the archive
//!     archive.sync().await.unwrap();
//! });
//! ```

use crate::translator::Translator;
use commonware_runtime::buffer::paged::CacheRef;
use std::num::{NonZeroU64, NonZeroUsize};

mod storage;
pub use storage::Archive;

/// Configuration for [Archive] storage.
#[derive(Clone)]
pub struct Config<T: Translator, C> {
    /// Logic to transform keys into their index representation.
    ///
    /// [Archive] assumes that all internal keys are spread uniformly across the key space.
    /// If that is not the case, lookups may be O(n) instead of O(1).
    pub translator: T,

    /// The partition to use for the key journal (stores index+key metadata).
    pub key_partition: String,

    /// The page cache to use for the key journal.
    pub key_page_cache: CacheRef,

    /// The partition to use for the value blob (stores values).
    pub value_partition: String,

    /// The compression level to use for the value blob.
    pub compression: Option<u8>,

    /// The [commonware_codec::Codec] configuration to use for the value stored in the archive.
    pub codec_config: C,

    /// The number of items per section (the granularity of pruning).
    pub items_per_section: NonZeroU64,

    /// The number of bytes that can be buffered for the key journal before being written to a
    /// [commonware_runtime::Blob].
    pub key_write_buffer: NonZeroUsize,

    /// The number of bytes that can be buffered for the value journal before being written to a
    /// [commonware_runtime::Blob].
    pub value_write_buffer: NonZeroUsize,

    /// The buffer size to use when replaying a [commonware_runtime::Blob].
    pub replay_buffer: NonZeroUsize,
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        archive::{Archive as _, Error, Identifier},
        journal::Error as JournalError,
        kv::tests::test_key,
        translator::{FourCap, TwoCap},
    };
    use commonware_codec::{DecodeExt, Error as CodecError};
    use commonware_macros::{test_group, test_traced};
    use commonware_runtime::{deterministic, Metrics, Runner};
    use commonware_utils::{sequence::FixedBytes, NZUsize, NZU16, NZU64};
    use rand::Rng;
    use std::{collections::BTreeMap, num::NonZeroU16};

    const DEFAULT_ITEMS_PER_SECTION: u64 = 65536;
    const DEFAULT_WRITE_BUFFER: usize = 1024;
    const DEFAULT_REPLAY_BUFFER: usize = 4096;
    const PAGE_SIZE: NonZeroU16 = NZU16!(1024);
    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);

    #[test_traced]
    fn test_archive_compression_then_none() {
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Initialize the archive
            let cfg = Config {
                translator: FourCap,
                key_partition: "test_index".into(),
                key_page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                value_partition: "test_value".into(),
                codec_config: (),
                compression: Some(3),
                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
                items_per_section: NZU64!(DEFAULT_ITEMS_PER_SECTION),
            };
            let mut archive = Archive::init(context.with_label("first"), cfg.clone())
                .await
                .expect("Failed to initialize archive");

            // Put the key-data pair
            let index = 1u64;
            let key = test_key("testkey");
            let data = 1;
            archive
                .put(index, key.clone(), data)
                .await
                .expect("Failed to put data");

            // Sync and drop the archive
            archive.sync().await.expect("Failed to sync archive");
            drop(archive);

            // Initialize the archive again without compression.
            // Index journal replay succeeds (no compression), but value reads will fail.
            let cfg = Config {
                translator: FourCap,
                key_partition: "test_index".into(),
                key_page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                value_partition: "test_value".into(),
                codec_config: (),
                compression: None,
                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
                items_per_section: NZU64!(DEFAULT_ITEMS_PER_SECTION),
            };
            let archive = Archive::<_, _, FixedBytes<64>, i32>::init(
                context.with_label("second"),
                cfg.clone(),
            )
            .await
            .unwrap();

            // Getting the value should fail because compression settings mismatch.
            // Without compression, the codec sees extra bytes after decoding the value
            // (because the compressed data doesn't match the expected format).
            let result: Result<Option<i32>, _> = archive.get(Identifier::Index(index)).await;
            assert!(matches!(
                result,
                Err(Error::Journal(JournalError::Codec(CodecError::ExtraData(
                    _
                ))))
            ));
        });
    }

    #[test_traced]
    fn test_archive_overlapping_key_basic() {
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Initialize the archive
            let cfg = Config {
                translator: FourCap,
                key_partition: "test_index".into(),
                key_page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                value_partition: "test_value".into(),
                codec_config: (),
                compression: None,
                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
                items_per_section: NZU64!(DEFAULT_ITEMS_PER_SECTION),
            };
            let mut archive = Archive::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to initialize archive");

            let index1 = 1u64;
            let key1 = test_key("keys1");
            let data1 = 1;
            let index2 = 2u64;
            let key2 = test_key("keys2");
            let data2 = 2;

            // Put the key-data pair
            archive
                .put(index1, key1.clone(), data1)
                .await
                .expect("Failed to put data");

            // Put the key-data pair
            archive
                .put(index2, key2.clone(), data2)
                .await
                .expect("Failed to put data");

            // Get the data back
            let retrieved = archive
                .get(Identifier::Key(&key1))
                .await
                .expect("Failed to get data")
                .expect("Data not found");
            assert_eq!(retrieved, data1);

            // Get the data back
            let retrieved = archive
                .get(Identifier::Key(&key2))
                .await
                .expect("Failed to get data")
                .expect("Data not found");
            assert_eq!(retrieved, data2);

            // Check metrics
            let buffer = context.encode();
            assert!(buffer.contains("items_tracked 2"));
            assert!(buffer.contains("unnecessary_reads_total 1"));
            assert!(buffer.contains("gets_total 2"));
        });
    }

    #[test_traced]
    fn test_archive_overlapping_key_multiple_sections() {
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Initialize the archive
            let cfg = Config {
                translator: FourCap,
                key_partition: "test_index".into(),
                key_page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                value_partition: "test_value".into(),
                codec_config: (),
                compression: None,
                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
                items_per_section: NZU64!(DEFAULT_ITEMS_PER_SECTION),
            };
            let mut archive = Archive::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to initialize archive");

            let index1 = 1u64;
            let key1 = test_key("keys1");
            let data1 = 1;
            let index2 = 2_000_000u64;
            let key2 = test_key("keys2");
            let data2 = 2;

            // Put the key-data pair
            archive
                .put(index1, key1.clone(), data1)
                .await
                .expect("Failed to put data");

            // Put the key-data pair
            archive
                .put(index2, key2.clone(), data2)
                .await
                .expect("Failed to put data");

            // Get the data back
            let retrieved = archive
                .get(Identifier::Key(&key1))
                .await
                .expect("Failed to get data")
                .expect("Data not found");
            assert_eq!(retrieved, data1);

            // Get the data back
            let retrieved = archive
                .get(Identifier::Key(&key2))
                .await
                .expect("Failed to get data")
                .expect("Data not found");
            assert_eq!(retrieved, data2);
        });
    }

    #[test_traced]
    fn test_archive_prune_keys() {
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Initialize the archive
            let cfg = Config {
                translator: FourCap,
                key_partition: "test_index".into(),
                key_page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                value_partition: "test_value".into(),
                codec_config: (),
                compression: None,
                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
                items_per_section: NZU64!(1), // no mask - each item is its own section
            };
            let mut archive = Archive::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to initialize archive");

            // Insert multiple keys across different sections
            let keys = vec![
                (1u64, test_key("key1-blah"), 1),
                (2u64, test_key("key2-blah"), 2),
                (3u64, test_key("key3-blah"), 3),
                (4u64, test_key("key3-bleh"), 3),
                (5u64, test_key("key4-blah"), 4),
            ];

            for (index, key, data) in &keys {
                archive
                    .put(*index, key.clone(), *data)
                    .await
                    .expect("Failed to put data");
            }

            // Check metrics
            let buffer = context.encode();
            assert!(buffer.contains("items_tracked 5"));

            // Prune sections less than 3
            archive.prune(3).await.expect("Failed to prune");

            // Ensure keys 1 and 2 are no longer present
            for (index, key, data) in keys {
                let retrieved = archive
                    .get(Identifier::Key(&key))
                    .await
                    .expect("Failed to get data");
                if index < 3 {
                    assert!(retrieved.is_none());
                } else {
                    assert_eq!(retrieved.expect("Data not found"), data);
                }
            }

            // Check metrics
            let buffer = context.encode();
            assert!(buffer.contains("items_tracked 3"));
            assert!(buffer.contains("indices_pruned_total 2"));
            assert!(buffer.contains("pruned_total 0")); // no lazy cleanup yet

            // Try to prune older section
            archive.prune(2).await.expect("Failed to prune");

            // Try to prune current section again
            archive.prune(3).await.expect("Failed to prune");

            // Try to put older index
            let result = archive.put(1, test_key("key1-blah"), 1).await;
            assert!(matches!(result, Err(Error::AlreadyPrunedTo(3))));

            // Trigger lazy removal of keys
            archive
                .put(6, test_key("key2-blfh"), 5)
                .await
                .expect("Failed to put data");

            // Check metrics
            let buffer = context.encode();
            assert!(buffer.contains("items_tracked 4")); // lazily remove one, add one
            assert!(buffer.contains("indices_pruned_total 2"));
            assert!(buffer.contains("pruned_total 1"));
        });
    }

    fn test_archive_keys_and_restart(num_keys: usize) -> String {
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();
        executor.start(|mut context| async move {
            // Initialize the archive
            let items_per_section = 256u64;
            let cfg = Config {
                translator: TwoCap,
                key_partition: "test_index".into(),
                key_page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                value_partition: "test_value".into(),
                codec_config: (),
                compression: None,
                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
                items_per_section: NZU64!(items_per_section),
            };
            let mut archive = Archive::init(context.with_label("init1"), cfg.clone())
                .await
                .expect("Failed to initialize archive");

            // Insert multiple keys across different sections
            let mut keys = BTreeMap::new();
            while keys.len() < num_keys {
                let index = keys.len() as u64;
                let mut key = [0u8; 64];
                context.fill(&mut key);
                let key = FixedBytes::<64>::decode(key.as_ref()).unwrap();
                let mut data = [0u8; 1024];
                context.fill(&mut data);
                let data = FixedBytes::<1024>::decode(data.as_ref()).unwrap();

                archive
                    .put(index, key.clone(), data.clone())
                    .await
                    .expect("Failed to put data");
                keys.insert(key, (index, data));
            }

            // Ensure all keys can be retrieved
            for (key, (index, data)) in &keys {
                let retrieved = archive
                    .get(Identifier::Index(*index))
                    .await
                    .expect("Failed to get data")
                    .expect("Data not found");
                assert_eq!(&retrieved, data);
                let retrieved = archive
                    .get(Identifier::Key(key))
                    .await
                    .expect("Failed to get data")
                    .expect("Data not found");
                assert_eq!(&retrieved, data);
            }

            // Check metrics
            let buffer = context.encode();
            let tracked = format!("items_tracked {num_keys:?}");
            assert!(buffer.contains(&tracked));
            assert!(buffer.contains("pruned_total 0"));

            // Sync and drop the archive
            archive.sync().await.expect("Failed to sync archive");
            drop(archive);

            // Reinitialize the archive
            let cfg = Config {
                translator: TwoCap,
                key_partition: "test_index".into(),
                key_page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                value_partition: "test_value".into(),
                codec_config: (),
                compression: None,
                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
                items_per_section: NZU64!(items_per_section),
            };
            let mut archive = Archive::<_, _, _, FixedBytes<1024>>::init(
                context.with_label("init2"),
                cfg.clone(),
            )
            .await
            .expect("Failed to initialize archive");

            // Ensure all keys can be retrieved
            for (key, (index, data)) in &keys {
                let retrieved = archive
                    .get(Identifier::Index(*index))
                    .await
                    .expect("Failed to get data")
                    .expect("Data not found");
                assert_eq!(&retrieved, data);
                let retrieved = archive
                    .get(Identifier::Key(key))
                    .await
                    .expect("Failed to get data")
                    .expect("Data not found");
                assert_eq!(&retrieved, data);
            }

            // Prune first half
            let min = (keys.len() / 2) as u64;
            archive.prune(min).await.expect("Failed to prune");

            // Ensure all keys that haven't been pruned can still be retrieved
            let min = (min / items_per_section) * items_per_section;
            let mut removed = 0;
            for (key, (index, data)) in keys {
                if index >= min {
                    let retrieved = archive
                        .get(Identifier::Key(&key))
                        .await
                        .expect("Failed to get data")
                        .expect("Data not found");
                    assert_eq!(retrieved, data);

                    // Check range
                    let (current_end, start_next) = archive.next_gap(index);
                    assert_eq!(current_end.unwrap(), num_keys as u64 - 1);
                    assert!(start_next.is_none());
                } else {
                    let retrieved = archive
                        .get(Identifier::Key(&key))
                        .await
                        .expect("Failed to get data");
                    assert!(retrieved.is_none());
                    removed += 1;

                    // Check range
                    let (current_end, start_next) = archive.next_gap(index);
                    assert!(current_end.is_none());
                    assert_eq!(start_next.unwrap(), min);
                }
            }

            // Check metrics
            let buffer = context.encode();
            let tracked = format!("items_tracked {:?}", num_keys - removed);
            assert!(buffer.contains(&tracked));
            let pruned = format!("indices_pruned_total {removed}");
            assert!(buffer.contains(&pruned));
            assert!(buffer.contains("pruned_total 0")); // have not lazily removed keys yet

            context.auditor().state()
        })
    }

    #[test_group("slow")]
    #[test_traced]
    fn test_archive_many_keys_and_restart() {
        test_archive_keys_and_restart(100_000);
    }

    #[test_group("slow")]
    #[test_traced]
    fn test_determinism() {
        let state1 = test_archive_keys_and_restart(5_000);
        let state2 = test_archive_keys_and_restart(5_000);
        assert_eq!(state1, state2);
    }
}