commonware_storage/freezer/
mod.rs

1//! An immutable key-value store optimized for minimal memory usage and write amplification.
2//!
3//! [Freezer] is a key-value store designed for permanent storage where data is written once and never
4//! modified. Meant for resource-constrained environments, [Freezer] exclusively employs disk-resident
5//! data structures to serve queries and avoids ever rewriting (i.e. compacting) inserted data.
6//!
7//! As a byproduct of the mechanisms used to satisfy these constraints, [Freezer] consistently provides
8//! low latency access to recently added data (regardless of how much data has been stored) at the expense
9//! of a logarithmic increase in latency for old data (increasing with the number of items stored).
10//!
11//! # Format
12//!
13//! The [Freezer] uses a three-level architecture:
14//! 1. An extendible hash table (written in a single [commonware_runtime::Blob]) that maps keys to locations
15//! 2. A key index journal ([crate::journal::segmented::fixed]) that stores keys and collision chain pointers
16//! 3. A value journal ([crate::journal::segmented::glob]) that stores the actual values
17//!
18//! These journals are combined via [crate::journal::segmented::oversized], which coordinates
19//! crash recovery between them.
20//!
21//! ```text
22//! +-----------------------------------------------------------------+
23//! |                           Hash Table                            |
24//! |  +---------+---------+---------+---------+---------+---------+  |
25//! |  | Entry 0 | Entry 1 | Entry 2 | Entry 3 | Entry 4 |   ...   |  |
26//! |  +----+----+----+----+----+----+----+----+----+----+---------+  |
27//! +-------|---------|---------|---------|---------|---------|-------+
28//!         |         |         |         |         |         |
29//!         v         v         v         v         v         v
30//! +-----------------------------------------------------------------+
31//! |                      Key Index Journal                          |
32//! |  Section 0: [Entry 0][Entry 1][Entry 2]...                      |
33//! |  Section 1: [Entry 10][Entry 11][Entry 12]...                   |
34//! |  Section N: [Entry 100][Entry 101][Entry 102]...                |
35//! +-------|---------|---------|---------|---------|---------|-------+
36//!         |         |         |         |         |         |
37//!         v         v         v         v         v         v
38//! +-----------------------------------------------------------------+
39//! |                        Value Journal                            |
40//! |  Section 0: [Value 0][Value 1][Value 2]...                      |
41//! |  Section 1: [Value 10][Value 11][Value 12]...                   |
42//! |  Section N: [Value 100][Value 101][Value 102]...                |
43//! +-----------------------------------------------------------------+
44//! ```
45//!
46//! The table uses two fixed-size slots per entry to ensure consistency during updates. Each slot
47//! contains an epoch number that monotonically increases with each sync operation. During reads,
48//! the slot with the higher epoch is selected (provided it's not greater than the last committed
49//! epoch), ensuring consistency even if the system crashed during a write.
50//!
51//! ```text
52//! +-------------------------------------+
53//! |          Hash Table Entry           |
54//! +-------------------------------------+
55//! |     Slot 0      |      Slot 1       |
56//! +-----------------+-------------------+
57//! | epoch:    u64   | epoch:    u64     |
58//! | section:  u64   | section:  u64     |
59//! | offset:   u32   | offset:   u32     |
60//! | added:    u8    | added:    u8      |
61//! +-----------------+-------------------+
62//! | CRC32:    u32   | CRC32:    u32     |
63//! +-----------------+-------------------+
64//! ```
65//!
66//! The key index journal stores fixed-size entries containing a key, a pointer to the value in the
67//! value journal, and an optional pointer to the next entry in the collision chain (for keys that
68//! hash to the same table index).
69//!
70//! ```text
71//! +-------------------------------------+
72//! |        Key Index Entry              |
73//! +-------------------------------------+
74//! | Key:           Array                |
75//! | Value Offset:  u64                  |
76//! | Value Size:    u32                  |
77//! | Next:          Option<(u64, u32)>   |
78//! +-------------------------------------+
79//! ```
80//!
81//! The value journal stores the actual encoded values at the offsets referenced by the key index entries.
82//!
83//! # Traversing Conflicts
84//!
85//! When multiple keys hash to the same table index, they form a linked list within the key index
86//! journal. Each key index entry points to its value in the value journal:
87//!
88//! ```text
89//! Hash Table:
90//! [Index 42]         +-------------------+
91//!                    | section: 2        |
92//!                    | offset: 768       |
93//!                    +---------+---------+
94//!                              |
95//! Key Index Journal:           v
96//! [Section 2]        +-----------------------+
97//!                    | Key: "foo"            |
98//!                    | ValOff: 100           |
99//!                    | ValSize: 20           |
100//!                    | Next: (1, 512) -------+---+
101//!                    +-----------------------+   |
102//!                                                v
103//! [Section 1]        +-----------------------+
104//!                    | Key: "bar"            |
105//!                    | ValOff: 50            |
106//!                    | ValSize: 20           |
107//!                    | Next: (0, 256) -------+---+
108//!                    +-----------------------+   |
109//!                                                v
110//! [Section 0]        +-----------------------+
111//!                    | Key: "baz"            |
112//!                    | ValOff: 0             |
113//!                    | ValSize: 20           |
114//!                    | Next: None            |
115//!                    +-----------------------+
116//!
117//! Value Journal:
118//! [Section 0]        [Value: 126 @ offset 0 ]
119//! [Section 1]        [Value: 84  @ offset 50]
120//! [Section 2]        [Value: 42  @ offset 100]
121//! ```
122//!
123//! New entries are prepended to the chain, becoming the new head. During lookup, the chain
124//! is traversed until a matching key is found. The `added` field in the table entry tracks
125//! insertions since the last resize, triggering table growth when 50% of entries have had
126//! `table_resize_frequency` items added (since the last resize).
127//!
128//! # Extendible Hashing
129//!
130//! The [Freezer] uses bit-based indexing to grow the on-disk hash table without rehashing existing entries:
131//!
132//! ```text
133//! Initial state (table_size=4, using 2 bits of hash):
134//! Hash: 0b...00 -> Index 0
135//! Hash: 0b...01 -> Index 1
136//! Hash: 0b...10 -> Index 2
137//! Hash: 0b...11 -> Index 3
138//!
139//! After resize (table_size=8, using 3 bits of hash):
140//! Hash: 0b...000 -> Index 0 -+
141//! ...                        |
142//! Hash: 0b...100 -> Index 4 -+- Both map to old Index 0
143//! Hash: 0b...001 -> Index 1 -+
144//! ...                        |
145//! Hash: 0b...101 -> Index 5 -+- Both map to old Index 1
146//! ```
147//!
148//! When the table doubles in size:
149//! 1. Each entry at index `i` splits into two entries: `i` and `i + old_size`
150//! 2. The existing chain head is copied to both locations with `added=0`
151//! 3. Future insertions will naturally distribute between the two entries based on their hash
152//!
153//! This approach ensures that entries inserted before a resize remain discoverable after the resize,
154//! as the lookup algorithm checks the appropriate entry based on the current table size. As more and more
155//! items are added (and resizes occur), the latency for fetching old data will increase logarithmically
156//! (with the number of items stored).
157//!
158//! To prevent a "stall" during a single resize, the table is resized incrementally across multiple sync calls.
159//! Each sync will process up to `table_resize_chunk_size` entries until the resize is complete. If there is
160//! an ongoing resize when closing the [Freezer], the resize will be completed before closing.
161//!
162//! # Example
163//!
164//! ```rust
165//! use commonware_runtime::{Spawner, Runner, deterministic, buffer::PoolRef};
166//! use commonware_storage::freezer::{Freezer, Config, Identifier};
167//! use commonware_utils::{sequence::FixedBytes, NZUsize, NZU16};
168//!
169//! let executor = deterministic::Runner::default();
170//! executor.start(|context| async move {
171//!     // Create a freezer
172//!     let cfg = Config {
173//!         key_partition: "freezer_key_index".into(),
174//!         key_write_buffer: NZUsize!(1024 * 1024), // 1MB
175//!         key_buffer_pool: PoolRef::new(NZU16!(1024), NZUsize!(10)),
176//!         value_partition: "freezer_value_journal".into(),
177//!         value_compression: Some(3),
178//!         value_write_buffer: NZUsize!(1024 * 1024), // 1MB
179//!         value_target_size: 100 * 1024 * 1024, // 100MB
180//!         table_partition: "freezer_table".into(),
181//!         table_initial_size: 65_536, // ~3MB initial table size
//!         table_resize_frequency: 4, // Resize once 50% of table entries have had 4 items added
183//!         table_resize_chunk_size: 16_384, // ~1MB of table entries rewritten per sync
184//!         table_replay_buffer: NZUsize!(1024 * 1024), // 1MB
185//!         codec_config: (),
186//!     };
187//!     let mut freezer = Freezer::<_, FixedBytes<32>, i32>::init(context, cfg).await.unwrap();
188//!
189//!     // Put a key-value pair
190//!     let key = FixedBytes::new([1u8; 32]);
191//!     freezer.put(key.clone(), 42).await.unwrap();
192//!
193//!     // Sync to disk
194//!     freezer.sync().await.unwrap();
195//!
196//!     // Get the value
197//!     let value = freezer.get(Identifier::Key(&key)).await.unwrap().unwrap();
198//!     assert_eq!(value, 42);
199//!
200//!     // Close the freezer
201//!     freezer.close().await.unwrap();
202//! });
203//! ```
204
205mod storage;
206use commonware_runtime::buffer::PoolRef;
207use commonware_utils::Array;
208use std::num::NonZeroUsize;
209pub use storage::{Checkpoint, Cursor, Freezer};
210use thiserror::Error;
211
/// Subject of a [Freezer::get] operation.
///
/// A lookup is addressed either by a [Cursor] or by a borrowed key.
pub enum Identifier<'a, K: Array> {
    /// Look the item up via a [Cursor].
    // NOTE(review): presumably a cursor handed out by an earlier write — confirm in `storage`.
    Cursor(Cursor),
    /// Look the item up via a reference to its key.
    Key(&'a K),
}
217
/// Errors that can occur when interacting with the [Freezer].
///
/// Each variant wraps an error from an underlying component and is constructed
/// automatically through the `#[from]` conversion (so `?` can be used on those errors).
#[derive(Debug, Error)]
pub enum Error {
    /// An error surfaced by [commonware_runtime].
    #[error("runtime error: {0}")]
    Runtime(#[from] commonware_runtime::Error),
    /// An error surfaced by [crate::journal].
    #[error("journal error: {0}")]
    Journal(#[from] crate::journal::Error),
    /// An error surfaced by [commonware_codec].
    #[error("codec error: {0}")]
    Codec(#[from] commonware_codec::Error),
}
228
/// Configuration for [Freezer].
///
/// Fields are grouped by the component they configure: the key index journal (`key_*`), the
/// value journal (`value_*`), and the hash table (`table_*`).
#[derive(Clone)]
pub struct Config<C> {
    /// The [commonware_runtime::Storage] partition for the key index journal.
    pub key_partition: String,

    /// The size of the write buffer for the key index journal.
    pub key_write_buffer: NonZeroUsize,

    /// The buffer pool for the key index journal.
    pub key_buffer_pool: PoolRef,

    /// The [commonware_runtime::Storage] partition for the value journal.
    pub value_partition: String,

    /// The compression level for the value journal.
    // NOTE(review): `None` presumably disables compression (the tests pass `None` for their
    // no-compression case) — confirm against the value journal.
    pub value_compression: Option<u8>,

    /// The size of the write buffer for the value journal.
    pub value_write_buffer: NonZeroUsize,

    /// The target size of each value journal section before creating a new one.
    pub value_target_size: u64,

    /// The [commonware_runtime::Storage] partition to use for storing the table.
    pub table_partition: String,

    /// The initial number of items in the table.
    pub table_initial_size: u32,

    /// The number of items that must be added to 50% of table entries since the last resize before
    /// the table is resized again.
    pub table_resize_frequency: u8,

    /// The number of items to move during each resize operation (many may be required to complete a resize).
    ///
    /// A resize proceeds incrementally: each sync processes up to this many entries until the
    /// resize is complete.
    pub table_resize_chunk_size: u32,

    /// The size of the read buffer to use when scanning the table (e.g., during recovery or resize).
    pub table_replay_buffer: NonZeroUsize,

    /// The codec configuration to use for the value stored in the freezer.
    pub codec_config: C,
}
272
273#[cfg(test)]
274mod tests {
275    use super::*;
276    use commonware_codec::DecodeExt;
277    use commonware_macros::{test_group, test_traced};
278    use commonware_runtime::{deterministic, Blob, Metrics, Runner, Storage};
279    use commonware_utils::{hex, sequence::FixedBytes, NZUsize, NZU16};
280    use rand::{Rng, RngCore};
281    use std::num::NonZeroU16;
282
    // Shared defaults used to build the `Config` in each test below.

    // Write buffer size (bytes) used for both the key index and value journals.
    const DEFAULT_WRITE_BUFFER: usize = 1024;
    // Target size of a value journal section.
    const DEFAULT_VALUE_TARGET_SIZE: u64 = 10 * 1024 * 1024;
    // Initial number of table entries.
    const DEFAULT_TABLE_INITIAL_SIZE: u32 = 256;
    // Value used for `table_resize_frequency`.
    const DEFAULT_TABLE_RESIZE_FREQUENCY: u8 = 4;
    // Smaller than the initial table size so a resize spans several chunks.
    const DEFAULT_TABLE_RESIZE_CHUNK_SIZE: u32 = 128; // force multiple chunks
    // Read buffer used when scanning the table.
    const DEFAULT_TABLE_REPLAY_BUFFER: usize = 64 * 1024; // 64KB
    // Arguments passed to `PoolRef::new` for the key index journal's buffer pool.
    const PAGE_SIZE: NonZeroU16 = NZU16!(1024);
    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);
291
292    fn test_key(key: &str) -> FixedBytes<64> {
293        let mut buf = [0u8; 64];
294        let key = key.as_bytes();
295        assert!(key.len() <= buf.len());
296        buf[..key.len()].copy_from_slice(key);
297        FixedBytes::decode(buf.as_ref()).unwrap()
298    }
299
300    fn test_put_get(compression: Option<u8>) {
301        // Initialize the deterministic context
302        let executor = deterministic::Runner::default();
303        executor.start(|context| async move {
304            // Initialize the freezer
305            let cfg = Config {
306                key_partition: "test_key_index".into(),
307                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
308                key_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
309                value_partition: "test_value_journal".into(),
310                value_compression: compression,
311                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
312                value_target_size: DEFAULT_VALUE_TARGET_SIZE,
313                table_partition: "test_table".into(),
314                table_initial_size: DEFAULT_TABLE_INITIAL_SIZE,
315                table_resize_frequency: DEFAULT_TABLE_RESIZE_FREQUENCY,
316                table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
317                table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
318                codec_config: (),
319            };
320            let mut freezer = Freezer::<_, FixedBytes<64>, i32>::init(context.clone(), cfg.clone())
321                .await
322                .expect("Failed to initialize freezer");
323
324            let key = test_key("testkey");
325            let data = 42;
326
327            // Check key doesn't exist
328            let value = freezer
329                .get(Identifier::Key(&key))
330                .await
331                .expect("Failed to check key");
332            assert!(value.is_none());
333
334            // Put the key-data pair
335            freezer
336                .put(key.clone(), data)
337                .await
338                .expect("Failed to put data");
339
340            // Get the data back
341            let value = freezer
342                .get(Identifier::Key(&key))
343                .await
344                .expect("Failed to get data")
345                .expect("Data not found");
346            assert_eq!(value, data);
347
348            // Check metrics
349            let buffer = context.encode();
350            assert!(buffer.contains("gets_total 2"), "{}", buffer);
351            assert!(buffer.contains("puts_total 1"), "{}", buffer);
352            assert!(buffer.contains("unnecessary_reads_total 0"), "{}", buffer);
353
354            // Force a sync
355            freezer.sync().await.expect("Failed to sync data");
356        });
357    }
358
359    #[test_traced]
360    fn test_put_get_no_compression() {
361        test_put_get(None);
362    }
363
364    #[test_traced]
365    fn test_put_get_compression() {
366        test_put_get(Some(3));
367    }
368
369    #[test_traced]
370    fn test_multiple_keys() {
371        // Initialize the deterministic context
372        let executor = deterministic::Runner::default();
373        executor.start(|context| async move {
374            // Initialize the freezer
375            let cfg = Config {
376                key_partition: "test_key_index".into(),
377                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
378                key_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
379                value_partition: "test_value_journal".into(),
380                value_compression: None,
381                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
382                value_target_size: DEFAULT_VALUE_TARGET_SIZE,
383                table_partition: "test_table".into(),
384                table_initial_size: DEFAULT_TABLE_INITIAL_SIZE,
385                table_resize_frequency: DEFAULT_TABLE_RESIZE_FREQUENCY,
386                table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
387                table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
388                codec_config: (),
389            };
390            let mut freezer = Freezer::<_, FixedBytes<64>, i32>::init(context.clone(), cfg.clone())
391                .await
392                .expect("Failed to initialize freezer");
393
394            // Insert multiple keys
395            let keys = vec![
396                (test_key("key1"), 1),
397                (test_key("key2"), 2),
398                (test_key("key3"), 3),
399                (test_key("key4"), 4),
400                (test_key("key5"), 5),
401            ];
402
403            for (key, data) in &keys {
404                freezer
405                    .put(key.clone(), *data)
406                    .await
407                    .expect("Failed to put data");
408            }
409
410            // Retrieve all keys and verify
411            for (key, data) in &keys {
412                let retrieved = freezer
413                    .get(Identifier::Key(key))
414                    .await
415                    .expect("Failed to get data")
416                    .expect("Data not found");
417                assert_eq!(retrieved, *data);
418            }
419        });
420    }
421
422    #[test_traced]
423    fn test_collision_handling() {
424        // Initialize the deterministic context
425        let executor = deterministic::Runner::default();
426        executor.start(|context| async move {
427            // Initialize the freezer with a very small table to force collisions
428            let cfg = Config {
429                key_partition: "test_key_index".into(),
430                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
431                key_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
432                value_partition: "test_value_journal".into(),
433                value_compression: None,
434                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
435                value_target_size: DEFAULT_VALUE_TARGET_SIZE,
436                table_partition: "test_table".into(),
437                table_initial_size: 4, // Very small to force collisions
438                table_resize_frequency: DEFAULT_TABLE_RESIZE_FREQUENCY,
439                table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
440                table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
441                codec_config: (),
442            };
443            let mut freezer = Freezer::<_, FixedBytes<64>, i32>::init(context.clone(), cfg.clone())
444                .await
445                .expect("Failed to initialize freezer");
446
447            // Insert multiple keys that will likely collide
448            let keys = vec![
449                (test_key("key1"), 1),
450                (test_key("key2"), 2),
451                (test_key("key3"), 3),
452                (test_key("key4"), 4),
453                (test_key("key5"), 5),
454                (test_key("key6"), 6),
455                (test_key("key7"), 7),
456                (test_key("key8"), 8),
457            ];
458
459            for (key, data) in &keys {
460                freezer
461                    .put(key.clone(), *data)
462                    .await
463                    .expect("Failed to put data");
464            }
465
466            // Sync to disk
467            freezer.sync().await.expect("Failed to sync");
468
469            // Retrieve all keys and verify they can still be found
470            for (key, data) in &keys {
471                let retrieved = freezer
472                    .get(Identifier::Key(key))
473                    .await
474                    .expect("Failed to get data")
475                    .expect("Data not found");
476                assert_eq!(retrieved, *data);
477            }
478
479            // Check metrics
480            let buffer = context.encode();
481            assert!(buffer.contains("gets_total 8"), "{}", buffer);
482            assert!(buffer.contains("unnecessary_reads_total 5"), "{}", buffer);
483        });
484    }
485
486    #[test_traced]
487    fn test_restart() {
488        // Initialize the deterministic context
489        let executor = deterministic::Runner::default();
490        executor.start(|context| async move {
491            let cfg = Config {
492                key_partition: "test_key_index".into(),
493                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
494                key_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
495                value_partition: "test_value_journal".into(),
496                value_compression: None,
497                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
498                value_target_size: DEFAULT_VALUE_TARGET_SIZE,
499                table_partition: "test_table".into(),
500                table_initial_size: DEFAULT_TABLE_INITIAL_SIZE,
501                table_resize_frequency: DEFAULT_TABLE_RESIZE_FREQUENCY,
502                table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
503                table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
504                codec_config: (),
505            };
506
507            // Insert data and close the freezer
508            let checkpoint = {
509                let mut freezer =
510                    Freezer::<_, FixedBytes<64>, i32>::init(context.clone(), cfg.clone())
511                        .await
512                        .expect("Failed to initialize freezer");
513
514                let keys = vec![
515                    (test_key("persist1"), 100),
516                    (test_key("persist2"), 200),
517                    (test_key("persist3"), 300),
518                ];
519
520                for (key, data) in &keys {
521                    freezer
522                        .put(key.clone(), *data)
523                        .await
524                        .expect("Failed to put data");
525                }
526
527                freezer.close().await.expect("Failed to close freezer")
528            };
529
530            // Reopen and verify data persisted
531            {
532                let freezer = Freezer::<_, FixedBytes<64>, i32>::init_with_checkpoint(
533                    context.clone(),
534                    cfg.clone(),
535                    Some(checkpoint),
536                )
537                .await
538                .expect("Failed to initialize freezer");
539
540                let keys = vec![
541                    (test_key("persist1"), 100),
542                    (test_key("persist2"), 200),
543                    (test_key("persist3"), 300),
544                ];
545
546                for (key, data) in &keys {
547                    let retrieved = freezer
548                        .get(Identifier::Key(key))
549                        .await
550                        .expect("Failed to get data")
551                        .expect("Data not found");
552                    assert_eq!(retrieved, *data);
553                }
554            }
555        });
556    }
557
558    #[test_traced]
559    fn test_crash_consistency() {
560        // Initialize the deterministic context
561        let executor = deterministic::Runner::default();
562        executor.start(|context| async move {
563            let cfg = Config {
564                key_partition: "test_key_index".into(),
565                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
566                key_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
567                value_partition: "test_value_journal".into(),
568                value_compression: None,
569                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
570                value_target_size: DEFAULT_VALUE_TARGET_SIZE,
571                table_partition: "test_table".into(),
572                table_initial_size: DEFAULT_TABLE_INITIAL_SIZE,
573                table_resize_frequency: DEFAULT_TABLE_RESIZE_FREQUENCY,
574                table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
575                table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
576                codec_config: (),
577            };
578
579            // First, create some committed data and close the freezer
580            let checkpoint = {
581                let mut freezer =
582                    Freezer::<_, FixedBytes<64>, i32>::init(context.clone(), cfg.clone())
583                        .await
584                        .expect("Failed to initialize freezer");
585
586                freezer
587                    .put(test_key("committed1"), 1)
588                    .await
589                    .expect("Failed to put data");
590                freezer
591                    .put(test_key("committed2"), 2)
592                    .await
593                    .expect("Failed to put data");
594
595                // Sync to ensure data is committed
596                freezer.sync().await.expect("Failed to sync");
597
598                // Add more data but don't sync (simulating crash)
599                freezer
600                    .put(test_key("uncommitted1"), 3)
601                    .await
602                    .expect("Failed to put data");
603                freezer
604                    .put(test_key("uncommitted2"), 4)
605                    .await
606                    .expect("Failed to put data");
607
608                // Close without syncing to simulate crash
609                freezer.close().await.expect("Failed to close")
610            };
611
612            // Reopen and verify only committed data is present
613            {
614                let freezer = Freezer::<_, FixedBytes<64>, i32>::init_with_checkpoint(
615                    context.clone(),
616                    cfg.clone(),
617                    Some(checkpoint),
618                )
619                .await
620                .expect("Failed to initialize freezer");
621
622                // Committed data should be present
623                assert_eq!(
624                    freezer
625                        .get(Identifier::Key(&test_key("committed1")))
626                        .await
627                        .unwrap(),
628                    Some(1)
629                );
630                assert_eq!(
631                    freezer
632                        .get(Identifier::Key(&test_key("committed2")))
633                        .await
634                        .unwrap(),
635                    Some(2)
636                );
637
638                // Uncommitted data might or might not be present depending on implementation
639                // But if present, it should be correct
640                if let Some(val) = freezer
641                    .get(Identifier::Key(&test_key("uncommitted1")))
642                    .await
643                    .unwrap()
644                {
645                    assert_eq!(val, 3);
646                }
647                if let Some(val) = freezer
648                    .get(Identifier::Key(&test_key("uncommitted2")))
649                    .await
650                    .unwrap()
651                {
652                    assert_eq!(val, 4);
653                }
654            }
655        });
656    }
657
658    #[test_traced]
659    fn test_destroy() {
660        // Initialize the deterministic context
661        let executor = deterministic::Runner::default();
662        executor.start(|context| async move {
663            // Initialize the freezer
664            let cfg = Config {
665                key_partition: "test_key_index".into(),
666                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
667                key_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
668                value_partition: "test_value_journal".into(),
669                value_compression: None,
670                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
671                value_target_size: DEFAULT_VALUE_TARGET_SIZE,
672                table_partition: "test_table".into(),
673                table_initial_size: DEFAULT_TABLE_INITIAL_SIZE,
674                table_resize_frequency: DEFAULT_TABLE_RESIZE_FREQUENCY,
675                table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
676                table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
677                codec_config: (),
678            };
679            {
680                let mut freezer =
681                    Freezer::<_, FixedBytes<64>, i32>::init(context.clone(), cfg.clone())
682                        .await
683                        .expect("Failed to initialize freezer");
684
685                freezer
686                    .put(test_key("destroy1"), 1)
687                    .await
688                    .expect("Failed to put data");
689                freezer
690                    .put(test_key("destroy2"), 2)
691                    .await
692                    .expect("Failed to put data");
693
694                // Destroy the freezer
695                freezer.destroy().await.expect("Failed to destroy freezer");
696            }
697
698            // Try to create a new freezer - it should be empty
699            {
700                let freezer = Freezer::<_, FixedBytes<64>, i32>::init(context.clone(), cfg.clone())
701                    .await
702                    .expect("Failed to initialize freezer");
703
704                // Should not find any data
705                assert!(freezer
706                    .get(Identifier::Key(&test_key("destroy1")))
707                    .await
708                    .unwrap()
709                    .is_none());
710                assert!(freezer
711                    .get(Identifier::Key(&test_key("destroy2")))
712                    .await
713                    .unwrap()
714                    .is_none());
715            }
716        });
717    }
718
719    #[test_traced]
720    fn test_partial_table_entry_write() {
721        // Initialize the deterministic context
722        let executor = deterministic::Runner::default();
723        executor.start(|context| async move {
724            // Initialize the freezer
725            let cfg = Config {
726                key_partition: "test_key_index".into(),
727                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
728                key_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
729                value_partition: "test_value_journal".into(),
730                value_compression: None,
731                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
732                value_target_size: DEFAULT_VALUE_TARGET_SIZE,
733                table_partition: "test_table".into(),
734                table_initial_size: DEFAULT_TABLE_INITIAL_SIZE,
735                table_resize_frequency: DEFAULT_TABLE_RESIZE_FREQUENCY,
736                table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
737                table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
738                codec_config: (),
739            };
740            let checkpoint = {
741                let mut freezer =
742                    Freezer::<_, FixedBytes<64>, i32>::init(context.clone(), cfg.clone())
743                        .await
744                        .expect("Failed to initialize freezer");
745
746                freezer.put(test_key("key1"), 42).await.unwrap();
747                freezer.sync().await.unwrap();
748                freezer.close().await.unwrap()
749            };
750
751            // Corrupt the table by writing partial entry
752            {
753                let (blob, _) = context.open(&cfg.table_partition, b"table").await.unwrap();
754                // Write incomplete table entry (only 10 bytes instead of 24)
755                blob.write_at(vec![0xFF; 10], 0).await.unwrap();
756                blob.sync().await.unwrap();
757            }
758
759            // Reopen and verify it handles the corruption
760            {
761                let freezer = Freezer::<_, FixedBytes<64>, i32>::init_with_checkpoint(
762                    context.clone(),
763                    cfg.clone(),
764                    Some(checkpoint),
765                )
766                .await
767                .expect("Failed to initialize freezer");
768
769                // The key should still be retrievable from journal if table is corrupted
770                // but the table entry is zeroed out
771                let result = freezer
772                    .get(Identifier::Key(&test_key("key1")))
773                    .await
774                    .unwrap();
775                assert!(result.is_none() || result == Some(42));
776            }
777        });
778    }
779
780    #[test_traced]
781    fn test_table_entry_invalid_crc() {
782        // Initialize the deterministic context
783        let executor = deterministic::Runner::default();
784        executor.start(|context| async move {
785            let cfg = Config {
786                key_partition: "test_key_index".into(),
787                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
788                key_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
789                value_partition: "test_value_journal".into(),
790                value_compression: None,
791                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
792                value_target_size: DEFAULT_VALUE_TARGET_SIZE,
793                table_partition: "test_table".into(),
794                table_initial_size: DEFAULT_TABLE_INITIAL_SIZE,
795                table_resize_frequency: DEFAULT_TABLE_RESIZE_FREQUENCY,
796                table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
797                table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
798                codec_config: (),
799            };
800
801            // Create freezer with data
802            let checkpoint = {
803                let mut freezer =
804                    Freezer::<_, FixedBytes<64>, i32>::init(context.clone(), cfg.clone())
805                        .await
806                        .expect("Failed to initialize freezer");
807
808                freezer.put(test_key("key1"), 42).await.unwrap();
809                freezer.sync().await.unwrap();
810                freezer.close().await.unwrap()
811            };
812
813            // Corrupt the CRC in the index entry
814            {
815                let (blob, _) = context.open(&cfg.table_partition, b"table").await.unwrap();
816                // Read the first entry
817                let entry_data = blob.read_at(vec![0u8; 24], 0).await.unwrap();
818                let mut corrupted = entry_data.as_ref().to_vec();
819                // Corrupt the CRC (last 4 bytes of the entry)
820                corrupted[20] ^= 0xFF;
821                blob.write_at(corrupted, 0).await.unwrap();
822                blob.sync().await.unwrap();
823            }
824
825            // Reopen and verify it handles invalid CRC
826            {
827                let freezer = Freezer::<_, FixedBytes<64>, i32>::init_with_checkpoint(
828                    context.clone(),
829                    cfg.clone(),
830                    Some(checkpoint),
831                )
832                .await
833                .expect("Failed to initialize freezer");
834
835                // With invalid CRC, the entry should be treated as invalid
836                let result = freezer
837                    .get(Identifier::Key(&test_key("key1")))
838                    .await
839                    .unwrap();
840                // The freezer should still work but may not find the key due to invalid table entry
841                assert!(result.is_none() || result == Some(42));
842            }
843        });
844    }
845
846    #[test_traced]
847    fn test_table_extra_bytes() {
848        // Initialize the deterministic context
849        let executor = deterministic::Runner::default();
850        executor.start(|context| async move {
851            let cfg = Config {
852                key_partition: "test_key_index".into(),
853                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
854                key_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
855                value_partition: "test_value_journal".into(),
856                value_compression: None,
857                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
858                value_target_size: DEFAULT_VALUE_TARGET_SIZE,
859                table_partition: "test_table".into(),
860                table_initial_size: DEFAULT_TABLE_INITIAL_SIZE,
861                table_resize_frequency: DEFAULT_TABLE_RESIZE_FREQUENCY,
862                table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
863                table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
864                codec_config: (),
865            };
866
867            // Create freezer with data
868            let checkpoint = {
869                let mut freezer =
870                    Freezer::<_, FixedBytes<64>, i32>::init(context.clone(), cfg.clone())
871                        .await
872                        .expect("Failed to initialize freezer");
873
874                freezer.put(test_key("key1"), 42).await.unwrap();
875                freezer.sync().await.unwrap();
876                freezer.close().await.unwrap()
877            };
878
879            // Add extra bytes to the table blob
880            {
881                let (blob, size) = context.open(&cfg.table_partition, b"table").await.unwrap();
882                // Append garbage data
883                blob.write_at(hex!("0xdeadbeef").to_vec(), size)
884                    .await
885                    .unwrap();
886                blob.sync().await.unwrap();
887            }
888
889            // Reopen and verify it handles extra bytes gracefully
890            {
891                let freezer = Freezer::<_, FixedBytes<64>, i32>::init_with_checkpoint(
892                    context.clone(),
893                    cfg.clone(),
894                    Some(checkpoint),
895                )
896                .await
897                .expect("Failed to initialize freezer");
898
899                // Should still be able to read the key
900                assert_eq!(
901                    freezer
902                        .get(Identifier::Key(&test_key("key1")))
903                        .await
904                        .unwrap(),
905                    Some(42)
906                );
907
908                // And write new data
909                let mut freezer_mut = freezer;
910                freezer_mut.put(test_key("key2"), 43).await.unwrap();
911                assert_eq!(
912                    freezer_mut
913                        .get(Identifier::Key(&test_key("key2")))
914                        .await
915                        .unwrap(),
916                    Some(43)
917                );
918            }
919        });
920    }
921
922    #[test_traced]
923    fn test_indexing_across_resizes() {
924        // Initialize the deterministic context
925        let executor = deterministic::Runner::default();
926        executor.start(|context| async move {
927            // Initialize the freezer
928            let cfg = Config {
929                key_partition: "test_key_index".into(),
930                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
931                key_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
932                value_partition: "test_value_journal".into(),
933                value_compression: None,
934                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
935                value_target_size: DEFAULT_VALUE_TARGET_SIZE,
936                table_partition: "test_table".into(),
937                table_initial_size: 2, // Very small initial size to force multiple resizes
938                table_resize_frequency: 2, // Resize after 2 items per entry
939                table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
940                table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
941                codec_config: (),
942            };
943            let mut freezer = Freezer::<_, FixedBytes<64>, i32>::init(context.clone(), cfg.clone())
944                .await
945                .expect("Failed to initialize freezer");
946
947            // Insert many keys to force multiple table resizes
948            // Table will grow from 2 -> 4 -> 8 -> 16 -> 32 -> 64 -> 128 -> 256 -> 512 -> 1024
949            let mut keys = Vec::new();
950            for i in 0..1000 {
951                let key = test_key(&format!("key{i}"));
952                keys.push((key.clone(), i));
953
954                // Force sync to ensure resize occurs ASAP
955                freezer.put(key, i).await.expect("Failed to put data");
956                freezer.sync().await.expect("Failed to sync");
957            }
958
959            // Verify all keys can still be found after multiple resizes
960            for (key, value) in &keys {
961                let retrieved = freezer
962                    .get(Identifier::Key(key))
963                    .await
964                    .expect("Failed to get data")
965                    .expect("Data not found");
966                assert_eq!(retrieved, *value, "Value mismatch for key after resizes");
967            }
968
969            // Close and reopen to verify persistence
970            let checkpoint = freezer.close().await.expect("Failed to close");
971            let freezer = Freezer::<_, FixedBytes<64>, i32>::init_with_checkpoint(
972                context.clone(),
973                cfg.clone(),
974                Some(checkpoint),
975            )
976            .await
977            .expect("Failed to reinitialize freezer");
978
979            // Verify all keys can still be found after restart
980            for (key, value) in &keys {
981                let retrieved = freezer
982                    .get(Identifier::Key(key))
983                    .await
984                    .expect("Failed to get data")
985                    .expect("Data not found");
986                assert_eq!(retrieved, *value, "Value mismatch for key after restart");
987            }
988
989            // Verify metrics show resize operations occurred
990            let buffer = context.encode();
991            assert!(buffer.contains("resizes_total 8"), "{}", buffer);
992        });
993    }
994
995    #[test_traced]
996    fn test_insert_during_resize() {
997        let executor = deterministic::Runner::default();
998        executor.start(|context| async move {
999            let cfg = Config {
1000                key_partition: "test_key_index".into(),
1001                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1002                key_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1003                value_partition: "test_value_journal".into(),
1004                value_compression: None,
1005                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1006                value_target_size: DEFAULT_VALUE_TARGET_SIZE,
1007                table_partition: "test_table".into(),
1008                table_initial_size: 2,
1009                table_resize_frequency: 1,
1010                table_resize_chunk_size: 1, // Process one at a time
1011                table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
1012                codec_config: (),
1013            };
1014            let mut freezer = Freezer::<_, FixedBytes<64>, i32>::init(context.clone(), cfg.clone())
1015                .await
1016                .unwrap();
1017
1018            // Insert keys to trigger resize
1019            // key0 -> entry 0, key2 -> entry 1
1020            freezer.put(test_key("key0"), 0).await.unwrap();
1021            freezer.put(test_key("key2"), 1).await.unwrap();
1022            freezer.sync().await.unwrap(); // should start resize
1023
1024            // Verify resize started
1025            assert!(freezer.resizing().is_some());
1026
1027            // Insert during resize (to first entry)
1028            // key6 -> entry 0
1029            freezer.put(test_key("key6"), 2).await.unwrap();
1030            assert!(context.encode().contains("unnecessary_writes_total 1"));
1031            assert_eq!(freezer.resizable(), 3);
1032
1033            // Insert another key (to unmodified entry)
1034            // key3 -> entry 1
1035            freezer.put(test_key("key3"), 3).await.unwrap();
1036            assert!(context.encode().contains("unnecessary_writes_total 1"));
1037            assert_eq!(freezer.resizable(), 3);
1038
1039            // Verify resize completed
1040            freezer.sync().await.unwrap();
1041            assert!(freezer.resizing().is_none());
1042            assert_eq!(freezer.resizable(), 2);
1043
1044            // More inserts
1045            // key4 -> entry 1, key7 -> entry 0
1046            freezer.put(test_key("key4"), 4).await.unwrap();
1047            freezer.put(test_key("key7"), 5).await.unwrap();
1048            freezer.sync().await.unwrap();
1049
1050            // Another resize should've started
1051            assert!(freezer.resizing().is_some());
1052
1053            // Verify all can be retrieved during resize
1054            let keys = ["key0", "key2", "key6", "key3", "key4", "key7"];
1055            for (i, k) in keys.iter().enumerate() {
1056                assert_eq!(
1057                    freezer.get(Identifier::Key(&test_key(k))).await.unwrap(),
1058                    Some(i as i32)
1059                );
1060            }
1061
1062            // Sync until resize completes
1063            while freezer.resizing().is_some() {
1064                freezer.sync().await.unwrap();
1065            }
1066
1067            // Ensure no entries are considered resizable
1068            assert_eq!(freezer.resizable(), 0);
1069        });
1070    }
1071
1072    #[test_traced]
1073    fn test_resize_after_startup() {
1074        let executor = deterministic::Runner::default();
1075        executor.start(|context| async move {
1076            let cfg = Config {
1077                key_partition: "test_key_index".into(),
1078                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1079                key_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1080                value_partition: "test_value_journal".into(),
1081                value_compression: None,
1082                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1083                value_target_size: DEFAULT_VALUE_TARGET_SIZE,
1084                table_partition: "test_table".into(),
1085                table_initial_size: 2,
1086                table_resize_frequency: 1,
1087                table_resize_chunk_size: 1, // Process one at a time
1088                table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
1089                codec_config: (),
1090            };
1091
1092            // Create freezer and then shutdown uncleanly
1093            let checkpoint = {
1094                let mut freezer =
1095                    Freezer::<_, FixedBytes<64>, i32>::init(context.clone(), cfg.clone())
1096                        .await
1097                        .unwrap();
1098
1099                // Insert keys to trigger resize
1100                // key0 -> entry 0, key2 -> entry 1
1101                freezer.put(test_key("key0"), 0).await.unwrap();
1102                freezer.put(test_key("key2"), 1).await.unwrap();
1103                let checkpoint = freezer.sync().await.unwrap();
1104
1105                // Verify resize started
1106                assert!(freezer.resizing().is_some());
1107
1108                checkpoint
1109            };
1110
1111            // Reopen freezer
1112            let mut freezer = Freezer::<_, FixedBytes<64>, i32>::init_with_checkpoint(
1113                context.clone(),
1114                cfg.clone(),
1115                Some(checkpoint),
1116            )
1117            .await
1118            .unwrap();
1119            assert_eq!(freezer.resizable(), 1);
1120
1121            // Verify resize starts immediately (1 key will have 0 added but 1
1122            // will still have 1)
1123            freezer.sync().await.unwrap();
1124            assert!(freezer.resizing().is_some());
1125
1126            // Run until resize completes
1127            while freezer.resizing().is_some() {
1128                freezer.sync().await.unwrap();
1129            }
1130
1131            // Ensure no entries are considered resizable
1132            assert_eq!(freezer.resizable(), 0);
1133        });
1134    }
1135
1136    fn test_operations_and_restart(num_keys: usize) -> String {
1137        // Initialize the deterministic context
1138        let executor = deterministic::Runner::default();
1139        executor.start(|mut context| async move {
1140            // Initialize the freezer
1141            let cfg = Config {
1142                key_partition: "test_key_index".into(),
1143                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1144                key_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1145                value_partition: "test_value_journal".into(),
1146                value_compression: None,
1147                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1148                value_target_size: 128, // Force multiple journal sections
1149                table_partition: "test_table".into(),
1150                table_initial_size: 8,     // Small table to force collisions
1151                table_resize_frequency: 2, // Force resize frequently
1152                table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
1153                table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
1154                codec_config: (),
1155            };
1156            let mut freezer =
1157                Freezer::<_, FixedBytes<96>, FixedBytes<256>>::init(context.clone(), cfg.clone())
1158                    .await
1159                    .expect("Failed to initialize freezer");
1160
1161            // Generate and insert random key-value pairs
1162            let mut pairs = Vec::new();
1163
1164            for _ in 0..num_keys {
1165                // Generate random key
1166                let mut key = [0u8; 96];
1167                context.fill_bytes(&mut key);
1168                let key = FixedBytes::<96>::new(key);
1169
1170                // Generate random value
1171                let mut value = [0u8; 256];
1172                context.fill_bytes(&mut value);
1173                let value = FixedBytes::<256>::new(value);
1174
1175                // Store the key-value pair
1176                freezer
1177                    .put(key.clone(), value.clone())
1178                    .await
1179                    .expect("Failed to put data");
1180                pairs.push((key, value));
1181
1182                // Randomly sync to test resizing
1183                if context.gen_bool(0.1) {
1184                    freezer.sync().await.expect("Failed to sync");
1185                }
1186            }
1187
1188            // Sync data
1189            freezer.sync().await.expect("Failed to sync");
1190
1191            // Verify all pairs can be retrieved
1192            for (key, value) in &pairs {
1193                let retrieved = freezer
1194                    .get(Identifier::Key(key))
1195                    .await
1196                    .expect("Failed to get data")
1197                    .expect("Data not found");
1198                assert_eq!(&retrieved, value);
1199            }
1200
1201            // Test get() on all keys
1202            for (key, _) in &pairs {
1203                assert!(freezer
1204                    .get(Identifier::Key(key))
1205                    .await
1206                    .expect("Failed to check key")
1207                    .is_some());
1208            }
1209
1210            // Check some non-existent keys
1211            for _ in 0..10 {
1212                let mut key = [0u8; 96];
1213                context.fill_bytes(&mut key);
1214                let key = FixedBytes::<96>::new(key);
1215                assert!(freezer
1216                    .get(Identifier::Key(&key))
1217                    .await
1218                    .expect("Failed to check key")
1219                    .is_none());
1220            }
1221
1222            // Close the freezer
1223            let checkpoint = freezer.close().await.expect("Failed to close freezer");
1224
1225            // Reopen the freezer
1226            let mut freezer = Freezer::<_, FixedBytes<96>, FixedBytes<256>>::init_with_checkpoint(
1227                context.clone(),
1228                cfg.clone(),
1229                Some(checkpoint),
1230            )
1231            .await
1232            .expect("Failed to initialize freezer");
1233
1234            // Verify all pairs are still there after restart
1235            for (key, value) in &pairs {
1236                let retrieved = freezer
1237                    .get(Identifier::Key(key))
1238                    .await
1239                    .expect("Failed to get data")
1240                    .expect("Data not found");
1241                assert_eq!(&retrieved, value);
1242            }
1243
1244            // Add more pairs after restart to test collision handling
1245            for _ in 0..20 {
1246                let mut key = [0u8; 96];
1247                context.fill_bytes(&mut key);
1248                let key = FixedBytes::<96>::new(key);
1249
1250                let mut value = [0u8; 256];
1251                context.fill_bytes(&mut value);
1252                let value = FixedBytes::<256>::new(value);
1253
1254                freezer.put(key, value).await.expect("Failed to put data");
1255            }
1256
1257            // Multiple syncs to test epoch progression
1258            for _ in 0..3 {
1259                freezer.sync().await.expect("Failed to sync");
1260
1261                // Add a few more entries between syncs
1262                for _ in 0..5 {
1263                    let mut key = [0u8; 96];
1264                    context.fill_bytes(&mut key);
1265                    let key = FixedBytes::<96>::new(key);
1266
1267                    let mut value = [0u8; 256];
1268                    context.fill_bytes(&mut value);
1269                    let value = FixedBytes::<256>::new(value);
1270
1271                    freezer.put(key, value).await.expect("Failed to put data");
1272                }
1273            }
1274
1275            // Final sync
1276            freezer.sync().await.expect("Failed to sync");
1277
1278            // Return the auditor state for comparison
1279            context.auditor().state()
1280        })
1281    }
1282
1283    #[test_group("slow")]
1284    #[test_traced]
1285    fn test_determinism() {
1286        let state1 = test_operations_and_restart(1_000);
1287        let state2 = test_operations_and_restart(1_000);
1288        assert_eq!(state1, state2);
1289    }
1290
1291    #[test_traced]
1292    fn test_put_multiple_updates() {
1293        // Initialize the deterministic context
1294        let executor = deterministic::Runner::default();
1295        executor.start(|context| async move {
1296            // Initialize the freezer
1297            let cfg = Config {
1298                key_partition: "test_key_index".into(),
1299                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1300                key_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1301                value_partition: "test_value_journal".into(),
1302                value_compression: None,
1303                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1304                value_target_size: DEFAULT_VALUE_TARGET_SIZE,
1305                table_partition: "test_table".into(),
1306                table_initial_size: DEFAULT_TABLE_INITIAL_SIZE,
1307                table_resize_frequency: DEFAULT_TABLE_RESIZE_FREQUENCY,
1308                table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
1309                table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
1310                codec_config: (),
1311            };
1312            let mut freezer = Freezer::<_, FixedBytes<64>, i32>::init(context.clone(), cfg.clone())
1313                .await
1314                .expect("Failed to initialize freezer");
1315
1316            let key = test_key("key1");
1317
1318            freezer
1319                .put(key.clone(), 1)
1320                .await
1321                .expect("Failed to put data");
1322            freezer
1323                .put(key.clone(), 2)
1324                .await
1325                .expect("Failed to put data");
1326            freezer.sync().await.expect("Failed to sync");
1327            assert_eq!(
1328                freezer
1329                    .get(Identifier::Key(&key))
1330                    .await
1331                    .expect("Failed to get data")
1332                    .unwrap(),
1333                2
1334            );
1335        });
1336    }
1337}