Skip to main content

commonware_storage/freezer/
mod.rs

1//! An immutable key-value store optimized for minimal memory usage and write amplification.
2//!
3//! [Freezer] is a key-value store designed for permanent storage where data is written once and never
4//! modified. Meant for resource-constrained environments, [Freezer] exclusively employs disk-resident
5//! data structures to serve queries and avoids ever rewriting (i.e. compacting) inserted data.
6//!
7//! As a byproduct of the mechanisms used to satisfy these constraints, [Freezer] consistently provides
8//! low latency access to recently added data (regardless of how much data has been stored) at the expense
9//! of a logarithmic increase in latency for old data (increasing with the number of items stored).
10//!
11//! # Format
12//!
13//! The [Freezer] uses a three-level architecture:
14//! 1. An extendible hash table (written in a single [commonware_runtime::Blob]) that maps keys to locations
15//! 2. A key index journal ([crate::journal::segmented::fixed]) that stores keys and collision chain pointers
16//! 3. A value journal ([crate::journal::segmented::glob]) that stores the actual values
17//!
18//! These journals are combined via [crate::journal::segmented::oversized], which coordinates
19//! crash recovery between them.
20//!
21//! ```text
22//! +-----------------------------------------------------------------+
23//! |                           Hash Table                            |
24//! |  +---------+---------+---------+---------+---------+---------+  |
25//! |  | Entry 0 | Entry 1 | Entry 2 | Entry 3 | Entry 4 |   ...   |  |
26//! |  +----+----+----+----+----+----+----+----+----+----+---------+  |
27//! +-------|---------|---------|---------|---------|---------|-------+
28//!         |         |         |         |         |         |
29//!         v         v         v         v         v         v
30//! +-----------------------------------------------------------------+
31//! |                      Key Index Journal                          |
32//! |  Section 0: [Entry 0][Entry 1][Entry 2]...                      |
33//! |  Section 1: [Entry 10][Entry 11][Entry 12]...                   |
34//! |  Section N: [Entry 100][Entry 101][Entry 102]...                |
35//! +-------|---------|---------|---------|---------|---------|-------+
36//!         |         |         |         |         |         |
37//!         v         v         v         v         v         v
38//! +-----------------------------------------------------------------+
39//! |                        Value Journal                            |
40//! |  Section 0: [Value 0][Value 1][Value 2]...                      |
41//! |  Section 1: [Value 10][Value 11][Value 12]...                   |
42//! |  Section N: [Value 100][Value 101][Value 102]...                |
43//! +-----------------------------------------------------------------+
44//! ```
45//!
46//! The table uses two fixed-size slots per entry to ensure consistency during updates. Each slot
47//! contains an epoch number that monotonically increases with each sync operation. During reads,
48//! the slot with the higher epoch is selected (provided it's not greater than the last committed
49//! epoch), ensuring consistency even if the system crashed during a write.
50//!
51//! ```text
52//! +-------------------------------------+
53//! |          Hash Table Entry           |
54//! +-------------------------------------+
55//! |     Slot 0      |      Slot 1       |
56//! +-----------------+-------------------+
57//! | epoch:    u64   | epoch:    u64     |
58//! | section:  u64   | section:  u64     |
59//! | offset:   u32   | offset:   u32     |
60//! | added:    u8    | added:    u8      |
61//! +-----------------+-------------------+
62//! | CRC32:    u32   | CRC32:    u32     |
63//! +-----------------+-------------------+
64//! ```
65//!
66//! The key index journal stores fixed-size entries containing a key, a pointer to the value in the
67//! value journal, and an optional pointer to the next entry in the collision chain (for keys that
68//! hash to the same table index).
69//!
70//! ```text
71//! +-------------------------------------+
72//! |        Key Index Entry              |
73//! +-------------------------------------+
74//! | Key:           Array                |
75//! | Value Offset:  u64                  |
76//! | Value Size:    u32                  |
77//! | Next:          Option<(u64, u32)>   |
78//! +-------------------------------------+
79//! ```
80//!
81//! The value journal stores the actual encoded values at the offsets referenced by the key index entries.
82//!
83//! # Traversing Conflicts
84//!
85//! When multiple keys hash to the same table index, they form a linked list within the key index
86//! journal. Each key index entry points to its value in the value journal:
87//!
88//! ```text
89//! Hash Table:
90//! [Index 42]         +-------------------+
91//!                    | section: 2        |
92//!                    | offset: 768       |
93//!                    +---------+---------+
94//!                              |
95//! Key Index Journal:           v
96//! [Section 2]        +-----------------------+
97//!                    | Key: "foo"            |
98//!                    | ValOff: 100           |
99//!                    | ValSize: 20           |
100//!                    | Next: (1, 512) -------+---+
101//!                    +-----------------------+   |
102//!                                                v
103//! [Section 1]        +-----------------------+
104//!                    | Key: "bar"            |
105//!                    | ValOff: 50            |
106//!                    | ValSize: 20           |
107//!                    | Next: (0, 256) -------+---+
108//!                    +-----------------------+   |
109//!                                                v
110//! [Section 0]        +-----------------------+
111//!                    | Key: "baz"            |
112//!                    | ValOff: 0             |
113//!                    | ValSize: 20           |
114//!                    | Next: None            |
115//!                    +-----------------------+
116//!
117//! Value Journal:
118//! [Section 0]        [Value: 126 @ offset 0 ]
119//! [Section 1]        [Value: 84  @ offset 50]
120//! [Section 2]        [Value: 42  @ offset 100]
121//! ```
122//!
123//! New entries are prepended to the chain, becoming the new head. During lookup, the chain
124//! is traversed until a matching key is found. The `added` field in the table entry tracks
125//! insertions since the last resize, triggering table growth when 50% of entries have had
126//! `table_resize_frequency` items added (since the last resize).
127//!
128//! # Extendible Hashing
129//!
130//! The [Freezer] uses bit-based indexing to grow the on-disk hash table without rehashing existing entries:
131//!
132//! ```text
133//! Initial state (table_size=4, using 2 bits of hash):
134//! Hash: 0b...00 -> Index 0
135//! Hash: 0b...01 -> Index 1
136//! Hash: 0b...10 -> Index 2
137//! Hash: 0b...11 -> Index 3
138//!
139//! After resize (table_size=8, using 3 bits of hash):
140//! Hash: 0b...000 -> Index 0 -+
141//! ...                        |
142//! Hash: 0b...100 -> Index 4 -+- Both map to old Index 0
143//! Hash: 0b...001 -> Index 1 -+
144//! ...                        |
145//! Hash: 0b...101 -> Index 5 -+- Both map to old Index 1
146//! ```
147//!
148//! When the table doubles in size:
149//! 1. Each entry at index `i` splits into two entries: `i` and `i + old_size`
150//! 2. The existing chain head is copied to both locations with `added=0`
151//! 3. Future insertions will naturally distribute between the two entries based on their hash
152//!
153//! This approach ensures that entries inserted before a resize remain discoverable after the resize,
154//! as the lookup algorithm checks the appropriate entry based on the current table size. As more and more
155//! items are added (and resizes occur), the latency for fetching old data will increase logarithmically
156//! (with the number of items stored).
157//!
158//! To prevent a "stall" during a single resize, the table is resized incrementally across multiple sync calls.
159//! Each sync will process up to `table_resize_chunk_size` entries until the resize is complete. If there is
160//! an ongoing resize when closing the [Freezer], the resize will be completed before closing.
161//!
162//! # Example
163//!
164//! ```rust
165//! use commonware_runtime::{Spawner, Runner, deterministic, buffer::paged::CacheRef};
166//! use commonware_storage::freezer::{Freezer, Config, Identifier};
167//! use commonware_utils::{sequence::FixedBytes, NZUsize, NZU16};
168//!
169//! let executor = deterministic::Runner::default();
170//! executor.start(|context| async move {
171//!     // Create a freezer
172//!     let cfg = Config {
173//!         key_partition: "freezer-key-index".into(),
174//!         key_write_buffer: NZUsize!(1024 * 1024), // 1MB
175//!         key_page_cache: CacheRef::from_pooler(&context, NZU16!(1024), NZUsize!(10)),
176//!         value_partition: "freezer-value-journal".into(),
177//!         value_compression: Some(3),
178//!         value_write_buffer: NZUsize!(1024 * 1024), // 1MB
179//!         value_target_size: 100 * 1024 * 1024, // 100MB
180//!         table_partition: "freezer-table".into(),
181//!         table_initial_size: 65_536, // ~3MB initial table size
//!         table_resize_frequency: 4, // Resize once 50% of table entries have had 4 items added
183//!         table_resize_chunk_size: 16_384, // ~1MB of table entries rewritten per sync
184//!         table_replay_buffer: NZUsize!(1024 * 1024), // 1MB
185//!         codec_config: (),
186//!     };
187//!     let mut freezer = Freezer::<_, FixedBytes<32>, i32>::init(context, cfg).await.unwrap();
188//!
189//!     // Put a key-value pair
190//!     let key = FixedBytes::new([1u8; 32]);
191//!     freezer.put(key.clone(), 42).await.unwrap();
192//!
193//!     // Sync to disk
194//!     freezer.sync().await.unwrap();
195//!
196//!     // Get the value
197//!     let value = freezer.get(Identifier::Key(&key)).await.unwrap().unwrap();
198//!     assert_eq!(value, 42);
199//!
200//!     // Close the freezer
201//!     freezer.close().await.unwrap();
202//! });
203//! ```
204
205#[cfg(test)]
206mod conformance;
207mod storage;
208use commonware_runtime::buffer::paged::CacheRef;
209use commonware_utils::Array;
210use std::num::NonZeroUsize;
211pub use storage::{Checkpoint, Cursor, Freezer};
212use thiserror::Error;
213
214/// Subject of a [Freezer::get] operation.
215pub enum Identifier<'a, K: Array> {
216    Cursor(Cursor),
217    Key(&'a K),
218}
219
220/// Errors that can occur when interacting with the [Freezer].
221#[derive(Debug, Error)]
222pub enum Error {
223    #[error("runtime error: {0}")]
224    Runtime(#[from] commonware_runtime::Error),
225    #[error("journal error: {0}")]
226    Journal(#[from] crate::journal::Error),
227    #[error("codec error: {0}")]
228    Codec(#[from] commonware_codec::Error),
229}
230
231/// Configuration for [Freezer].
232#[derive(Clone)]
233pub struct Config<C> {
234    /// The [commonware_runtime::Storage] partition for the key index journal.
235    pub key_partition: String,
236
237    /// The size of the write buffer for the key index journal.
238    pub key_write_buffer: NonZeroUsize,
239
240    /// The page cache for the key index journal.
241    pub key_page_cache: CacheRef,
242
243    /// The [commonware_runtime::Storage] partition for the value journal.
244    pub value_partition: String,
245
246    /// The compression level for the value journal.
247    pub value_compression: Option<u8>,
248
249    /// The size of the write buffer for the value journal.
250    pub value_write_buffer: NonZeroUsize,
251
252    /// The target size of each value journal section before creating a new one.
253    pub value_target_size: u64,
254
255    /// The [commonware_runtime::Storage] partition to use for storing the table.
256    pub table_partition: String,
257
258    /// The initial number of items in the table.
259    pub table_initial_size: u32,
260
261    /// The number of items that must be added to 50% of table entries since the last resize before
262    /// the table is resized again.
263    pub table_resize_frequency: u8,
264
265    /// The number of items to move during each resize operation (many may be required to complete a resize).
266    pub table_resize_chunk_size: u32,
267
268    /// The size of the read buffer to use when scanning the table (e.g., during recovery or resize).
269    pub table_replay_buffer: NonZeroUsize,
270
271    /// The codec configuration to use for the value stored in the freezer.
272    pub codec_config: C,
273}
274
275#[cfg(test)]
276mod tests {
277    use super::*;
278    use commonware_codec::DecodeExt;
279    use commonware_macros::{test_group, test_traced};
280    use commonware_runtime::{deterministic, Blob, Metrics, Runner, Storage};
281    use commonware_utils::{hex, sequence::FixedBytes, NZUsize, NZU16};
282    use rand::{Rng, RngCore};
283    use std::num::NonZeroU16;
284
285    fn test_key(key: &str) -> FixedBytes<64> {
286        let mut buf = [0u8; 64];
287        let key = key.as_bytes();
288        assert!(key.len() <= buf.len());
289        buf[..key.len()].copy_from_slice(key);
290        FixedBytes::decode(buf.as_ref()).unwrap()
291    }
292
293    const DEFAULT_WRITE_BUFFER: usize = 1024;
294    const DEFAULT_VALUE_TARGET_SIZE: u64 = 10 * 1024 * 1024;
295    const DEFAULT_TABLE_INITIAL_SIZE: u32 = 256;
296    const DEFAULT_TABLE_RESIZE_FREQUENCY: u8 = 4;
297    const DEFAULT_TABLE_RESIZE_CHUNK_SIZE: u32 = 128; // force multiple chunks
298    const DEFAULT_TABLE_REPLAY_BUFFER: usize = 64 * 1024; // 64KB
299    const PAGE_SIZE: NonZeroU16 = NZU16!(1024);
300    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);
301
302    fn test_put_get(compression: Option<u8>) {
303        // Initialize the deterministic context
304        let executor = deterministic::Runner::default();
305        executor.start(|context| async move {
306            // Initialize the freezer
307            let cfg = Config {
308                key_partition: "test-key-index".into(),
309                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
310                key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
311                value_partition: "test-value-journal".into(),
312                value_compression: compression,
313                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
314                value_target_size: DEFAULT_VALUE_TARGET_SIZE,
315                table_partition: "test-table".into(),
316                table_initial_size: DEFAULT_TABLE_INITIAL_SIZE,
317                table_resize_frequency: DEFAULT_TABLE_RESIZE_FREQUENCY,
318                table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
319                table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
320                codec_config: (),
321            };
322            let mut freezer = Freezer::<_, FixedBytes<64>, i32>::init(context.clone(), cfg.clone())
323                .await
324                .expect("Failed to initialize freezer");
325
326            let key = test_key("testkey");
327            let data = 42;
328
329            // Check key doesn't exist
330            let value = freezer
331                .get(Identifier::Key(&key))
332                .await
333                .expect("Failed to check key");
334            assert!(value.is_none());
335
336            // Put the key-data pair
337            freezer
338                .put(key.clone(), data)
339                .await
340                .expect("Failed to put data");
341
342            // Get the data back
343            let value = freezer
344                .get(Identifier::Key(&key))
345                .await
346                .expect("Failed to get data")
347                .expect("Data not found");
348            assert_eq!(value, data);
349
350            // Check metrics
351            let buffer = context.encode();
352            assert!(buffer.contains("gets_total 2"), "{}", buffer);
353            assert!(buffer.contains("puts_total 1"), "{}", buffer);
354            assert!(buffer.contains("unnecessary_reads_total 0"), "{}", buffer);
355
356            // Force a sync
357            freezer.sync().await.expect("Failed to sync data");
358        });
359    }
360
361    #[test_traced]
362    fn test_put_get_no_compression() {
363        test_put_get(None);
364    }
365
366    #[test_traced]
367    fn test_put_get_compression() {
368        test_put_get(Some(3));
369    }
370
371    #[test_traced]
372    fn test_multiple_keys() {
373        // Initialize the deterministic context
374        let executor = deterministic::Runner::default();
375        executor.start(|context| async move {
376            // Initialize the freezer
377            let cfg = Config {
378                key_partition: "test-key-index".into(),
379                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
380                key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
381                value_partition: "test-value-journal".into(),
382                value_compression: None,
383                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
384                value_target_size: DEFAULT_VALUE_TARGET_SIZE,
385                table_partition: "test-table".into(),
386                table_initial_size: DEFAULT_TABLE_INITIAL_SIZE,
387                table_resize_frequency: DEFAULT_TABLE_RESIZE_FREQUENCY,
388                table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
389                table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
390                codec_config: (),
391            };
392            let mut freezer = Freezer::<_, FixedBytes<64>, i32>::init(context.clone(), cfg.clone())
393                .await
394                .expect("Failed to initialize freezer");
395
396            // Insert multiple keys
397            let keys = vec![
398                (test_key("key1"), 1),
399                (test_key("key2"), 2),
400                (test_key("key3"), 3),
401                (test_key("key4"), 4),
402                (test_key("key5"), 5),
403            ];
404
405            for (key, data) in &keys {
406                freezer
407                    .put(key.clone(), *data)
408                    .await
409                    .expect("Failed to put data");
410            }
411
412            // Retrieve all keys and verify
413            for (key, data) in &keys {
414                let retrieved = freezer
415                    .get(Identifier::Key(key))
416                    .await
417                    .expect("Failed to get data")
418                    .expect("Data not found");
419                assert_eq!(retrieved, *data);
420            }
421        });
422    }
423
424    #[test_traced]
425    fn test_collision_handling() {
426        // Initialize the deterministic context
427        let executor = deterministic::Runner::default();
428        executor.start(|context| async move {
429            // Initialize the freezer with a very small table to force collisions
430            let cfg = Config {
431                key_partition: "test-key-index".into(),
432                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
433                key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
434                value_partition: "test-value-journal".into(),
435                value_compression: None,
436                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
437                value_target_size: DEFAULT_VALUE_TARGET_SIZE,
438                table_partition: "test-table".into(),
439                table_initial_size: 4, // Very small to force collisions
440                table_resize_frequency: DEFAULT_TABLE_RESIZE_FREQUENCY,
441                table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
442                table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
443                codec_config: (),
444            };
445            let mut freezer = Freezer::<_, FixedBytes<64>, i32>::init(context.clone(), cfg.clone())
446                .await
447                .expect("Failed to initialize freezer");
448
449            // Insert multiple keys that will likely collide
450            let keys = vec![
451                (test_key("key1"), 1),
452                (test_key("key2"), 2),
453                (test_key("key3"), 3),
454                (test_key("key4"), 4),
455                (test_key("key5"), 5),
456                (test_key("key6"), 6),
457                (test_key("key7"), 7),
458                (test_key("key8"), 8),
459            ];
460
461            for (key, data) in &keys {
462                freezer
463                    .put(key.clone(), *data)
464                    .await
465                    .expect("Failed to put data");
466            }
467
468            // Sync to disk
469            freezer.sync().await.expect("Failed to sync");
470
471            // Retrieve all keys and verify they can still be found
472            for (key, data) in &keys {
473                let retrieved = freezer
474                    .get(Identifier::Key(key))
475                    .await
476                    .expect("Failed to get data")
477                    .expect("Data not found");
478                assert_eq!(retrieved, *data);
479            }
480
481            // Check metrics
482            let buffer = context.encode();
483            assert!(buffer.contains("gets_total 8"), "{}", buffer);
484            assert!(buffer.contains("unnecessary_reads_total 5"), "{}", buffer);
485        });
486    }
487
488    #[test_traced]
489    fn test_restart() {
490        // Initialize the deterministic context
491        let executor = deterministic::Runner::default();
492        executor.start(|context| async move {
493            let cfg = Config {
494                key_partition: "test-key-index".into(),
495                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
496                key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
497                value_partition: "test-value-journal".into(),
498                value_compression: None,
499                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
500                value_target_size: DEFAULT_VALUE_TARGET_SIZE,
501                table_partition: "test-table".into(),
502                table_initial_size: DEFAULT_TABLE_INITIAL_SIZE,
503                table_resize_frequency: DEFAULT_TABLE_RESIZE_FREQUENCY,
504                table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
505                table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
506                codec_config: (),
507            };
508
509            // Insert data and close the freezer
510            let checkpoint = {
511                let mut freezer = Freezer::<_, FixedBytes<64>, i32>::init(
512                    context.with_label("first"),
513                    cfg.clone(),
514                )
515                .await
516                .expect("Failed to initialize freezer");
517
518                let keys = vec![
519                    (test_key("persist1"), 100),
520                    (test_key("persist2"), 200),
521                    (test_key("persist3"), 300),
522                ];
523
524                for (key, data) in &keys {
525                    freezer
526                        .put(key.clone(), *data)
527                        .await
528                        .expect("Failed to put data");
529                }
530
531                freezer.close().await.expect("Failed to close freezer")
532            };
533
534            // Reopen and verify data persisted
535            {
536                let freezer = Freezer::<_, FixedBytes<64>, i32>::init_with_checkpoint(
537                    context.with_label("second"),
538                    cfg.clone(),
539                    Some(checkpoint),
540                )
541                .await
542                .expect("Failed to initialize freezer");
543
544                let keys = vec![
545                    (test_key("persist1"), 100),
546                    (test_key("persist2"), 200),
547                    (test_key("persist3"), 300),
548                ];
549
550                for (key, data) in &keys {
551                    let retrieved = freezer
552                        .get(Identifier::Key(key))
553                        .await
554                        .expect("Failed to get data")
555                        .expect("Data not found");
556                    assert_eq!(retrieved, *data);
557                }
558            }
559        });
560    }
561
562    #[test_traced]
563    fn test_crash_consistency() {
564        // Initialize the deterministic context
565        let executor = deterministic::Runner::default();
566        executor.start(|context| async move {
567            let cfg = Config {
568                key_partition: "test-key-index".into(),
569                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
570                key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
571                value_partition: "test-value-journal".into(),
572                value_compression: None,
573                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
574                value_target_size: DEFAULT_VALUE_TARGET_SIZE,
575                table_partition: "test-table".into(),
576                table_initial_size: DEFAULT_TABLE_INITIAL_SIZE,
577                table_resize_frequency: DEFAULT_TABLE_RESIZE_FREQUENCY,
578                table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
579                table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
580                codec_config: (),
581            };
582
583            // First, create some committed data and close the freezer
584            let checkpoint = {
585                let mut freezer = Freezer::<_, FixedBytes<64>, i32>::init(
586                    context.with_label("first"),
587                    cfg.clone(),
588                )
589                .await
590                .expect("Failed to initialize freezer");
591
592                freezer
593                    .put(test_key("committed1"), 1)
594                    .await
595                    .expect("Failed to put data");
596                freezer
597                    .put(test_key("committed2"), 2)
598                    .await
599                    .expect("Failed to put data");
600
601                // Sync to ensure data is committed
602                freezer.sync().await.expect("Failed to sync");
603
604                // Add more data but don't sync (simulating crash)
605                freezer
606                    .put(test_key("uncommitted1"), 3)
607                    .await
608                    .expect("Failed to put data");
609                freezer
610                    .put(test_key("uncommitted2"), 4)
611                    .await
612                    .expect("Failed to put data");
613
614                // Close without syncing to simulate crash
615                freezer.close().await.expect("Failed to close")
616            };
617
618            // Reopen and verify only committed data is present
619            {
620                let freezer = Freezer::<_, FixedBytes<64>, i32>::init_with_checkpoint(
621                    context.with_label("second"),
622                    cfg.clone(),
623                    Some(checkpoint),
624                )
625                .await
626                .expect("Failed to initialize freezer");
627
628                // Committed data should be present
629                assert_eq!(
630                    freezer
631                        .get(Identifier::Key(&test_key("committed1")))
632                        .await
633                        .unwrap(),
634                    Some(1)
635                );
636                assert_eq!(
637                    freezer
638                        .get(Identifier::Key(&test_key("committed2")))
639                        .await
640                        .unwrap(),
641                    Some(2)
642                );
643
644                // Uncommitted data might or might not be present depending on implementation
645                // But if present, it should be correct
646                if let Some(val) = freezer
647                    .get(Identifier::Key(&test_key("uncommitted1")))
648                    .await
649                    .unwrap()
650                {
651                    assert_eq!(val, 3);
652                }
653                if let Some(val) = freezer
654                    .get(Identifier::Key(&test_key("uncommitted2")))
655                    .await
656                    .unwrap()
657                {
658                    assert_eq!(val, 4);
659                }
660            }
661        });
662    }
663
664    #[test_traced]
665    fn test_destroy() {
666        // Initialize the deterministic context
667        let executor = deterministic::Runner::default();
668        executor.start(|context| async move {
669            // Initialize the freezer
670            let cfg = Config {
671                key_partition: "test-key-index".into(),
672                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
673                key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
674                value_partition: "test-value-journal".into(),
675                value_compression: None,
676                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
677                value_target_size: DEFAULT_VALUE_TARGET_SIZE,
678                table_partition: "test-table".into(),
679                table_initial_size: DEFAULT_TABLE_INITIAL_SIZE,
680                table_resize_frequency: DEFAULT_TABLE_RESIZE_FREQUENCY,
681                table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
682                table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
683                codec_config: (),
684            };
685            {
686                let mut freezer = Freezer::<_, FixedBytes<64>, i32>::init(
687                    context.with_label("first"),
688                    cfg.clone(),
689                )
690                .await
691                .expect("Failed to initialize freezer");
692
693                freezer
694                    .put(test_key("destroy1"), 1)
695                    .await
696                    .expect("Failed to put data");
697                freezer
698                    .put(test_key("destroy2"), 2)
699                    .await
700                    .expect("Failed to put data");
701
702                // Destroy the freezer
703                freezer.destroy().await.expect("Failed to destroy freezer");
704            }
705
706            // Try to create a new freezer - it should be empty
707            {
708                let freezer = Freezer::<_, FixedBytes<64>, i32>::init(
709                    context.with_label("second"),
710                    cfg.clone(),
711                )
712                .await
713                .expect("Failed to initialize freezer");
714
715                // Should not find any data
716                assert!(freezer
717                    .get(Identifier::Key(&test_key("destroy1")))
718                    .await
719                    .unwrap()
720                    .is_none());
721                assert!(freezer
722                    .get(Identifier::Key(&test_key("destroy2")))
723                    .await
724                    .unwrap()
725                    .is_none());
726            }
727        });
728    }
729
730    #[test_traced]
731    fn test_partial_table_entry_write() {
732        // Initialize the deterministic context
733        let executor = deterministic::Runner::default();
734        executor.start(|context| async move {
735            // Initialize the freezer
736            let cfg = Config {
737                key_partition: "test-key-index".into(),
738                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
739                key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
740                value_partition: "test-value-journal".into(),
741                value_compression: None,
742                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
743                value_target_size: DEFAULT_VALUE_TARGET_SIZE,
744                table_partition: "test-table".into(),
745                table_initial_size: DEFAULT_TABLE_INITIAL_SIZE,
746                table_resize_frequency: DEFAULT_TABLE_RESIZE_FREQUENCY,
747                table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
748                table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
749                codec_config: (),
750            };
751            let checkpoint = {
752                let mut freezer = Freezer::<_, FixedBytes<64>, i32>::init(
753                    context.with_label("first"),
754                    cfg.clone(),
755                )
756                .await
757                .expect("Failed to initialize freezer");
758
759                freezer.put(test_key("key1"), 42).await.unwrap();
760                freezer.sync().await.unwrap();
761                freezer.close().await.unwrap()
762            };
763
764            // Corrupt the table by writing partial entry
765            {
766                let (blob, _) = context.open(&cfg.table_partition, b"table").await.unwrap();
767                // Write incomplete table entry (only 10 bytes instead of 24)
768                blob.write_at(0, vec![0xFF; 10]).await.unwrap();
769                blob.sync().await.unwrap();
770            }
771
772            // Reopen and verify it handles the corruption
773            {
774                let freezer = Freezer::<_, FixedBytes<64>, i32>::init_with_checkpoint(
775                    context.with_label("second"),
776                    cfg.clone(),
777                    Some(checkpoint),
778                )
779                .await
780                .expect("Failed to initialize freezer");
781
782                // The key should still be retrievable from journal if table is corrupted
783                // but the table entry is zeroed out
784                let result = freezer
785                    .get(Identifier::Key(&test_key("key1")))
786                    .await
787                    .unwrap();
788                assert!(result.is_none() || result == Some(42));
789            }
790        });
791    }
792
793    #[test_traced]
794    fn test_table_entry_invalid_crc() {
795        // Initialize the deterministic context
796        let executor = deterministic::Runner::default();
797        executor.start(|context| async move {
798            let cfg = Config {
799                key_partition: "test-key-index".into(),
800                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
801                key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
802                value_partition: "test-value-journal".into(),
803                value_compression: None,
804                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
805                value_target_size: DEFAULT_VALUE_TARGET_SIZE,
806                table_partition: "test-table".into(),
807                table_initial_size: DEFAULT_TABLE_INITIAL_SIZE,
808                table_resize_frequency: DEFAULT_TABLE_RESIZE_FREQUENCY,
809                table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
810                table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
811                codec_config: (),
812            };
813
814            // Create freezer with data
815            let checkpoint = {
816                let mut freezer = Freezer::<_, FixedBytes<64>, i32>::init(
817                    context.with_label("first"),
818                    cfg.clone(),
819                )
820                .await
821                .expect("Failed to initialize freezer");
822
823                freezer.put(test_key("key1"), 42).await.unwrap();
824                freezer.sync().await.unwrap();
825                freezer.close().await.unwrap()
826            };
827
828            // Corrupt the CRC in the index entry
829            {
830                let (blob, _) = context.open(&cfg.table_partition, b"table").await.unwrap();
831                // Read the first entry
832                let entry_data = blob.read_at(0, 24).await.unwrap();
833                let mut corrupted = entry_data.coalesce();
834                // Corrupt the CRC (last 4 bytes of the entry)
835                corrupted.as_mut()[20] ^= 0xFF;
836                blob.write_at(0, corrupted).await.unwrap();
837                blob.sync().await.unwrap();
838            }
839
840            // Reopen and verify it handles invalid CRC
841            {
842                let freezer = Freezer::<_, FixedBytes<64>, i32>::init_with_checkpoint(
843                    context.with_label("second"),
844                    cfg.clone(),
845                    Some(checkpoint),
846                )
847                .await
848                .expect("Failed to initialize freezer");
849
850                // With invalid CRC, the entry should be treated as invalid
851                let result = freezer
852                    .get(Identifier::Key(&test_key("key1")))
853                    .await
854                    .unwrap();
855                // The freezer should still work but may not find the key due to invalid table entry
856                assert!(result.is_none() || result == Some(42));
857            }
858        });
859    }
860
861    #[test_traced]
862    fn test_table_extra_bytes() {
863        // Initialize the deterministic context
864        let executor = deterministic::Runner::default();
865        executor.start(|context| async move {
866            let cfg = Config {
867                key_partition: "test-key-index".into(),
868                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
869                key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
870                value_partition: "test-value-journal".into(),
871                value_compression: None,
872                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
873                value_target_size: DEFAULT_VALUE_TARGET_SIZE,
874                table_partition: "test-table".into(),
875                table_initial_size: DEFAULT_TABLE_INITIAL_SIZE,
876                table_resize_frequency: DEFAULT_TABLE_RESIZE_FREQUENCY,
877                table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
878                table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
879                codec_config: (),
880            };
881
882            // Create freezer with data
883            let checkpoint = {
884                let mut freezer = Freezer::<_, FixedBytes<64>, i32>::init(
885                    context.with_label("first"),
886                    cfg.clone(),
887                )
888                .await
889                .expect("Failed to initialize freezer");
890
891                freezer.put(test_key("key1"), 42).await.unwrap();
892                freezer.sync().await.unwrap();
893                freezer.close().await.unwrap()
894            };
895
896            // Add extra bytes to the table blob
897            {
898                let (blob, size) = context.open(&cfg.table_partition, b"table").await.unwrap();
899                // Append garbage data
900                blob.write_at(size, hex!("0xdeadbeef").to_vec())
901                    .await
902                    .unwrap();
903                blob.sync().await.unwrap();
904            }
905
906            // Reopen and verify it handles extra bytes gracefully
907            {
908                let freezer = Freezer::<_, FixedBytes<64>, i32>::init_with_checkpoint(
909                    context.with_label("second"),
910                    cfg.clone(),
911                    Some(checkpoint),
912                )
913                .await
914                .expect("Failed to initialize freezer");
915
916                // Should still be able to read the key
917                assert_eq!(
918                    freezer
919                        .get(Identifier::Key(&test_key("key1")))
920                        .await
921                        .unwrap(),
922                    Some(42)
923                );
924
925                // And write new data
926                let mut freezer_mut = freezer;
927                freezer_mut.put(test_key("key2"), 43).await.unwrap();
928                assert_eq!(
929                    freezer_mut
930                        .get(Identifier::Key(&test_key("key2")))
931                        .await
932                        .unwrap(),
933                    Some(43)
934                );
935            }
936        });
937    }
938
939    #[test_traced]
940    fn test_indexing_across_resizes() {
941        // Initialize the deterministic context
942        let executor = deterministic::Runner::default();
943        executor.start(|context| async move {
944            // Initialize the freezer
945            let cfg = Config {
946                key_partition: "test-key-index".into(),
947                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
948                key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
949                value_partition: "test-value-journal".into(),
950                value_compression: None,
951                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
952                value_target_size: DEFAULT_VALUE_TARGET_SIZE,
953                table_partition: "test-table".into(),
954                table_initial_size: 2, // Very small initial size to force multiple resizes
955                table_resize_frequency: 2, // Resize after 2 items per entry
956                table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
957                table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
958                codec_config: (),
959            };
960            let mut freezer =
961                Freezer::<_, FixedBytes<64>, i32>::init(context.with_label("first"), cfg.clone())
962                    .await
963                    .expect("Failed to initialize freezer");
964
965            // Insert many keys to force multiple table resizes
966            // Table will grow from 2 -> 4 -> 8 -> 16 -> 32 -> 64 -> 128 -> 256 -> 512 -> 1024
967            let mut keys = Vec::new();
968            for i in 0..1000 {
969                let key = test_key(&format!("key{i}"));
970                keys.push((key.clone(), i));
971
972                // Force sync to ensure resize occurs ASAP
973                freezer.put(key, i).await.expect("Failed to put data");
974                freezer.sync().await.expect("Failed to sync");
975            }
976
977            // Verify all keys can still be found after multiple resizes
978            for (key, value) in &keys {
979                let retrieved = freezer
980                    .get(Identifier::Key(key))
981                    .await
982                    .expect("Failed to get data")
983                    .expect("Data not found");
984                assert_eq!(retrieved, *value, "Value mismatch for key after resizes");
985            }
986
987            // Close and reopen to verify persistence
988            let checkpoint = freezer.close().await.expect("Failed to close");
989            let freezer = Freezer::<_, FixedBytes<64>, i32>::init_with_checkpoint(
990                context.with_label("second"),
991                cfg.clone(),
992                Some(checkpoint),
993            )
994            .await
995            .expect("Failed to reinitialize freezer");
996
997            // Verify all keys can still be found after restart
998            for (key, value) in &keys {
999                let retrieved = freezer
1000                    .get(Identifier::Key(key))
1001                    .await
1002                    .expect("Failed to get data")
1003                    .expect("Data not found");
1004                assert_eq!(retrieved, *value, "Value mismatch for key after restart");
1005            }
1006
1007            // Verify metrics show resize operations occurred
1008            let buffer = context.encode();
1009            assert!(buffer.contains("first_resizes_total 8"), "{}", buffer);
1010        });
1011    }
1012
1013    #[test_traced]
1014    fn test_insert_during_resize() {
1015        let executor = deterministic::Runner::default();
1016        executor.start(|context| async move {
1017            let cfg = Config {
1018                key_partition: "test-key-index".into(),
1019                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1020                key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1021                value_partition: "test-value-journal".into(),
1022                value_compression: None,
1023                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1024                value_target_size: DEFAULT_VALUE_TARGET_SIZE,
1025                table_partition: "test-table".into(),
1026                table_initial_size: 2,
1027                table_resize_frequency: 1,
1028                table_resize_chunk_size: 1, // Process one at a time
1029                table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
1030                codec_config: (),
1031            };
1032            let mut freezer = Freezer::<_, FixedBytes<64>, i32>::init(context.clone(), cfg.clone())
1033                .await
1034                .unwrap();
1035
1036            // Insert keys to trigger resize
1037            // key0 -> entry 0, key2 -> entry 1
1038            freezer.put(test_key("key0"), 0).await.unwrap();
1039            freezer.put(test_key("key2"), 1).await.unwrap();
1040            freezer.sync().await.unwrap(); // should start resize
1041
1042            // Verify resize started
1043            assert!(freezer.resizing().is_some());
1044
1045            // Insert during resize (to first entry)
1046            // key6 -> entry 0
1047            freezer.put(test_key("key6"), 2).await.unwrap();
1048            assert!(context.encode().contains("unnecessary_writes_total 1"));
1049            assert_eq!(freezer.resizable(), 3);
1050
1051            // Insert another key (to unmodified entry)
1052            // key3 -> entry 1
1053            freezer.put(test_key("key3"), 3).await.unwrap();
1054            assert!(context.encode().contains("unnecessary_writes_total 1"));
1055            assert_eq!(freezer.resizable(), 3);
1056
1057            // Verify resize completed
1058            freezer.sync().await.unwrap();
1059            assert!(freezer.resizing().is_none());
1060            assert_eq!(freezer.resizable(), 2);
1061
1062            // More inserts
1063            // key4 -> entry 1, key7 -> entry 0
1064            freezer.put(test_key("key4"), 4).await.unwrap();
1065            freezer.put(test_key("key7"), 5).await.unwrap();
1066            freezer.sync().await.unwrap();
1067
1068            // Another resize should've started
1069            assert!(freezer.resizing().is_some());
1070
1071            // Verify all can be retrieved during resize
1072            let keys = ["key0", "key2", "key6", "key3", "key4", "key7"];
1073            for (i, k) in keys.iter().enumerate() {
1074                assert_eq!(
1075                    freezer.get(Identifier::Key(&test_key(k))).await.unwrap(),
1076                    Some(i as i32)
1077                );
1078            }
1079
1080            // Sync until resize completes
1081            while freezer.resizing().is_some() {
1082                freezer.sync().await.unwrap();
1083            }
1084
1085            // Ensure no entries are considered resizable
1086            assert_eq!(freezer.resizable(), 0);
1087        });
1088    }
1089
1090    #[test_traced]
1091    fn test_resize_after_startup() {
1092        let executor = deterministic::Runner::default();
1093        executor.start(|context| async move {
1094            let cfg = Config {
1095                key_partition: "test-key-index".into(),
1096                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1097                key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1098                value_partition: "test-value-journal".into(),
1099                value_compression: None,
1100                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1101                value_target_size: DEFAULT_VALUE_TARGET_SIZE,
1102                table_partition: "test-table".into(),
1103                table_initial_size: 2,
1104                table_resize_frequency: 1,
1105                table_resize_chunk_size: 1, // Process one at a time
1106                table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
1107                codec_config: (),
1108            };
1109
1110            // Create freezer and then shutdown uncleanly
1111            let checkpoint = {
1112                let mut freezer = Freezer::<_, FixedBytes<64>, i32>::init(
1113                    context.with_label("first"),
1114                    cfg.clone(),
1115                )
1116                .await
1117                .unwrap();
1118
1119                // Insert keys to trigger resize
1120                // key0 -> entry 0, key2 -> entry 1
1121                freezer.put(test_key("key0"), 0).await.unwrap();
1122                freezer.put(test_key("key2"), 1).await.unwrap();
1123                let checkpoint = freezer.sync().await.unwrap();
1124
1125                // Verify resize started
1126                assert!(freezer.resizing().is_some());
1127
1128                checkpoint
1129            };
1130
1131            // Reopen freezer
1132            let mut freezer = Freezer::<_, FixedBytes<64>, i32>::init_with_checkpoint(
1133                context.with_label("second"),
1134                cfg.clone(),
1135                Some(checkpoint),
1136            )
1137            .await
1138            .unwrap();
1139            assert_eq!(freezer.resizable(), 1);
1140
1141            // Verify resize starts immediately (1 key will have 0 added but 1
1142            // will still have 1)
1143            freezer.sync().await.unwrap();
1144            assert!(freezer.resizing().is_some());
1145
1146            // Run until resize completes
1147            while freezer.resizing().is_some() {
1148                freezer.sync().await.unwrap();
1149            }
1150
1151            // Ensure no entries are considered resizable
1152            assert_eq!(freezer.resizable(), 0);
1153        });
1154    }
1155
1156    fn test_operations_and_restart(num_keys: usize) -> String {
1157        let executor = deterministic::Runner::default();
1158        executor.start(|mut context| async move {
1159            let cfg = Config {
1160                key_partition: "test-key-index".into(),
1161                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1162                key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1163                value_partition: "test-value-journal".into(),
1164                value_compression: None,
1165                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1166                value_target_size: 128, // Force multiple journal sections
1167                table_partition: "test-table".into(),
1168                table_initial_size: 8,     // Small table to force collisions
1169                table_resize_frequency: 2, // Force resize frequently
1170                table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
1171                table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
1172                codec_config: (),
1173            };
1174            let mut freezer = Freezer::<_, FixedBytes<96>, FixedBytes<256>>::init(
1175                context.with_label("init1"),
1176                cfg.clone(),
1177            )
1178            .await
1179            .expect("Failed to initialize freezer");
1180
1181            // Generate and insert random key-value pairs
1182            let mut pairs = Vec::new();
1183
1184            for _ in 0..num_keys {
1185                // Generate random key
1186                let mut key = [0u8; 96];
1187                context.fill_bytes(&mut key);
1188                let key = FixedBytes::<96>::new(key);
1189
1190                // Generate random value
1191                let mut value = [0u8; 256];
1192                context.fill_bytes(&mut value);
1193                let value = FixedBytes::<256>::new(value);
1194
1195                // Store the key-value pair
1196                freezer
1197                    .put(key.clone(), value.clone())
1198                    .await
1199                    .expect("Failed to put data");
1200                pairs.push((key, value));
1201
1202                // Randomly sync to test resizing
1203                if context.gen_bool(0.1) {
1204                    freezer.sync().await.expect("Failed to sync");
1205                }
1206            }
1207
1208            // Sync data
1209            freezer.sync().await.expect("Failed to sync");
1210
1211            // Verify all pairs can be retrieved
1212            for (key, value) in &pairs {
1213                let retrieved = freezer
1214                    .get(Identifier::Key(key))
1215                    .await
1216                    .expect("Failed to get data")
1217                    .expect("Data not found");
1218                assert_eq!(&retrieved, value);
1219            }
1220
1221            // Test get() on all keys
1222            for (key, _) in &pairs {
1223                assert!(freezer
1224                    .get(Identifier::Key(key))
1225                    .await
1226                    .expect("Failed to check key")
1227                    .is_some());
1228            }
1229
1230            // Check some non-existent keys
1231            for _ in 0..10 {
1232                let mut key = [0u8; 96];
1233                context.fill_bytes(&mut key);
1234                let key = FixedBytes::<96>::new(key);
1235                assert!(freezer
1236                    .get(Identifier::Key(&key))
1237                    .await
1238                    .expect("Failed to check key")
1239                    .is_none());
1240            }
1241
1242            // Close the freezer
1243            let checkpoint = freezer.close().await.expect("Failed to close freezer");
1244
1245            // Reopen the freezer
1246            let mut freezer = Freezer::<_, FixedBytes<96>, FixedBytes<256>>::init_with_checkpoint(
1247                context.with_label("init2"),
1248                cfg.clone(),
1249                Some(checkpoint),
1250            )
1251            .await
1252            .expect("Failed to initialize freezer");
1253
1254            // Verify all pairs are still there after restart
1255            for (key, value) in &pairs {
1256                let retrieved = freezer
1257                    .get(Identifier::Key(key))
1258                    .await
1259                    .expect("Failed to get data")
1260                    .expect("Data not found");
1261                assert_eq!(&retrieved, value);
1262            }
1263
1264            // Add more pairs after restart to test collision handling
1265            for _ in 0..20 {
1266                let mut key = [0u8; 96];
1267                context.fill_bytes(&mut key);
1268                let key = FixedBytes::<96>::new(key);
1269
1270                let mut value = [0u8; 256];
1271                context.fill_bytes(&mut value);
1272                let value = FixedBytes::<256>::new(value);
1273
1274                freezer.put(key, value).await.expect("Failed to put data");
1275            }
1276
1277            // Multiple syncs to test epoch progression
1278            for _ in 0..3 {
1279                freezer.sync().await.expect("Failed to sync");
1280
1281                // Add a few more entries between syncs
1282                for _ in 0..5 {
1283                    let mut key = [0u8; 96];
1284                    context.fill_bytes(&mut key);
1285                    let key = FixedBytes::<96>::new(key);
1286
1287                    let mut value = [0u8; 256];
1288                    context.fill_bytes(&mut value);
1289                    let value = FixedBytes::<256>::new(value);
1290
1291                    freezer.put(key, value).await.expect("Failed to put data");
1292                }
1293            }
1294
1295            // Final sync
1296            freezer.sync().await.expect("Failed to sync");
1297
1298            // Return the auditor state for comparison
1299            context.auditor().state()
1300        })
1301    }
1302
1303    #[test_group("slow")]
1304    #[test_traced]
1305    fn test_determinism() {
1306        let state1 = test_operations_and_restart(1_000);
1307        let state2 = test_operations_and_restart(1_000);
1308        assert_eq!(state1, state2);
1309    }
1310
1311    #[test_traced]
1312    fn test_put_multiple_updates() {
1313        // Initialize the deterministic context
1314        let executor = deterministic::Runner::default();
1315        executor.start(|context| async move {
1316            // Initialize the freezer
1317            let cfg = Config {
1318                key_partition: "test-key-index".into(),
1319                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1320                key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1321                value_partition: "test-value-journal".into(),
1322                value_compression: None,
1323                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1324                value_target_size: DEFAULT_VALUE_TARGET_SIZE,
1325                table_partition: "test-table".into(),
1326                table_initial_size: DEFAULT_TABLE_INITIAL_SIZE,
1327                table_resize_frequency: DEFAULT_TABLE_RESIZE_FREQUENCY,
1328                table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
1329                table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
1330                codec_config: (),
1331            };
1332            let mut freezer = Freezer::<_, FixedBytes<64>, i32>::init(context.clone(), cfg.clone())
1333                .await
1334                .expect("Failed to initialize freezer");
1335
1336            let key = test_key("key1");
1337
1338            freezer
1339                .put(key.clone(), 1)
1340                .await
1341                .expect("Failed to put data");
1342            freezer
1343                .put(key.clone(), 2)
1344                .await
1345                .expect("Failed to put data");
1346            freezer.sync().await.expect("Failed to sync");
1347            assert_eq!(
1348                freezer
1349                    .get(Identifier::Key(&key))
1350                    .await
1351                    .expect("Failed to get data")
1352                    .unwrap(),
1353                2
1354            );
1355        });
1356    }
1357}