//! An immutable key-value store optimized for minimal memory usage and write amplification.
//!
//! [Freezer] is a key-value store designed for permanent storage where data is written once and never
//! modified. Meant for resource-constrained environments, [Freezer] exclusively employs disk-resident
//! data structures to serve queries and never rewrites (i.e. compacts) inserted data.
//!
//! As a byproduct of the mechanisms used to satisfy these constraints, [Freezer] consistently provides
//! low-latency access to recently added data (regardless of how much data has been stored) at the expense
//! of latency for old data, which increases logarithmically with the number of items stored.
//!
//! # Format
//!
//! The [Freezer] uses a two-level architecture: an extendible hash table (written in a single [commonware_runtime::Blob])
//! that maps keys to locations and a [crate::journal::segmented::variable::Journal] that stores key-value data.
//!
//! ```text
//! +-----------------------------------------------------------------+
//! |                           Hash Table                            |
//! |  +---------+---------+---------+---------+---------+---------+  |
//! |  | Entry 0 | Entry 1 | Entry 2 | Entry 3 | Entry 4 |   ...   |  |
//! |  +----+----+----+----+----+----+----+----+----+----+---------+  |
//! +-------|---------|---------|---------|---------|---------|-------+
//!         |         |         |         |         |         |
//!         v         v         v         v         v         v
//! +-----------------------------------------------------------------+
//! |                             Journal                             |
//! |  Section 0: [Record 0][Record 1][Record 2]...                   |
//! |  Section 1: [Record 10][Record 11][Record 12]...                |
//! |  Section N: [Record 100][Record 101][Record 102]...             |
//! +-----------------------------------------------------------------+
//! ```
//!
//! The table uses two fixed-size slots per entry to ensure consistency during updates. Each slot
//! contains an epoch number that monotonically increases with each sync operation. During reads,
//! the slot with the higher epoch is selected (provided its epoch is not greater than the last committed
//! epoch), ensuring consistency even if the system crashed during a write.
//!
//! ```text
//! +-------------------------------------+
//! |          Hash Table Entry           |
//! +-------------------------------------+
//! |     Slot 0      |      Slot 1       |
//! +-----------------+-------------------+
//! | epoch:    u64   | epoch:    u64     |
//! | section:  u64   | section:  u64     |
//! | offset:   u32   | offset:   u32     |
//! | added:    u8    | added:    u8      |
//! +-----------------+-------------------+
//! | CRC32:    u32   | CRC32:    u32     |
//! +-----------------+-------------------+
//! ```
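//!
//! The sketch below illustrates the slot-selection rule. It is a minimal model and is not how the
//! [Freezer] actually decodes slots. The `Slot` struct and the `select_slot` function are hypothetical.
//! The `crc_ok` flag stands in for a CRC32 comparison, on the assumption that a slot whose checksum does
//! not match is ignored. Adding up the field widths in the diagram gives 25 bytes per slot and 50 bytes
//! per entry, assuming no padding.
//!
//! ```rust
//! // Field widths from the diagram: epoch + section + offset + added + CRC32.
//! const SLOT_SIZE: usize = 8 + 8 + 4 + 1 + 4;
//! // Each table entry holds two slots.
//! const ENTRY_SIZE: usize = 2 * SLOT_SIZE;
//!
//! // Hypothetical decoded slot; `crc_ok` stands in for a real checksum check.
//! #[derive(Clone, Copy, Debug, PartialEq)]
//! struct Slot {
//!     epoch: u64,
//!     section: u64,
//!     offset: u32,
//!     added: u8,
//!     crc_ok: bool,
//! }
//!
//! // Pick the slot with the highest epoch among those that pass the checksum
//! // and are not newer than the last committed epoch.
//! fn select_slot(slots: [Slot; 2], committed_epoch: u64) -> Option<Slot> {
//!     slots
//!         .iter()
//!         .copied()
//!         .filter(|s| s.crc_ok && s.epoch <= committed_epoch)
//!         .max_by_key(|s| s.epoch)
//! }
//! ```
//!
//! Under this model, the reader skips any slot that was torn mid-write or that carries an epoch never
//! committed, and it falls back to the other slot. The 50-byte entry size matches the `~3MB` comment for
//! 65,536 entries in the example below.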
//!
//! The journal stores variable-sized records, each containing a key-value pair and an optional pointer
//! to the next record in the collision chain (for keys that hash to the same table index).
//!
//! ```text
//! +-------------------------------------+
//! |           Journal Record            |
//! +-------------------------------------+
//! | Key:   Array                        |
//! | Value: Codec                        |
//! | Next:  Option<(u64, u32)>           |
//! +-------------------------------------+
//! ```
//!
//! # Traversing Conflicts
//!
//! When multiple keys hash to the same table index, they form a linked list within the journal:
//!
//! ```text
//! Hash Table:
//! [Index 42]      +-------------------+
//!                 | section: 2        |
//!                 | offset: 768       |
//!                 +---------+---------+
//!                           |
//! Journal:                  v
//! [Section 2]     +-----+------------+-----+-----+-----+-----+-----+-----+
//!                 | ... | Key: "foo" | ... | ... | ... | ... | ... | ... |
//!                 |     | Value: 42  |     |     |     |     |     |     |
//!                 |     | Next:(1,512)---+ |     |     |     |     |     |
//!                 +-----+------------+---+-+-----+-----+-----+-----+-----+
//!                                        |
//!                                        v
//! [Section 1]     +-----+-----+-----+------------+-----+-----+-----+-----+
//!                 | ... | ... | ... | Key: "bar" | ... | ... | ... | ... |
//!                 |     |     |     | Value: 84  |     |     |     |     |
//!                 |     |     | +---| Next:(0,256)     |     |     |     |
//!                 +-----+-----+-+---+------------+-----+-----+-----+-----+
//!                               |
//!                               v
//! [Section 0]     +-----+------------+-----+-----+-----+-----+-----+-----+
//!                 | ... | Key: "baz" | ... | ... | ... | ... | ... | ... |
//!                 |     | Value: 126 |     |     |     |     |     |     |
//!                 |     | Next: None |     |     |     |     |     |     |
//!                 +-----+------------+-----+-----+-----+-----+-----+-----+
//! ```
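//!
//! The traversal fits in a few lines. In this sketch, the journal is a hypothetical in-memory `HashMap`
//! keyed by `(section, offset)`. `Record` mirrors the journal record layout above, using a string key and
//! an `i32` value. The real [Freezer] reads records from disk and decodes them with its configured codec.
//!
//! ```rust
//! use std::collections::HashMap;
//!
//! // A journal location: (section, offset), as stored in `Next`.
//! type Location = (u64, u32);
//!
//! // Hypothetical in-memory stand-in for a journal record.
//! struct Record {
//!     key: &'static str,
//!     value: i32,
//!     next: Option<Location>,
//! }
//!
//! // Follow `next` pointers from `head` until `key` matches. Returns the value
//! // (if found) and the number of records read along the way.
//! fn lookup(
//!     journal: &HashMap<Location, Record>,
//!     head: Option<Location>,
//!     key: &str,
//! ) -> (Option<i32>, usize) {
//!     let mut cursor = head;
//!     let mut reads = 0;
//!     while let Some(location) = cursor {
//!         // Panics on a dangling pointer; a real store would surface an error.
//!         let record = &journal[&location];
//!         reads += 1;
//!         if record.key == key {
//!             return (Some(record.value), reads);
//!         }
//!         cursor = record.next;
//!     }
//!     (None, reads)
//! }
//! ```
//!
//! With the chain from the diagram, looking up `"foo"` reads one record. Looking up `"baz"`, the oldest
//! entry, requires reading all three.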
//!
//! New entries are prepended to the chain, becoming the new head. During lookup, the chain is traversed
//! from the head until a matching key is found. The `added` field in each table entry counts insertions
//! since the last resize. The table grows once 50% of entries have had `table_resize_frequency` items
//! added.
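//!
//! Below is a minimal in-memory sketch of prepending and of the resize trigger. The `Table` type and its
//! methods are hypothetical. Record locations are simplified to `usize` indices, and "50% of entries" is
//! read as "at least half of the entries".
//!
//! ```rust
//! // Hypothetical table model: per-entry chain head and `added` counter.
//! struct Table {
//!     heads: Vec<Option<usize>>,
//!     added: Vec<u8>,
//!     resize_frequency: u8,
//! }
//!
//! impl Table {
//!     fn new(size: usize, resize_frequency: u8) -> Self {
//!         Table { heads: vec![None; size], added: vec![0; size], resize_frequency }
//!     }
//!
//!     // Make `record` the new head of entry `index`'s chain. The previous head is
//!     // returned because the new record would store it as its `next` pointer.
//!     fn prepend(&mut self, index: usize, record: usize) -> Option<usize> {
//!         let previous = self.heads[index].replace(record);
//!         self.added[index] = self.added[index].saturating_add(1);
//!         previous
//!     }
//!
//!     // True once at least half of the entries have reached `resize_frequency` additions.
//!     fn should_resize(&self) -> bool {
//!         let saturated = self
//!             .added
//!             .iter()
//!             .filter(|&&count| count >= self.resize_frequency)
//!             .count();
//!         saturated * 2 >= self.heads.len()
//!     }
//! }
//! ```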
//!
//! # Extendible Hashing
//!
//! The [Freezer] uses bit-based indexing to grow the on-disk hash table without rehashing existing entries:
//!
//! ```text
//! Initial state (table_size=4, using 2 bits of hash):
//! Hash: 0b...00 -> Index 0
//! Hash: 0b...01 -> Index 1
//! Hash: 0b...10 -> Index 2
//! Hash: 0b...11 -> Index 3
//!
//! After resize (table_size=8, using 3 bits of hash):
//! Hash: 0b...000 -> Index 0 -+
//! ...                        |
//! Hash: 0b...100 -> Index 4 -+- Both map to old Index 0
//! Hash: 0b...001 -> Index 1 -+
//! ...                        |
//! Hash: 0b...101 -> Index 5 -+- Both map to old Index 1
//! ```
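//!
//! Selecting an entry from a hash, as in the diagram, amounts to masking the low bits. The sketch below
//! assumes the table size is a power of two and leaves the hash function abstract. `table_index` is a
//! hypothetical helper and is not part of the [Freezer] API.
//!
//! ```rust
//! // Keep the low log2(table_size) bits of the hash.
//! fn table_index(hash: u64, table_size: u64) -> u64 {
//!     assert!(table_size.is_power_of_two());
//!     hash & (table_size - 1)
//! }
//! ```
//!
//! Each doubling adds exactly one bit to the mask. As a result, the new index modulo the old size equals
//! the old index: `table_index(h, 8) % 4 == table_index(h, 4)` for every `h`.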
//!
//! When the table doubles in size:
//! 1. Each entry at index `i` splits into two entries: `i` and `i + old_size`
//! 2. The existing chain head is copied to both locations with `added=0`
//! 3. Future insertions are distributed between the two entries by the additional hash bit now used for indexing
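//!
//! The three steps can be sketched as one non-incremental doubling over an in-memory table. The `Entry`
//! struct and the `double` function are hypothetical. The [Freezer] itself spreads this work across
//! several syncs.
//!
//! ```rust
//! // Hypothetical table entry: chain head location plus the `added` counter.
//! #[derive(Clone, Copy, Debug, PartialEq)]
//! struct Entry {
//!     head: Option<(u64, u32)>,
//!     added: u8,
//! }
//!
//! // Double the table: entry `i` is copied to both `i` and `i + old_size`
//! // with `added` reset to 0, so both halves share the pre-resize chain.
//! fn double(table: &[Entry]) -> Vec<Entry> {
//!     let old_size = table.len();
//!     let mut grown = vec![Entry { head: None, added: 0 }; old_size * 2];
//!     for (i, entry) in table.iter().enumerate() {
//!         let copy = Entry { head: entry.head, added: 0 };
//!         grown[i] = copy;
//!         grown[i + old_size] = copy;
//!     }
//!     grown
//! }
//! ```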
//!
//! This approach ensures that entries inserted before a resize remain discoverable after it. The lookup
//! algorithm selects an entry based on the current table size, and both entries produced by a split start
//! from the same pre-resize chain head. As more items are added (and more resizes occur), the latency for
//! fetching old data increases logarithmically with the number of items stored.
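//!
//! A back-of-envelope model shows where the logarithm comes from. It is an assumption, not a
//! measurement. Suppose each entry receives roughly `table_resize_frequency` new records between resizes.
//! Then a record written `k` resizes ago sits behind about `k * table_resize_frequency` newer records in
//! its chain. Meanwhile, the number of resizes grows with the base-2 logarithm of the table size. Both
//! helpers below are hypothetical.
//!
//! ```rust
//! // Number of doublings needed to grow from `initial_size` to at least `target_size` entries.
//! fn doublings(initial_size: u64, target_size: u64) -> u32 {
//!     assert!(initial_size > 0);
//!     let mut size = initial_size;
//!     let mut count = 0;
//!     while size < target_size {
//!         size *= 2;
//!         count += 1;
//!     }
//!     count
//! }
//!
//! // Approximate number of newer records in front of a record written `resizes` resizes ago.
//! fn approx_chain_depth(resizes: u32, resize_frequency: u64) -> u64 {
//!     u64::from(resizes) * resize_frequency
//! }
//! ```
//!
//! For example, growing a 65,536-entry table to 1,048,576 entries takes 4 doublings. With
//! `table_resize_frequency` set to 4, this model therefore puts a record from before the first resize
//! behind about 16 newer records.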
//!
//! To prevent a "stall" during a single resize, the table is resized incrementally across multiple sync calls.
//! Each sync processes up to `table_resize_chunk_size` entries until the resize is complete. If a resize is
//! still in progress when the [Freezer] is closed, it is completed before closing.
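//!
//! The incremental scheme can be modeled as a cursor over the old table. The cursor advances by at most
//! `table_resize_chunk_size` entries per sync. The `Resize` type below is a hypothetical sketch of that
//! bookkeeping. It does not represent the internal state of the [Freezer].
//!
//! ```rust
//! use std::ops::Range;
//!
//! // Hypothetical progress tracker for an in-flight resize.
//! struct Resize {
//!     old_size: u32, // table size when the resize began
//!     next: u32,     // next old-table index to split
//! }
//!
//! impl Resize {
//!     // Advance by up to `chunk_size` entries (as one sync might) and report the
//!     // indices handled plus whether the resize is now complete.
//!     fn step(&mut self, chunk_size: u32) -> (Range<u32>, bool) {
//!         let start = self.next;
//!         let end = start.saturating_add(chunk_size).min(self.old_size);
//!         // Each index `i` in `start..end` would be copied to `i` and `i + old_size` here.
//!         self.next = end;
//!         (start..end, end == self.old_size)
//!     }
//! }
//! ```
//!
//! In this model, closing during a resize corresponds to calling `step` repeatedly until it reports
//! completion.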
//!
//! # Example
//!
//! ```rust
//! use commonware_runtime::{Runner, deterministic, buffer::PoolRef};
//! use commonware_storage::freezer::{Freezer, Config, Identifier};
//! use commonware_utils::{sequence::FixedBytes, NZUsize};
//!
//! let executor = deterministic::Runner::default();
//! executor.start(|context| async move {
//!     // Create a freezer
//!     let cfg = Config {
//!         journal_partition: "freezer_journal".into(),
//!         journal_compression: Some(3),
//!         journal_write_buffer: NZUsize!(1024 * 1024), // 1MB
//!         journal_target_size: 100 * 1024 * 1024, // 100MB
//!         journal_buffer_pool: PoolRef::new(NZUsize!(1024), NZUsize!(10)),
//!         table_partition: "freezer_table".into(),
//!         table_initial_size: 65_536, // ~3MB initial table size
//!         table_resize_frequency: 4, // Resize once 50% of entries have had 4 items added
//!         table_resize_chunk_size: 16_384, // ~1MB of table entries rewritten per sync
//!         table_replay_buffer: NZUsize!(1024 * 1024), // 1MB
//!         codec_config: (),
//!     };
//!     let mut freezer = Freezer::<_, FixedBytes<32>, i32>::init(context, cfg).await.unwrap();
//!
//!     // Put a key-value pair
//!     let key = FixedBytes::new([1u8; 32]);
//!     freezer.put(key.clone(), 42).await.unwrap();
//!
//!     // Sync to disk
//!     freezer.sync().await.unwrap();
//!
//!     // Get the value
//!     let value = freezer.get(Identifier::Key(&key)).await.unwrap().unwrap();
//!     assert_eq!(value, 42);
//!
//!     // Close the freezer
//!     freezer.close().await.unwrap();
//! });
//! ```

mod storage;
use commonware_runtime::buffer::PoolRef;
use commonware_utils::Array;
use std::num::NonZeroUsize;
pub use storage::{Checkpoint, Cursor, Freezer};
use thiserror::Error;

/// Subject of a [Freezer::get] operation.
pub enum Identifier<'a, K: Array> {
    /// Look up a value by its [Cursor].
    Cursor(Cursor),
    /// Look up the value associated with a key.
    Key(&'a K),
}

/// Errors that can occur when interacting with the [Freezer].
#[derive(Debug, Error)]
pub enum Error {
    #[error("runtime error: {0}")]
    Runtime(#[from] commonware_runtime::Error),
    #[error("journal error: {0}")]
    Journal(#[from] crate::journal::Error),
    #[error("codec error: {0}")]
    Codec(#[from] commonware_codec::Error),
}

/// Configuration for [Freezer].
#[derive(Clone)]
pub struct Config<C> {
    /// The [commonware_runtime::Storage] partition to use for storing the journal.
    pub journal_partition: String,

    /// The compression level to use for the [crate::journal::segmented::variable::Journal] (if any).
    pub journal_compression: Option<u8>,

    /// The size of the write buffer to use for the journal.
    pub journal_write_buffer: NonZeroUsize,

    /// The target size (in bytes) of each journal section before a new section is created.
    pub journal_target_size: u64,

    /// The buffer pool to use for the journal.
    pub journal_buffer_pool: PoolRef,

    /// The [commonware_runtime::Storage] partition to use for storing the table.
    pub table_partition: String,

    /// The initial number of entries in the table.
    pub table_initial_size: u32,

    /// The number of items that must be added to 50% of table entries (since the last resize)
    /// before the table is resized again.
    pub table_resize_frequency: u8,

    /// The number of table entries to process during each resize step (a single resize may span
    /// many calls to [Freezer::sync]).
    pub table_resize_chunk_size: u32,

    /// The size of the read buffer to use when scanning the table (e.g., during recovery or resize).
    pub table_replay_buffer: NonZeroUsize,

    /// The codec configuration to use for values stored in the freezer.
    pub codec_config: C,
}

#[cfg(test)]
mod tests {
    use super::*;
    use commonware_codec::DecodeExt;
    use commonware_macros::test_traced;
    use commonware_runtime::{deterministic, Blob, Metrics, Runner, Storage};
    use commonware_utils::{hex, sequence::FixedBytes, NZUsize};
    use rand::{Rng, RngCore};

    const DEFAULT_JOURNAL_WRITE_BUFFER: usize = 1024;
    const DEFAULT_JOURNAL_TARGET_SIZE: u64 = 10 * 1024 * 1024;
    const DEFAULT_TABLE_INITIAL_SIZE: u32 = 256;
    const DEFAULT_TABLE_RESIZE_FREQUENCY: u8 = 4;
    const DEFAULT_TABLE_RESIZE_CHUNK_SIZE: u32 = 128; // force multiple chunks
    const DEFAULT_TABLE_REPLAY_BUFFER: usize = 64 * 1024; // 64KB
    const PAGE_SIZE: NonZeroUsize = NZUsize!(1024);
    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);

    fn test_key(key: &str) -> FixedBytes<64> {
        let mut buf = [0u8; 64];
        let key = key.as_bytes();
        assert!(key.len() <= buf.len());
        buf[..key.len()].copy_from_slice(key);
        FixedBytes::decode(buf.as_ref()).unwrap()
    }

    fn test_put_get(compression: Option<u8>) {
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Initialize the freezer
            let cfg = Config {
                journal_partition: "test_journal".into(),
                journal_compression: compression,
                journal_write_buffer: NZUsize!(DEFAULT_JOURNAL_WRITE_BUFFER),
                journal_target_size: DEFAULT_JOURNAL_TARGET_SIZE,
                journal_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                table_partition: "test_table".into(),
                table_initial_size: DEFAULT_TABLE_INITIAL_SIZE,
                table_resize_frequency: DEFAULT_TABLE_RESIZE_FREQUENCY,
                table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
                table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
                codec_config: (),
            };
            let mut freezer = Freezer::<_, FixedBytes<64>, i32>::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to initialize freezer");

            let key = test_key("testkey");
            let data = 42;

            // Check key doesn't exist
            let value = freezer
                .get(Identifier::Key(&key))
                .await
                .expect("Failed to check key");
            assert!(value.is_none());

            // Put the key-data pair
            freezer
                .put(key.clone(), data)
                .await
                .expect("Failed to put data");

            // Get the data back
            let value = freezer
                .get(Identifier::Key(&key))
                .await
                .expect("Failed to get data")
                .expect("Data not found");
            assert_eq!(value, data);

            // Check metrics
            let buffer = context.encode();
            assert!(buffer.contains("gets_total 2"), "{}", buffer);
            assert!(buffer.contains("puts_total 1"), "{}", buffer);
            assert!(buffer.contains("unnecessary_reads_total 0"), "{}", buffer);

            // Force a sync
            freezer.sync().await.expect("Failed to sync data");
        });
    }

    #[test_traced]
    fn test_put_get_no_compression() {
        test_put_get(None);
    }

    #[test_traced]
    fn test_put_get_compression() {
        test_put_get(Some(3));
    }

    #[test_traced]
    fn test_multiple_keys() {
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Initialize the freezer
            let cfg = Config {
                journal_partition: "test_journal".into(),
                journal_compression: None,
                journal_write_buffer: NZUsize!(DEFAULT_JOURNAL_WRITE_BUFFER),
                journal_target_size: DEFAULT_JOURNAL_TARGET_SIZE,
                journal_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                table_partition: "test_table".into(),
                table_initial_size: DEFAULT_TABLE_INITIAL_SIZE,
                table_resize_frequency: DEFAULT_TABLE_RESIZE_FREQUENCY,
                table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
                table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
                codec_config: (),
            };
            let mut freezer = Freezer::<_, FixedBytes<64>, i32>::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to initialize freezer");

            // Insert multiple keys
            let keys = vec![
                (test_key("key1"), 1),
                (test_key("key2"), 2),
                (test_key("key3"), 3),
                (test_key("key4"), 4),
                (test_key("key5"), 5),
            ];

            for (key, data) in &keys {
                freezer
                    .put(key.clone(), *data)
                    .await
                    .expect("Failed to put data");
            }

            // Retrieve all keys and verify
            for (key, data) in &keys {
                let retrieved = freezer
                    .get(Identifier::Key(key))
                    .await
                    .expect("Failed to get data")
                    .expect("Data not found");
                assert_eq!(retrieved, *data);
            }
        });
    }

    #[test_traced]
    fn test_collision_handling() {
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Initialize the freezer with a very small table to force collisions
            let cfg = Config {
                journal_partition: "test_journal".into(),
                journal_compression: None,
                journal_write_buffer: NZUsize!(DEFAULT_JOURNAL_WRITE_BUFFER),
                journal_target_size: DEFAULT_JOURNAL_TARGET_SIZE,
                journal_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                table_partition: "test_table".into(),
                table_initial_size: 4, // Very small to force collisions
                table_resize_frequency: DEFAULT_TABLE_RESIZE_FREQUENCY,
                table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
                table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
                codec_config: (),
            };
            let mut freezer = Freezer::<_, FixedBytes<64>, i32>::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to initialize freezer");

            // Insert multiple keys that will likely collide
            let keys = vec![
                (test_key("key1"), 1),
                (test_key("key2"), 2),
                (test_key("key3"), 3),
                (test_key("key4"), 4),
                (test_key("key5"), 5),
                (test_key("key6"), 6),
                (test_key("key7"), 7),
                (test_key("key8"), 8),
            ];

            for (key, data) in &keys {
                freezer
                    .put(key.clone(), *data)
                    .await
                    .expect("Failed to put data");
            }

            // Sync to disk
            freezer.sync().await.expect("Failed to sync");

            // Retrieve all keys and verify they can still be found
            for (key, data) in &keys {
                let retrieved = freezer
                    .get(Identifier::Key(key))
                    .await
                    .expect("Failed to get data")
                    .expect("Data not found");
                assert_eq!(retrieved, *data);
            }

            // Check metrics
            let buffer = context.encode();
            assert!(buffer.contains("gets_total 8"), "{}", buffer);
            assert!(buffer.contains("unnecessary_reads_total 5"), "{}", buffer);
        });
    }

    #[test_traced]
    fn test_restart() {
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                journal_partition: "test_journal".into(),
                journal_compression: None,
                journal_write_buffer: NZUsize!(DEFAULT_JOURNAL_WRITE_BUFFER),
                journal_target_size: DEFAULT_JOURNAL_TARGET_SIZE,
                journal_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                table_partition: "test_table".into(),
                table_initial_size: DEFAULT_TABLE_INITIAL_SIZE,
                table_resize_frequency: DEFAULT_TABLE_RESIZE_FREQUENCY,
                table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
                table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
                codec_config: (),
            };

            // Insert data and close the freezer
            let checkpoint = {
                let mut freezer =
                    Freezer::<_, FixedBytes<64>, i32>::init(context.clone(), cfg.clone())
                        .await
                        .expect("Failed to initialize freezer");

                let keys = vec![
                    (test_key("persist1"), 100),
                    (test_key("persist2"), 200),
                    (test_key("persist3"), 300),
                ];

                for (key, data) in &keys {
                    freezer
                        .put(key.clone(), *data)
                        .await
                        .expect("Failed to put data");
                }

                freezer.close().await.expect("Failed to close freezer")
            };

            // Reopen and verify data persisted
            {
                let freezer = Freezer::<_, FixedBytes<64>, i32>::init_with_checkpoint(
                    context.clone(),
                    cfg.clone(),
                    Some(checkpoint),
                )
                .await
                .expect("Failed to initialize freezer");

                let keys = vec![
                    (test_key("persist1"), 100),
                    (test_key("persist2"), 200),
                    (test_key("persist3"), 300),
                ];

                for (key, data) in &keys {
                    let retrieved = freezer
                        .get(Identifier::Key(key))
                        .await
                        .expect("Failed to get data")
                        .expect("Data not found");
                    assert_eq!(retrieved, *data);
                }
            }
        });
    }

    #[test_traced]
    fn test_crash_consistency() {
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                journal_partition: "test_journal".into(),
                journal_compression: None,
                journal_write_buffer: NZUsize!(DEFAULT_JOURNAL_WRITE_BUFFER),
                journal_target_size: DEFAULT_JOURNAL_TARGET_SIZE,
                journal_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                table_partition: "test_table".into(),
                table_initial_size: DEFAULT_TABLE_INITIAL_SIZE,
                table_resize_frequency: DEFAULT_TABLE_RESIZE_FREQUENCY,
                table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
                table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
                codec_config: (),
            };

            // First, create some committed data and close the freezer
            let checkpoint = {
                let mut freezer =
                    Freezer::<_, FixedBytes<64>, i32>::init(context.clone(), cfg.clone())
                        .await
                        .expect("Failed to initialize freezer");

                freezer
                    .put(test_key("committed1"), 1)
                    .await
                    .expect("Failed to put data");
                freezer
                    .put(test_key("committed2"), 2)
                    .await
                    .expect("Failed to put data");

                // Sync to ensure data is committed
                freezer.sync().await.expect("Failed to sync");

                // Add more data but don't sync (simulating crash)
                freezer
                    .put(test_key("uncommitted1"), 3)
                    .await
                    .expect("Failed to put data");
                freezer
                    .put(test_key("uncommitted2"), 4)
                    .await
                    .expect("Failed to put data");

                // Close without syncing to simulate crash
                freezer.close().await.expect("Failed to close")
            };

            // Reopen and verify only committed data is present
            {
                let freezer = Freezer::<_, FixedBytes<64>, i32>::init_with_checkpoint(
                    context.clone(),
                    cfg.clone(),
                    Some(checkpoint),
                )
                .await
                .expect("Failed to initialize freezer");

                // Committed data should be present
                assert_eq!(
                    freezer
                        .get(Identifier::Key(&test_key("committed1")))
                        .await
                        .unwrap(),
                    Some(1)
                );
                assert_eq!(
                    freezer
                        .get(Identifier::Key(&test_key("committed2")))
                        .await
                        .unwrap(),
                    Some(2)
                );

                // Uncommitted data might or might not be present depending on implementation
                // But if present, it should be correct
                if let Some(val) = freezer
                    .get(Identifier::Key(&test_key("uncommitted1")))
                    .await
                    .unwrap()
                {
                    assert_eq!(val, 3);
                }
                if let Some(val) = freezer
                    .get(Identifier::Key(&test_key("uncommitted2")))
                    .await
                    .unwrap()
                {
                    assert_eq!(val, 4);
                }
            }
        });
    }

    #[test_traced]
    fn test_destroy() {
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Initialize the freezer
            let cfg = Config {
                journal_partition: "test_journal".into(),
                journal_compression: None,
                journal_write_buffer: NZUsize!(DEFAULT_JOURNAL_WRITE_BUFFER),
                journal_target_size: DEFAULT_JOURNAL_TARGET_SIZE,
                journal_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                table_partition: "test_table".into(),
                table_initial_size: DEFAULT_TABLE_INITIAL_SIZE,
                table_resize_frequency: DEFAULT_TABLE_RESIZE_FREQUENCY,
                table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
                table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
                codec_config: (),
            };
            {
                let mut freezer =
                    Freezer::<_, FixedBytes<64>, i32>::init(context.clone(), cfg.clone())
                        .await
                        .expect("Failed to initialize freezer");

                freezer
                    .put(test_key("destroy1"), 1)
                    .await
                    .expect("Failed to put data");
                freezer
                    .put(test_key("destroy2"), 2)
                    .await
                    .expect("Failed to put data");

                // Destroy the freezer
                freezer.destroy().await.expect("Failed to destroy freezer");
            }

            // Try to create a new freezer - it should be empty
            {
                let freezer = Freezer::<_, FixedBytes<64>, i32>::init(context.clone(), cfg.clone())
                    .await
                    .expect("Failed to initialize freezer");

                // Should not find any data
                assert!(freezer
                    .get(Identifier::Key(&test_key("destroy1")))
                    .await
                    .unwrap()
                    .is_none());
                assert!(freezer
                    .get(Identifier::Key(&test_key("destroy2")))
                    .await
                    .unwrap()
                    .is_none());
            }
        });
    }

674    #[test_traced]
675    fn test_partial_table_entry_write() {
676        // Initialize the deterministic context
677        let executor = deterministic::Runner::default();
678        executor.start(|context| async move {
679            // Initialize the freezer
680            let cfg = Config {
681                journal_partition: "test_journal".into(),
                journal_compression: None,
                journal_write_buffer: NZUsize!(DEFAULT_JOURNAL_WRITE_BUFFER),
                journal_target_size: DEFAULT_JOURNAL_TARGET_SIZE,
                journal_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                table_partition: "test_table".into(),
                table_initial_size: DEFAULT_TABLE_INITIAL_SIZE,
                table_resize_frequency: DEFAULT_TABLE_RESIZE_FREQUENCY,
                table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
                table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
                codec_config: (),
            };

            // Create freezer with data
            let checkpoint = {
                let mut freezer =
                    Freezer::<_, FixedBytes<64>, i32>::init(context.clone(), cfg.clone())
                        .await
                        .expect("Failed to initialize freezer");

                freezer.put(test_key("key1"), 42).await.unwrap();
                freezer.sync().await.unwrap();
                freezer.close().await.unwrap()
            };

            // Corrupt the table by writing a partial entry
            {
                let (blob, _) = context.open(&cfg.table_partition, b"table").await.unwrap();
                // Write an incomplete table entry (only 10 bytes instead of 24)
                blob.write_at(vec![0xFF; 10], 0).await.unwrap();
                blob.sync().await.unwrap();
            }

            // Reopen and verify it handles the corruption
            {
                let freezer = Freezer::<_, FixedBytes<64>, i32>::init_with_checkpoint(
                    context.clone(),
                    cfg.clone(),
                    Some(checkpoint),
                )
                .await
                .expect("Failed to initialize freezer");

                // Depending on which table entry the key maps to and how the corrupted
                // slot is handled, the key may or may not still be found; either outcome
                // is acceptable, but the lookup itself must not return an error
                let result = freezer
                    .get(Identifier::Key(&test_key("key1")))
                    .await
                    .unwrap();
                assert!(result.is_none() || result == Some(42));
            }
        });
    }

    #[test_traced]
    fn test_table_entry_invalid_crc() {
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                journal_partition: "test_journal".into(),
                journal_compression: None,
                journal_write_buffer: NZUsize!(DEFAULT_JOURNAL_WRITE_BUFFER),
                journal_target_size: DEFAULT_JOURNAL_TARGET_SIZE,
                journal_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                table_partition: "test_table".into(),
                table_initial_size: DEFAULT_TABLE_INITIAL_SIZE,
                table_resize_frequency: DEFAULT_TABLE_RESIZE_FREQUENCY,
                table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
                table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
                codec_config: (),
            };

            // Create freezer with data
            let checkpoint = {
                let mut freezer =
                    Freezer::<_, FixedBytes<64>, i32>::init(context.clone(), cfg.clone())
                        .await
                        .expect("Failed to initialize freezer");

                freezer.put(test_key("key1"), 42).await.unwrap();
                freezer.sync().await.unwrap();
                freezer.close().await.unwrap()
            };

            // Corrupt the CRC in the first table entry
            {
                let (blob, _) = context.open(&cfg.table_partition, b"table").await.unwrap();
                // Read the first entry
                let entry_data = blob.read_at(vec![0u8; 24], 0).await.unwrap();
                let mut corrupted = entry_data.as_ref().to_vec();
                // Corrupt the CRC (last 4 bytes of the entry)
                corrupted[20] ^= 0xFF;
                blob.write_at(corrupted, 0).await.unwrap();
                blob.sync().await.unwrap();
            }

            // Reopen and verify it handles the invalid CRC
            {
                let freezer = Freezer::<_, FixedBytes<64>, i32>::init_with_checkpoint(
                    context.clone(),
                    cfg.clone(),
                    Some(checkpoint),
                )
                .await
                .expect("Failed to initialize freezer");

                // With an invalid CRC, the entry should be treated as invalid
                let result = freezer
                    .get(Identifier::Key(&test_key("key1")))
                    .await
                    .unwrap();
                // The freezer should still work, but it may not find the key because its table entry is invalid
                assert!(result.is_none() || result == Some(42));
            }
        });
    }

    #[test_traced]
    fn test_table_extra_bytes() {
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                journal_partition: "test_journal".into(),
                journal_compression: None,
                journal_write_buffer: NZUsize!(DEFAULT_JOURNAL_WRITE_BUFFER),
                journal_target_size: DEFAULT_JOURNAL_TARGET_SIZE,
                journal_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                table_partition: "test_table".into(),
                table_initial_size: DEFAULT_TABLE_INITIAL_SIZE,
                table_resize_frequency: DEFAULT_TABLE_RESIZE_FREQUENCY,
                table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
                table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
                codec_config: (),
            };

            // Create freezer with data
            let checkpoint = {
                let mut freezer =
                    Freezer::<_, FixedBytes<64>, i32>::init(context.clone(), cfg.clone())
                        .await
                        .expect("Failed to initialize freezer");

                freezer.put(test_key("key1"), 42).await.unwrap();
                freezer.sync().await.unwrap();
                freezer.close().await.unwrap()
            };

            // Add extra bytes to the table blob
            {
                let (blob, size) = context.open(&cfg.table_partition, b"table").await.unwrap();
                // Append garbage data
                blob.write_at(hex!("0xdeadbeef").to_vec(), size)
                    .await
                    .unwrap();
                blob.sync().await.unwrap();
            }

            // Reopen and verify it handles extra bytes gracefully
            {
                let mut freezer = Freezer::<_, FixedBytes<64>, i32>::init_with_checkpoint(
                    context.clone(),
                    cfg.clone(),
                    Some(checkpoint),
                )
                .await
                .expect("Failed to initialize freezer");

                // Should still be able to read the key
                assert_eq!(
                    freezer
                        .get(Identifier::Key(&test_key("key1")))
                        .await
                        .unwrap(),
                    Some(42)
                );

                // Should also be able to write new data
                freezer.put(test_key("key2"), 43).await.unwrap();
                assert_eq!(
                    freezer
                        .get(Identifier::Key(&test_key("key2")))
                        .await
                        .unwrap(),
                    Some(43)
                );
            }
        });
    }

    #[test_traced]
    fn test_indexing_across_resizes() {
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Initialize the freezer
            let cfg = Config {
                journal_partition: "test_journal".into(),
                journal_compression: None,
                journal_write_buffer: NZUsize!(DEFAULT_JOURNAL_WRITE_BUFFER),
                journal_target_size: DEFAULT_JOURNAL_TARGET_SIZE,
                journal_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                table_partition: "test_table".into(),
                table_initial_size: 2, // Very small initial size to force multiple resizes
                table_resize_frequency: 2, // Resize after 2 items per entry
                table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
                table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
                codec_config: (),
            };
            let mut freezer = Freezer::<_, FixedBytes<64>, i32>::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to initialize freezer");

            // Insert many keys to force multiple table resizes
            // Table should double 8 times: 2 -> 4 -> 8 -> 16 -> 32 -> 64 -> 128 -> 256 -> 512
            let mut keys = Vec::new();
            for i in 0..1000 {
                let key = test_key(&format!("key{i}"));
                keys.push((key.clone(), i));

                freezer.put(key, i).await.expect("Failed to put data");

                // Sync after every put so a resize starts as soon as it is triggered
                freezer.sync().await.expect("Failed to sync");
            }

            // Verify all keys can still be found after multiple resizes
            for (key, value) in &keys {
                let retrieved = freezer
                    .get(Identifier::Key(key))
                    .await
                    .expect("Failed to get data")
                    .expect("Data not found");
                assert_eq!(retrieved, *value, "Value mismatch for key after resizes");
            }

            // Close and reopen to verify persistence
            let checkpoint = freezer.close().await.expect("Failed to close");
            let freezer = Freezer::<_, FixedBytes<64>, i32>::init_with_checkpoint(
                context.clone(),
                cfg.clone(),
                Some(checkpoint),
            )
            .await
            .expect("Failed to reinitialize freezer");

            // Verify all keys can still be found after restart
            for (key, value) in &keys {
                let retrieved = freezer
                    .get(Identifier::Key(key))
                    .await
                    .expect("Failed to get data")
                    .expect("Data not found");
                assert_eq!(retrieved, *value, "Value mismatch for key after restart");
            }

            // Verify metrics show resize operations occurred
            let buffer = context.encode();
            assert!(buffer.contains("resizes_total 8"), "{buffer}");
        });
    }

    #[test_traced]
    fn test_insert_during_resize() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                journal_partition: "test_journal".into(),
                journal_compression: None,
                journal_write_buffer: NZUsize!(DEFAULT_JOURNAL_WRITE_BUFFER),
                journal_target_size: DEFAULT_JOURNAL_TARGET_SIZE,
                journal_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                table_partition: "test_table".into(),
                table_initial_size: 2,
                table_resize_frequency: 1,
                table_resize_chunk_size: 1, // Process one at a time
                table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
                codec_config: (),
            };
            let mut freezer = Freezer::<_, FixedBytes<64>, i32>::init(context.clone(), cfg.clone())
                .await
                .unwrap();

            // Insert keys to trigger resize
            freezer.put(test_key("key0"), 0).await.unwrap();
            freezer.put(test_key("key1"), 1).await.unwrap();
            freezer.sync().await.unwrap(); // should start resize

            // Verify resize started
            assert!(freezer.resizing().is_some());

            // Insert during resize (to first entry)
            freezer.put(test_key("key2"), 2).await.unwrap();
            assert!(context.encode().contains("unnecessary_writes_total 1"));
            assert_eq!(freezer.resizable(), 3);

            // Insert another key (to unmodified entry)
            freezer.put(test_key("key3"), 3).await.unwrap();
            assert!(context.encode().contains("unnecessary_writes_total 1"));
            assert_eq!(freezer.resizable(), 3);

            // Verify resize completed
            freezer.sync().await.unwrap();
            assert!(freezer.resizing().is_none());
            assert_eq!(freezer.resizable(), 2);

            // More inserts
            freezer.put(test_key("key4"), 4).await.unwrap();
            freezer.put(test_key("key5"), 5).await.unwrap();
            freezer.sync().await.unwrap();

            // Another resize should have started
            assert!(freezer.resizing().is_some());

            // Verify all keys can be retrieved during the resize
            for i in 0..6 {
                let key = test_key(&format!("key{i}"));
                assert_eq!(freezer.get(Identifier::Key(&key)).await.unwrap(), Some(i));
            }

            // Sync until resize completes
            while freezer.resizing().is_some() {
                freezer.sync().await.unwrap();
            }

            // Ensure no entries are considered resizable
            assert_eq!(freezer.resizable(), 0);
        });
    }

    #[test_traced]
    fn test_resize_after_startup() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                journal_partition: "test_journal".into(),
                journal_compression: None,
                journal_write_buffer: NZUsize!(DEFAULT_JOURNAL_WRITE_BUFFER),
                journal_target_size: DEFAULT_JOURNAL_TARGET_SIZE,
                journal_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                table_partition: "test_table".into(),
                table_initial_size: 2,
                table_resize_frequency: 1,
                table_resize_chunk_size: 1, // Process one at a time
                table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
                codec_config: (),
            };

            // Create a freezer, then shut down uncleanly (drop it without calling close)
            let checkpoint = {
                let mut freezer =
                    Freezer::<_, FixedBytes<64>, i32>::init(context.clone(), cfg.clone())
                        .await
                        .unwrap();

                // Insert keys to trigger resize
                freezer.put(test_key("key0"), 0).await.unwrap();
                freezer.put(test_key("key1"), 1).await.unwrap();
                let checkpoint = freezer.sync().await.unwrap();

                // Verify resize started
                assert!(freezer.resizing().is_some());

                checkpoint
            };

            // Reopen freezer
            let mut freezer = Freezer::<_, FixedBytes<64>, i32>::init_with_checkpoint(
                context.clone(),
                cfg.clone(),
                Some(checkpoint),
            )
            .await
            .unwrap();
            assert_eq!(freezer.resizable(), 1);

            // Verify a resize starts immediately on the next sync (one entry will have
            // 0 items added, while another will still have 1)
            freezer.sync().await.unwrap();
            assert!(freezer.resizing().is_some());

            // Run until resize completes
            while freezer.resizing().is_some() {
                freezer.sync().await.unwrap();
            }

            // Ensure no entries are considered resizable
            assert_eq!(freezer.resizable(), 0);
        });
    }

    fn test_operations_and_restart(num_keys: usize) -> String {
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();
        executor.start(|mut context| async move {
            // Initialize the freezer
            let cfg = Config {
                journal_partition: "test_journal".into(),
                journal_compression: None,
                journal_write_buffer: NZUsize!(DEFAULT_JOURNAL_WRITE_BUFFER),
                journal_target_size: 128, // Force multiple journal sections
                journal_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                table_partition: "test_table".into(),
                table_initial_size: 8,     // Small table to force collisions
                table_resize_frequency: 2, // Force resize frequently
                table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
                table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
                codec_config: (),
            };
            let mut freezer =
                Freezer::<_, FixedBytes<96>, FixedBytes<256>>::init(context.clone(), cfg.clone())
                    .await
                    .expect("Failed to initialize freezer");

            // Generate and insert random key-value pairs
            let mut pairs = Vec::new();

            for _ in 0..num_keys {
                // Generate random key
                let mut key = [0u8; 96];
                context.fill_bytes(&mut key);
                let key = FixedBytes::<96>::new(key);

                // Generate random value
                let mut value = [0u8; 256];
                context.fill_bytes(&mut value);
                let value = FixedBytes::<256>::new(value);

                // Store the key-value pair
                freezer
                    .put(key.clone(), value.clone())
                    .await
                    .expect("Failed to put data");
                pairs.push((key, value));

                // Randomly sync to test resizing
                if context.gen_bool(0.1) {
                    freezer.sync().await.expect("Failed to sync");
                }
            }

            // Sync data
            freezer.sync().await.expect("Failed to sync");

            // Verify all pairs can be retrieved
            for (key, value) in &pairs {
                let retrieved = freezer
                    .get(Identifier::Key(key))
                    .await
                    .expect("Failed to get data")
                    .expect("Data not found");
                assert_eq!(&retrieved, value);
            }

            // Test get() on all keys
            for (key, _) in &pairs {
                assert!(freezer
                    .get(Identifier::Key(key))
                    .await
                    .expect("Failed to check key")
                    .is_some());
            }

            // Check some non-existent keys
            for _ in 0..10 {
                let mut key = [0u8; 96];
                context.fill_bytes(&mut key);
                let key = FixedBytes::<96>::new(key);
                assert!(freezer
                    .get(Identifier::Key(&key))
                    .await
                    .expect("Failed to check key")
                    .is_none());
            }

            // Close the freezer
            let checkpoint = freezer.close().await.expect("Failed to close freezer");

            // Reopen the freezer
            let mut freezer = Freezer::<_, FixedBytes<96>, FixedBytes<256>>::init_with_checkpoint(
                context.clone(),
                cfg.clone(),
                Some(checkpoint),
            )
            .await
            .expect("Failed to initialize freezer");

            // Verify all pairs are still there after restart
            for (key, value) in &pairs {
                let retrieved = freezer
                    .get(Identifier::Key(key))
                    .await
                    .expect("Failed to get data")
                    .expect("Data not found");
                assert_eq!(&retrieved, value);
            }

            // Add more pairs after restart to test collision handling
            for _ in 0..20 {
                let mut key = [0u8; 96];
                context.fill_bytes(&mut key);
                let key = FixedBytes::<96>::new(key);

                let mut value = [0u8; 256];
                context.fill_bytes(&mut value);
                let value = FixedBytes::<256>::new(value);

                freezer.put(key, value).await.expect("Failed to put data");
            }

            // Multiple syncs to test epoch progression
            for _ in 0..3 {
                freezer.sync().await.expect("Failed to sync");

                // Add a few more entries between syncs
                for _ in 0..5 {
                    let mut key = [0u8; 96];
                    context.fill_bytes(&mut key);
                    let key = FixedBytes::<96>::new(key);

                    let mut value = [0u8; 256];
                    context.fill_bytes(&mut value);
                    let value = FixedBytes::<256>::new(value);

                    freezer.put(key, value).await.expect("Failed to put data");
                }
            }

            // Final sync
            freezer.sync().await.expect("Failed to sync");

            // Return the auditor state for comparison
            context.auditor().state()
        })
    }

    #[test_traced]
    #[ignore]
    fn test_determinism() {
        let state1 = test_operations_and_restart(1_000);
        let state2 = test_operations_and_restart(1_000);
        assert_eq!(state1, state2);
    }
}