Skip to main content

commonware_storage/freezer/
mod.rs

1//! An immutable key-value store optimized for minimal memory usage and write amplification.
2//!
3//! [Freezer] is a key-value store designed for permanent storage where data is written once and never
4//! modified. Meant for resource-constrained environments, [Freezer] exclusively employs disk-resident
5//! data structures to serve queries and avoids ever rewriting (i.e. compacting) inserted data.
6//!
7//! As a byproduct of the mechanisms used to satisfy these constraints, [Freezer] consistently provides
8//! low latency access to recently added data (regardless of how much data has been stored) at the expense
9//! of a logarithmic increase in latency for old data (increasing with the number of items stored).
10//!
11//! # Format
12//!
13//! The [Freezer] uses a three-level architecture:
14//! 1. An extendible hash table (written in a single [commonware_runtime::Blob]) that maps keys to locations
15//! 2. A key index journal ([crate::journal::segmented::fixed]) that stores keys and collision chain pointers
16//! 3. A value journal ([crate::journal::segmented::glob]) that stores the actual values
17//!
18//! These journals are combined via [crate::journal::segmented::oversized], which coordinates
19//! crash recovery between them.
20//!
21//! ```text
22//! +-----------------------------------------------------------------+
23//! |                           Hash Table                            |
24//! |  +---------+---------+---------+---------+---------+---------+  |
25//! |  | Entry 0 | Entry 1 | Entry 2 | Entry 3 | Entry 4 |   ...   |  |
26//! |  +----+----+----+----+----+----+----+----+----+----+---------+  |
27//! +-------|---------|---------|---------|---------|---------|-------+
28//!         |         |         |         |         |         |
29//!         v         v         v         v         v         v
30//! +-----------------------------------------------------------------+
31//! |                      Key Index Journal                          |
32//! |  Section 0: [Entry 0][Entry 1][Entry 2]...                      |
33//! |  Section 1: [Entry 10][Entry 11][Entry 12]...                   |
34//! |  Section N: [Entry 100][Entry 101][Entry 102]...                |
35//! +-------|---------|---------|---------|---------|---------|-------+
36//!         |         |         |         |         |         |
37//!         v         v         v         v         v         v
38//! +-----------------------------------------------------------------+
39//! |                        Value Journal                            |
40//! |  Section 0: [Value 0][Value 1][Value 2]...                      |
41//! |  Section 1: [Value 10][Value 11][Value 12]...                   |
42//! |  Section N: [Value 100][Value 101][Value 102]...                |
43//! +-----------------------------------------------------------------+
44//! ```
45//!
46//! The table uses two fixed-size slots per entry to ensure consistency during updates. Each slot
47//! contains an epoch number that monotonically increases with each sync operation. During reads,
48//! the slot with the higher epoch is selected (provided it's not greater than the last committed
49//! epoch), ensuring consistency even if the system crashed during a write.
50//!
51//! ```text
52//! +-------------------------------------+
53//! |          Hash Table Entry           |
54//! +-------------------------------------+
55//! |     Slot 0      |      Slot 1       |
56//! +-----------------+-------------------+
57//! | epoch:    u64   | epoch:    u64     |
58//! | section:  u64   | section:  u64     |
59//! | offset:   u32   | offset:   u32     |
60//! | added:    u8    | added:    u8      |
61//! +-----------------+-------------------+
62//! | CRC32:    u32   | CRC32:    u32     |
63//! +-----------------+-------------------+
64//! ```
65//!
66//! The key index journal stores fixed-size entries containing a key, a pointer to the value in the
67//! value journal, and an optional pointer to the next entry in the collision chain (for keys that
68//! hash to the same table index).
69//!
70//! ```text
71//! +-------------------------------------+
72//! |        Key Index Entry              |
73//! +-------------------------------------+
74//! | Key:           Array                |
75//! | Value Offset:  u64                  |
76//! | Value Size:    u32                  |
77//! | Next:          Option<(u64, u32)>   |
78//! +-------------------------------------+
79//! ```
80//!
81//! The value journal stores the actual encoded values at the offsets referenced by the key index entries.
82//!
83//! # Traversing Conflicts
84//!
85//! When multiple keys hash to the same table index, they form a linked list within the key index
86//! journal. Each key index entry points to its value in the value journal:
87//!
88//! ```text
89//! Hash Table:
90//! [Index 42]         +-------------------+
91//!                    | section: 2        |
92//!                    | offset: 768       |
93//!                    +---------+---------+
94//!                              |
95//! Key Index Journal:           v
96//! [Section 2]        +-----------------------+
97//!                    | Key: "foo"            |
98//!                    | ValOff: 100           |
99//!                    | ValSize: 20           |
100//!                    | Next: (1, 512) -------+---+
101//!                    +-----------------------+   |
102//!                                                v
103//! [Section 1]        +-----------------------+
104//!                    | Key: "bar"            |
105//!                    | ValOff: 50            |
106//!                    | ValSize: 20           |
107//!                    | Next: (0, 256) -------+---+
108//!                    +-----------------------+   |
109//!                                                v
110//! [Section 0]        +-----------------------+
111//!                    | Key: "baz"            |
112//!                    | ValOff: 0             |
113//!                    | ValSize: 20           |
114//!                    | Next: None            |
115//!                    +-----------------------+
116//!
117//! Value Journal:
118//! [Section 0]        [Value: 126 @ offset 0 ]
119//! [Section 1]        [Value: 84  @ offset 50]
120//! [Section 2]        [Value: 42  @ offset 100]
121//! ```
122//!
123//! New entries are prepended to the chain, becoming the new head. During lookup, the chain
124//! is traversed until a matching key is found. The `added` field in the table entry tracks
125//! insertions since the last resize, triggering table growth when 50% of entries have had
126//! `table_resize_frequency` items added (since the last resize).
127//!
128//! # Extendible Hashing
129//!
130//! The [Freezer] uses bit-based indexing to grow the on-disk hash table without rehashing existing entries:
131//!
132//! ```text
133//! Initial state (table_size=4, using 2 bits of hash):
134//! Hash: 0b...00 -> Index 0
135//! Hash: 0b...01 -> Index 1
136//! Hash: 0b...10 -> Index 2
137//! Hash: 0b...11 -> Index 3
138//!
139//! After resize (table_size=8, using 3 bits of hash):
140//! Hash: 0b...000 -> Index 0 -+
141//! ...                        |
142//! Hash: 0b...100 -> Index 4 -+- Both map to old Index 0
143//! Hash: 0b...001 -> Index 1 -+
144//! ...                        |
145//! Hash: 0b...101 -> Index 5 -+- Both map to old Index 1
146//! ```
147//!
148//! When the table doubles in size:
149//! 1. Each entry at index `i` splits into two entries: `i` and `i + old_size`
150//! 2. The existing chain head is copied to both locations with `added=0`
151//! 3. Future insertions will naturally distribute between the two entries based on their hash
152//!
153//! This approach ensures that entries inserted before a resize remain discoverable after the resize,
154//! as the lookup algorithm checks the appropriate entry based on the current table size. As more and more
155//! items are added (and resizes occur), the latency for fetching old data will increase logarithmically
156//! (with the number of items stored).
157//!
158//! To prevent a "stall" during a single resize, the table is resized incrementally across multiple sync calls.
159//! Each sync will process up to `table_resize_chunk_size` entries until the resize is complete. If there is
160//! an ongoing resize when closing the [Freezer], the resize will be completed before closing.
161//!
162//! # Example
163//!
164//! ```rust
165//! use commonware_runtime::{Spawner, Runner, deterministic, buffer::paged::CacheRef};
166//! use commonware_storage::freezer::{Freezer, Config, Identifier};
167//! use commonware_utils::{sequence::FixedBytes, NZUsize, NZU16};
168//!
169//! let executor = deterministic::Runner::default();
170//! executor.start(|context| async move {
171//!     // Create a freezer
172//!     let cfg = Config {
173//!         key_partition: "freezer_key_index".into(),
174//!         key_write_buffer: NZUsize!(1024 * 1024), // 1MB
175//!         key_page_cache: CacheRef::new(NZU16!(1024), NZUsize!(10)),
176//!         value_partition: "freezer_value_journal".into(),
177//!         value_compression: Some(3),
178//!         value_write_buffer: NZUsize!(1024 * 1024), // 1MB
179//!         value_target_size: 100 * 1024 * 1024, // 100MB
180//!         table_partition: "freezer_table".into(),
181//!         table_initial_size: 65_536, // ~3MB initial table size
//!         table_resize_frequency: 4, // Resize once 50% of table entries have had 4 items added
183//!         table_resize_chunk_size: 16_384, // ~1MB of table entries rewritten per sync
184//!         table_replay_buffer: NZUsize!(1024 * 1024), // 1MB
185//!         codec_config: (),
186//!     };
187//!     let mut freezer = Freezer::<_, FixedBytes<32>, i32>::init(context, cfg).await.unwrap();
188//!
189//!     // Put a key-value pair
190//!     let key = FixedBytes::new([1u8; 32]);
191//!     freezer.put(key.clone(), 42).await.unwrap();
192//!
193//!     // Sync to disk
194//!     freezer.sync().await.unwrap();
195//!
196//!     // Get the value
197//!     let value = freezer.get(Identifier::Key(&key)).await.unwrap().unwrap();
198//!     assert_eq!(value, 42);
199//!
200//!     // Close the freezer
201//!     freezer.close().await.unwrap();
202//! });
203//! ```
204
205#[cfg(test)]
206mod conformance;
207mod storage;
208use commonware_runtime::buffer::paged::CacheRef;
209use commonware_utils::Array;
210use std::num::NonZeroUsize;
211pub use storage::{Checkpoint, Cursor, Freezer};
212use thiserror::Error;
213
214/// Subject of a [Freezer::get] operation.
215pub enum Identifier<'a, K: Array> {
216    Cursor(Cursor),
217    Key(&'a K),
218}
219
220/// Errors that can occur when interacting with the [Freezer].
221#[derive(Debug, Error)]
222pub enum Error {
223    #[error("runtime error: {0}")]
224    Runtime(#[from] commonware_runtime::Error),
225    #[error("journal error: {0}")]
226    Journal(#[from] crate::journal::Error),
227    #[error("codec error: {0}")]
228    Codec(#[from] commonware_codec::Error),
229}
230
231/// Configuration for [Freezer].
232#[derive(Clone)]
233pub struct Config<C> {
234    /// The [commonware_runtime::Storage] partition for the key index journal.
235    pub key_partition: String,
236
237    /// The size of the write buffer for the key index journal.
238    pub key_write_buffer: NonZeroUsize,
239
240    /// The page cache for the key index journal.
241    pub key_page_cache: CacheRef,
242
243    /// The [commonware_runtime::Storage] partition for the value journal.
244    pub value_partition: String,
245
246    /// The compression level for the value journal.
247    pub value_compression: Option<u8>,
248
249    /// The size of the write buffer for the value journal.
250    pub value_write_buffer: NonZeroUsize,
251
252    /// The target size of each value journal section before creating a new one.
253    pub value_target_size: u64,
254
255    /// The [commonware_runtime::Storage] partition to use for storing the table.
256    pub table_partition: String,
257
258    /// The initial number of items in the table.
259    pub table_initial_size: u32,
260
261    /// The number of items that must be added to 50% of table entries since the last resize before
262    /// the table is resized again.
263    pub table_resize_frequency: u8,
264
265    /// The number of items to move during each resize operation (many may be required to complete a resize).
266    pub table_resize_chunk_size: u32,
267
268    /// The size of the read buffer to use when scanning the table (e.g., during recovery or resize).
269    pub table_replay_buffer: NonZeroUsize,
270
271    /// The codec configuration to use for the value stored in the freezer.
272    pub codec_config: C,
273}
274
275#[cfg(test)]
276mod tests {
277    use super::*;
278    use crate::kv::tests::test_key;
279    use commonware_macros::{test_group, test_traced};
280    use commonware_runtime::{deterministic, Blob, IoBufMut, Metrics, Runner, Storage};
281    use commonware_utils::{hex, sequence::FixedBytes, NZUsize, NZU16};
282    use rand::{Rng, RngCore};
283    use std::num::NonZeroU16;
284
285    const DEFAULT_WRITE_BUFFER: usize = 1024;
286    const DEFAULT_VALUE_TARGET_SIZE: u64 = 10 * 1024 * 1024;
287    const DEFAULT_TABLE_INITIAL_SIZE: u32 = 256;
288    const DEFAULT_TABLE_RESIZE_FREQUENCY: u8 = 4;
289    const DEFAULT_TABLE_RESIZE_CHUNK_SIZE: u32 = 128; // force multiple chunks
290    const DEFAULT_TABLE_REPLAY_BUFFER: usize = 64 * 1024; // 64KB
291    const PAGE_SIZE: NonZeroU16 = NZU16!(1024);
292    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);
293
294    fn test_put_get(compression: Option<u8>) {
295        // Initialize the deterministic context
296        let executor = deterministic::Runner::default();
297        executor.start(|context| async move {
298            // Initialize the freezer
299            let cfg = Config {
300                key_partition: "test_key_index".into(),
301                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
302                key_page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
303                value_partition: "test_value_journal".into(),
304                value_compression: compression,
305                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
306                value_target_size: DEFAULT_VALUE_TARGET_SIZE,
307                table_partition: "test_table".into(),
308                table_initial_size: DEFAULT_TABLE_INITIAL_SIZE,
309                table_resize_frequency: DEFAULT_TABLE_RESIZE_FREQUENCY,
310                table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
311                table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
312                codec_config: (),
313            };
314            let mut freezer = Freezer::<_, FixedBytes<64>, i32>::init(context.clone(), cfg.clone())
315                .await
316                .expect("Failed to initialize freezer");
317
318            let key = test_key("testkey");
319            let data = 42;
320
321            // Check key doesn't exist
322            let value = freezer
323                .get(Identifier::Key(&key))
324                .await
325                .expect("Failed to check key");
326            assert!(value.is_none());
327
328            // Put the key-data pair
329            freezer
330                .put(key.clone(), data)
331                .await
332                .expect("Failed to put data");
333
334            // Get the data back
335            let value = freezer
336                .get(Identifier::Key(&key))
337                .await
338                .expect("Failed to get data")
339                .expect("Data not found");
340            assert_eq!(value, data);
341
342            // Check metrics
343            let buffer = context.encode();
344            assert!(buffer.contains("gets_total 2"), "{}", buffer);
345            assert!(buffer.contains("puts_total 1"), "{}", buffer);
346            assert!(buffer.contains("unnecessary_reads_total 0"), "{}", buffer);
347
348            // Force a sync
349            freezer.sync().await.expect("Failed to sync data");
350        });
351    }
352
353    #[test_traced]
354    fn test_put_get_no_compression() {
355        test_put_get(None);
356    }
357
358    #[test_traced]
359    fn test_put_get_compression() {
360        test_put_get(Some(3));
361    }
362
363    #[test_traced]
364    fn test_multiple_keys() {
365        // Initialize the deterministic context
366        let executor = deterministic::Runner::default();
367        executor.start(|context| async move {
368            // Initialize the freezer
369            let cfg = Config {
370                key_partition: "test_key_index".into(),
371                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
372                key_page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
373                value_partition: "test_value_journal".into(),
374                value_compression: None,
375                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
376                value_target_size: DEFAULT_VALUE_TARGET_SIZE,
377                table_partition: "test_table".into(),
378                table_initial_size: DEFAULT_TABLE_INITIAL_SIZE,
379                table_resize_frequency: DEFAULT_TABLE_RESIZE_FREQUENCY,
380                table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
381                table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
382                codec_config: (),
383            };
384            let mut freezer = Freezer::<_, FixedBytes<64>, i32>::init(context.clone(), cfg.clone())
385                .await
386                .expect("Failed to initialize freezer");
387
388            // Insert multiple keys
389            let keys = vec![
390                (test_key("key1"), 1),
391                (test_key("key2"), 2),
392                (test_key("key3"), 3),
393                (test_key("key4"), 4),
394                (test_key("key5"), 5),
395            ];
396
397            for (key, data) in &keys {
398                freezer
399                    .put(key.clone(), *data)
400                    .await
401                    .expect("Failed to put data");
402            }
403
404            // Retrieve all keys and verify
405            for (key, data) in &keys {
406                let retrieved = freezer
407                    .get(Identifier::Key(key))
408                    .await
409                    .expect("Failed to get data")
410                    .expect("Data not found");
411                assert_eq!(retrieved, *data);
412            }
413        });
414    }
415
416    #[test_traced]
417    fn test_collision_handling() {
418        // Initialize the deterministic context
419        let executor = deterministic::Runner::default();
420        executor.start(|context| async move {
421            // Initialize the freezer with a very small table to force collisions
422            let cfg = Config {
423                key_partition: "test_key_index".into(),
424                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
425                key_page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
426                value_partition: "test_value_journal".into(),
427                value_compression: None,
428                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
429                value_target_size: DEFAULT_VALUE_TARGET_SIZE,
430                table_partition: "test_table".into(),
431                table_initial_size: 4, // Very small to force collisions
432                table_resize_frequency: DEFAULT_TABLE_RESIZE_FREQUENCY,
433                table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
434                table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
435                codec_config: (),
436            };
437            let mut freezer = Freezer::<_, FixedBytes<64>, i32>::init(context.clone(), cfg.clone())
438                .await
439                .expect("Failed to initialize freezer");
440
441            // Insert multiple keys that will likely collide
442            let keys = vec![
443                (test_key("key1"), 1),
444                (test_key("key2"), 2),
445                (test_key("key3"), 3),
446                (test_key("key4"), 4),
447                (test_key("key5"), 5),
448                (test_key("key6"), 6),
449                (test_key("key7"), 7),
450                (test_key("key8"), 8),
451            ];
452
453            for (key, data) in &keys {
454                freezer
455                    .put(key.clone(), *data)
456                    .await
457                    .expect("Failed to put data");
458            }
459
460            // Sync to disk
461            freezer.sync().await.expect("Failed to sync");
462
463            // Retrieve all keys and verify they can still be found
464            for (key, data) in &keys {
465                let retrieved = freezer
466                    .get(Identifier::Key(key))
467                    .await
468                    .expect("Failed to get data")
469                    .expect("Data not found");
470                assert_eq!(retrieved, *data);
471            }
472
473            // Check metrics
474            let buffer = context.encode();
475            assert!(buffer.contains("gets_total 8"), "{}", buffer);
476            assert!(buffer.contains("unnecessary_reads_total 5"), "{}", buffer);
477        });
478    }
479
480    #[test_traced]
481    fn test_restart() {
482        // Initialize the deterministic context
483        let executor = deterministic::Runner::default();
484        executor.start(|context| async move {
485            let cfg = Config {
486                key_partition: "test_key_index".into(),
487                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
488                key_page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
489                value_partition: "test_value_journal".into(),
490                value_compression: None,
491                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
492                value_target_size: DEFAULT_VALUE_TARGET_SIZE,
493                table_partition: "test_table".into(),
494                table_initial_size: DEFAULT_TABLE_INITIAL_SIZE,
495                table_resize_frequency: DEFAULT_TABLE_RESIZE_FREQUENCY,
496                table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
497                table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
498                codec_config: (),
499            };
500
501            // Insert data and close the freezer
502            let checkpoint = {
503                let mut freezer = Freezer::<_, FixedBytes<64>, i32>::init(
504                    context.with_label("first"),
505                    cfg.clone(),
506                )
507                .await
508                .expect("Failed to initialize freezer");
509
510                let keys = vec![
511                    (test_key("persist1"), 100),
512                    (test_key("persist2"), 200),
513                    (test_key("persist3"), 300),
514                ];
515
516                for (key, data) in &keys {
517                    freezer
518                        .put(key.clone(), *data)
519                        .await
520                        .expect("Failed to put data");
521                }
522
523                freezer.close().await.expect("Failed to close freezer")
524            };
525
526            // Reopen and verify data persisted
527            {
528                let freezer = Freezer::<_, FixedBytes<64>, i32>::init_with_checkpoint(
529                    context.with_label("second"),
530                    cfg.clone(),
531                    Some(checkpoint),
532                )
533                .await
534                .expect("Failed to initialize freezer");
535
536                let keys = vec![
537                    (test_key("persist1"), 100),
538                    (test_key("persist2"), 200),
539                    (test_key("persist3"), 300),
540                ];
541
542                for (key, data) in &keys {
543                    let retrieved = freezer
544                        .get(Identifier::Key(key))
545                        .await
546                        .expect("Failed to get data")
547                        .expect("Data not found");
548                    assert_eq!(retrieved, *data);
549                }
550            }
551        });
552    }
553
554    #[test_traced]
555    fn test_crash_consistency() {
556        // Initialize the deterministic context
557        let executor = deterministic::Runner::default();
558        executor.start(|context| async move {
559            let cfg = Config {
560                key_partition: "test_key_index".into(),
561                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
562                key_page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
563                value_partition: "test_value_journal".into(),
564                value_compression: None,
565                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
566                value_target_size: DEFAULT_VALUE_TARGET_SIZE,
567                table_partition: "test_table".into(),
568                table_initial_size: DEFAULT_TABLE_INITIAL_SIZE,
569                table_resize_frequency: DEFAULT_TABLE_RESIZE_FREQUENCY,
570                table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
571                table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
572                codec_config: (),
573            };
574
575            // First, create some committed data and close the freezer
576            let checkpoint = {
577                let mut freezer = Freezer::<_, FixedBytes<64>, i32>::init(
578                    context.with_label("first"),
579                    cfg.clone(),
580                )
581                .await
582                .expect("Failed to initialize freezer");
583
584                freezer
585                    .put(test_key("committed1"), 1)
586                    .await
587                    .expect("Failed to put data");
588                freezer
589                    .put(test_key("committed2"), 2)
590                    .await
591                    .expect("Failed to put data");
592
593                // Sync to ensure data is committed
594                freezer.sync().await.expect("Failed to sync");
595
596                // Add more data but don't sync (simulating crash)
597                freezer
598                    .put(test_key("uncommitted1"), 3)
599                    .await
600                    .expect("Failed to put data");
601                freezer
602                    .put(test_key("uncommitted2"), 4)
603                    .await
604                    .expect("Failed to put data");
605
606                // Close without syncing to simulate crash
607                freezer.close().await.expect("Failed to close")
608            };
609
610            // Reopen and verify only committed data is present
611            {
612                let freezer = Freezer::<_, FixedBytes<64>, i32>::init_with_checkpoint(
613                    context.with_label("second"),
614                    cfg.clone(),
615                    Some(checkpoint),
616                )
617                .await
618                .expect("Failed to initialize freezer");
619
620                // Committed data should be present
621                assert_eq!(
622                    freezer
623                        .get(Identifier::Key(&test_key("committed1")))
624                        .await
625                        .unwrap(),
626                    Some(1)
627                );
628                assert_eq!(
629                    freezer
630                        .get(Identifier::Key(&test_key("committed2")))
631                        .await
632                        .unwrap(),
633                    Some(2)
634                );
635
636                // Uncommitted data might or might not be present depending on implementation
637                // But if present, it should be correct
638                if let Some(val) = freezer
639                    .get(Identifier::Key(&test_key("uncommitted1")))
640                    .await
641                    .unwrap()
642                {
643                    assert_eq!(val, 3);
644                }
645                if let Some(val) = freezer
646                    .get(Identifier::Key(&test_key("uncommitted2")))
647                    .await
648                    .unwrap()
649                {
650                    assert_eq!(val, 4);
651                }
652            }
653        });
654    }
655
656    #[test_traced]
657    fn test_destroy() {
658        // Initialize the deterministic context
659        let executor = deterministic::Runner::default();
660        executor.start(|context| async move {
661            // Initialize the freezer
662            let cfg = Config {
663                key_partition: "test_key_index".into(),
664                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
665                key_page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
666                value_partition: "test_value_journal".into(),
667                value_compression: None,
668                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
669                value_target_size: DEFAULT_VALUE_TARGET_SIZE,
670                table_partition: "test_table".into(),
671                table_initial_size: DEFAULT_TABLE_INITIAL_SIZE,
672                table_resize_frequency: DEFAULT_TABLE_RESIZE_FREQUENCY,
673                table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
674                table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
675                codec_config: (),
676            };
677            {
678                let mut freezer = Freezer::<_, FixedBytes<64>, i32>::init(
679                    context.with_label("first"),
680                    cfg.clone(),
681                )
682                .await
683                .expect("Failed to initialize freezer");
684
685                freezer
686                    .put(test_key("destroy1"), 1)
687                    .await
688                    .expect("Failed to put data");
689                freezer
690                    .put(test_key("destroy2"), 2)
691                    .await
692                    .expect("Failed to put data");
693
694                // Destroy the freezer
695                freezer.destroy().await.expect("Failed to destroy freezer");
696            }
697
698            // Try to create a new freezer - it should be empty
699            {
700                let freezer = Freezer::<_, FixedBytes<64>, i32>::init(
701                    context.with_label("second"),
702                    cfg.clone(),
703                )
704                .await
705                .expect("Failed to initialize freezer");
706
707                // Should not find any data
708                assert!(freezer
709                    .get(Identifier::Key(&test_key("destroy1")))
710                    .await
711                    .unwrap()
712                    .is_none());
713                assert!(freezer
714                    .get(Identifier::Key(&test_key("destroy2")))
715                    .await
716                    .unwrap()
717                    .is_none());
718            }
719        });
720    }
721
722    #[test_traced]
723    fn test_partial_table_entry_write() {
724        // Initialize the deterministic context
725        let executor = deterministic::Runner::default();
726        executor.start(|context| async move {
727            // Initialize the freezer
728            let cfg = Config {
729                key_partition: "test_key_index".into(),
730                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
731                key_page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
732                value_partition: "test_value_journal".into(),
733                value_compression: None,
734                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
735                value_target_size: DEFAULT_VALUE_TARGET_SIZE,
736                table_partition: "test_table".into(),
737                table_initial_size: DEFAULT_TABLE_INITIAL_SIZE,
738                table_resize_frequency: DEFAULT_TABLE_RESIZE_FREQUENCY,
739                table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
740                table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
741                codec_config: (),
742            };
743            let checkpoint = {
744                let mut freezer = Freezer::<_, FixedBytes<64>, i32>::init(
745                    context.with_label("first"),
746                    cfg.clone(),
747                )
748                .await
749                .expect("Failed to initialize freezer");
750
751                freezer.put(test_key("key1"), 42).await.unwrap();
752                freezer.sync().await.unwrap();
753                freezer.close().await.unwrap()
754            };
755
756            // Corrupt the table by writing partial entry
757            {
758                let (blob, _) = context.open(&cfg.table_partition, b"table").await.unwrap();
759                // Write incomplete table entry (only 10 bytes instead of 24)
760                blob.write_at(0, vec![0xFF; 10]).await.unwrap();
761                blob.sync().await.unwrap();
762            }
763
764            // Reopen and verify it handles the corruption
765            {
766                let freezer = Freezer::<_, FixedBytes<64>, i32>::init_with_checkpoint(
767                    context.with_label("second"),
768                    cfg.clone(),
769                    Some(checkpoint),
770                )
771                .await
772                .expect("Failed to initialize freezer");
773
774                // The key should still be retrievable from journal if table is corrupted
775                // but the table entry is zeroed out
776                let result = freezer
777                    .get(Identifier::Key(&test_key("key1")))
778                    .await
779                    .unwrap();
780                assert!(result.is_none() || result == Some(42));
781            }
782        });
783    }
784
785    #[test_traced]
786    fn test_table_entry_invalid_crc() {
787        // Initialize the deterministic context
788        let executor = deterministic::Runner::default();
789        executor.start(|context| async move {
790            let cfg = Config {
791                key_partition: "test_key_index".into(),
792                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
793                key_page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
794                value_partition: "test_value_journal".into(),
795                value_compression: None,
796                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
797                value_target_size: DEFAULT_VALUE_TARGET_SIZE,
798                table_partition: "test_table".into(),
799                table_initial_size: DEFAULT_TABLE_INITIAL_SIZE,
800                table_resize_frequency: DEFAULT_TABLE_RESIZE_FREQUENCY,
801                table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
802                table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
803                codec_config: (),
804            };
805
806            // Create freezer with data
807            let checkpoint = {
808                let mut freezer = Freezer::<_, FixedBytes<64>, i32>::init(
809                    context.with_label("first"),
810                    cfg.clone(),
811                )
812                .await
813                .expect("Failed to initialize freezer");
814
815                freezer.put(test_key("key1"), 42).await.unwrap();
816                freezer.sync().await.unwrap();
817                freezer.close().await.unwrap()
818            };
819
820            // Corrupt the CRC in the index entry
821            {
822                let (blob, _) = context.open(&cfg.table_partition, b"table").await.unwrap();
823                // Read the first entry
824                let entry_data = blob.read_at(0, IoBufMut::zeroed(24)).await.unwrap();
825                let mut corrupted = entry_data.coalesce();
826                // Corrupt the CRC (last 4 bytes of the entry)
827                corrupted.as_mut()[20] ^= 0xFF;
828                blob.write_at(0, corrupted).await.unwrap();
829                blob.sync().await.unwrap();
830            }
831
832            // Reopen and verify it handles invalid CRC
833            {
834                let freezer = Freezer::<_, FixedBytes<64>, i32>::init_with_checkpoint(
835                    context.with_label("second"),
836                    cfg.clone(),
837                    Some(checkpoint),
838                )
839                .await
840                .expect("Failed to initialize freezer");
841
842                // With invalid CRC, the entry should be treated as invalid
843                let result = freezer
844                    .get(Identifier::Key(&test_key("key1")))
845                    .await
846                    .unwrap();
847                // The freezer should still work but may not find the key due to invalid table entry
848                assert!(result.is_none() || result == Some(42));
849            }
850        });
851    }
852
853    #[test_traced]
854    fn test_table_extra_bytes() {
855        // Initialize the deterministic context
856        let executor = deterministic::Runner::default();
857        executor.start(|context| async move {
858            let cfg = Config {
859                key_partition: "test_key_index".into(),
860                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
861                key_page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
862                value_partition: "test_value_journal".into(),
863                value_compression: None,
864                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
865                value_target_size: DEFAULT_VALUE_TARGET_SIZE,
866                table_partition: "test_table".into(),
867                table_initial_size: DEFAULT_TABLE_INITIAL_SIZE,
868                table_resize_frequency: DEFAULT_TABLE_RESIZE_FREQUENCY,
869                table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
870                table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
871                codec_config: (),
872            };
873
874            // Create freezer with data
875            let checkpoint = {
876                let mut freezer = Freezer::<_, FixedBytes<64>, i32>::init(
877                    context.with_label("first"),
878                    cfg.clone(),
879                )
880                .await
881                .expect("Failed to initialize freezer");
882
883                freezer.put(test_key("key1"), 42).await.unwrap();
884                freezer.sync().await.unwrap();
885                freezer.close().await.unwrap()
886            };
887
888            // Add extra bytes to the table blob
889            {
890                let (blob, size) = context.open(&cfg.table_partition, b"table").await.unwrap();
891                // Append garbage data
892                blob.write_at(size, hex!("0xdeadbeef").to_vec())
893                    .await
894                    .unwrap();
895                blob.sync().await.unwrap();
896            }
897
898            // Reopen and verify it handles extra bytes gracefully
899            {
900                let freezer = Freezer::<_, FixedBytes<64>, i32>::init_with_checkpoint(
901                    context.with_label("second"),
902                    cfg.clone(),
903                    Some(checkpoint),
904                )
905                .await
906                .expect("Failed to initialize freezer");
907
908                // Should still be able to read the key
909                assert_eq!(
910                    freezer
911                        .get(Identifier::Key(&test_key("key1")))
912                        .await
913                        .unwrap(),
914                    Some(42)
915                );
916
917                // And write new data
918                let mut freezer_mut = freezer;
919                freezer_mut.put(test_key("key2"), 43).await.unwrap();
920                assert_eq!(
921                    freezer_mut
922                        .get(Identifier::Key(&test_key("key2")))
923                        .await
924                        .unwrap(),
925                    Some(43)
926                );
927            }
928        });
929    }
930
931    #[test_traced]
932    fn test_indexing_across_resizes() {
933        // Initialize the deterministic context
934        let executor = deterministic::Runner::default();
935        executor.start(|context| async move {
936            // Initialize the freezer
937            let cfg = Config {
938                key_partition: "test_key_index".into(),
939                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
940                key_page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
941                value_partition: "test_value_journal".into(),
942                value_compression: None,
943                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
944                value_target_size: DEFAULT_VALUE_TARGET_SIZE,
945                table_partition: "test_table".into(),
946                table_initial_size: 2, // Very small initial size to force multiple resizes
947                table_resize_frequency: 2, // Resize after 2 items per entry
948                table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
949                table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
950                codec_config: (),
951            };
952            let mut freezer =
953                Freezer::<_, FixedBytes<64>, i32>::init(context.with_label("first"), cfg.clone())
954                    .await
955                    .expect("Failed to initialize freezer");
956
957            // Insert many keys to force multiple table resizes
958            // Table will grow from 2 -> 4 -> 8 -> 16 -> 32 -> 64 -> 128 -> 256 -> 512 -> 1024
959            let mut keys = Vec::new();
960            for i in 0..1000 {
961                let key = test_key(&format!("key{i}"));
962                keys.push((key.clone(), i));
963
964                // Force sync to ensure resize occurs ASAP
965                freezer.put(key, i).await.expect("Failed to put data");
966                freezer.sync().await.expect("Failed to sync");
967            }
968
969            // Verify all keys can still be found after multiple resizes
970            for (key, value) in &keys {
971                let retrieved = freezer
972                    .get(Identifier::Key(key))
973                    .await
974                    .expect("Failed to get data")
975                    .expect("Data not found");
976                assert_eq!(retrieved, *value, "Value mismatch for key after resizes");
977            }
978
979            // Close and reopen to verify persistence
980            let checkpoint = freezer.close().await.expect("Failed to close");
981            let freezer = Freezer::<_, FixedBytes<64>, i32>::init_with_checkpoint(
982                context.with_label("second"),
983                cfg.clone(),
984                Some(checkpoint),
985            )
986            .await
987            .expect("Failed to reinitialize freezer");
988
989            // Verify all keys can still be found after restart
990            for (key, value) in &keys {
991                let retrieved = freezer
992                    .get(Identifier::Key(key))
993                    .await
994                    .expect("Failed to get data")
995                    .expect("Data not found");
996                assert_eq!(retrieved, *value, "Value mismatch for key after restart");
997            }
998
999            // Verify metrics show resize operations occurred
1000            let buffer = context.encode();
1001            assert!(buffer.contains("first_resizes_total 8"), "{}", buffer);
1002        });
1003    }
1004
1005    #[test_traced]
1006    fn test_insert_during_resize() {
1007        let executor = deterministic::Runner::default();
1008        executor.start(|context| async move {
1009            let cfg = Config {
1010                key_partition: "test_key_index".into(),
1011                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1012                key_page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1013                value_partition: "test_value_journal".into(),
1014                value_compression: None,
1015                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1016                value_target_size: DEFAULT_VALUE_TARGET_SIZE,
1017                table_partition: "test_table".into(),
1018                table_initial_size: 2,
1019                table_resize_frequency: 1,
1020                table_resize_chunk_size: 1, // Process one at a time
1021                table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
1022                codec_config: (),
1023            };
1024            let mut freezer = Freezer::<_, FixedBytes<64>, i32>::init(context.clone(), cfg.clone())
1025                .await
1026                .unwrap();
1027
1028            // Insert keys to trigger resize
1029            // key0 -> entry 0, key2 -> entry 1
1030            freezer.put(test_key("key0"), 0).await.unwrap();
1031            freezer.put(test_key("key2"), 1).await.unwrap();
1032            freezer.sync().await.unwrap(); // should start resize
1033
1034            // Verify resize started
1035            assert!(freezer.resizing().is_some());
1036
1037            // Insert during resize (to first entry)
1038            // key6 -> entry 0
1039            freezer.put(test_key("key6"), 2).await.unwrap();
1040            assert!(context.encode().contains("unnecessary_writes_total 1"));
1041            assert_eq!(freezer.resizable(), 3);
1042
1043            // Insert another key (to unmodified entry)
1044            // key3 -> entry 1
1045            freezer.put(test_key("key3"), 3).await.unwrap();
1046            assert!(context.encode().contains("unnecessary_writes_total 1"));
1047            assert_eq!(freezer.resizable(), 3);
1048
1049            // Verify resize completed
1050            freezer.sync().await.unwrap();
1051            assert!(freezer.resizing().is_none());
1052            assert_eq!(freezer.resizable(), 2);
1053
1054            // More inserts
1055            // key4 -> entry 1, key7 -> entry 0
1056            freezer.put(test_key("key4"), 4).await.unwrap();
1057            freezer.put(test_key("key7"), 5).await.unwrap();
1058            freezer.sync().await.unwrap();
1059
1060            // Another resize should've started
1061            assert!(freezer.resizing().is_some());
1062
1063            // Verify all can be retrieved during resize
1064            let keys = ["key0", "key2", "key6", "key3", "key4", "key7"];
1065            for (i, k) in keys.iter().enumerate() {
1066                assert_eq!(
1067                    freezer.get(Identifier::Key(&test_key(k))).await.unwrap(),
1068                    Some(i as i32)
1069                );
1070            }
1071
1072            // Sync until resize completes
1073            while freezer.resizing().is_some() {
1074                freezer.sync().await.unwrap();
1075            }
1076
1077            // Ensure no entries are considered resizable
1078            assert_eq!(freezer.resizable(), 0);
1079        });
1080    }
1081
1082    #[test_traced]
1083    fn test_resize_after_startup() {
1084        let executor = deterministic::Runner::default();
1085        executor.start(|context| async move {
1086            let cfg = Config {
1087                key_partition: "test_key_index".into(),
1088                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1089                key_page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1090                value_partition: "test_value_journal".into(),
1091                value_compression: None,
1092                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1093                value_target_size: DEFAULT_VALUE_TARGET_SIZE,
1094                table_partition: "test_table".into(),
1095                table_initial_size: 2,
1096                table_resize_frequency: 1,
1097                table_resize_chunk_size: 1, // Process one at a time
1098                table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
1099                codec_config: (),
1100            };
1101
1102            // Create freezer and then shutdown uncleanly
1103            let checkpoint = {
1104                let mut freezer = Freezer::<_, FixedBytes<64>, i32>::init(
1105                    context.with_label("first"),
1106                    cfg.clone(),
1107                )
1108                .await
1109                .unwrap();
1110
1111                // Insert keys to trigger resize
1112                // key0 -> entry 0, key2 -> entry 1
1113                freezer.put(test_key("key0"), 0).await.unwrap();
1114                freezer.put(test_key("key2"), 1).await.unwrap();
1115                let checkpoint = freezer.sync().await.unwrap();
1116
1117                // Verify resize started
1118                assert!(freezer.resizing().is_some());
1119
1120                checkpoint
1121            };
1122
1123            // Reopen freezer
1124            let mut freezer = Freezer::<_, FixedBytes<64>, i32>::init_with_checkpoint(
1125                context.with_label("second"),
1126                cfg.clone(),
1127                Some(checkpoint),
1128            )
1129            .await
1130            .unwrap();
1131            assert_eq!(freezer.resizable(), 1);
1132
1133            // Verify resize starts immediately (1 key will have 0 added but 1
1134            // will still have 1)
1135            freezer.sync().await.unwrap();
1136            assert!(freezer.resizing().is_some());
1137
1138            // Run until resize completes
1139            while freezer.resizing().is_some() {
1140                freezer.sync().await.unwrap();
1141            }
1142
1143            // Ensure no entries are considered resizable
1144            assert_eq!(freezer.resizable(), 0);
1145        });
1146    }
1147
1148    fn test_operations_and_restart(num_keys: usize) -> String {
1149        let executor = deterministic::Runner::default();
1150        executor.start(|mut context| async move {
1151            let cfg = Config {
1152                key_partition: "test_key_index".into(),
1153                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1154                key_page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1155                value_partition: "test_value_journal".into(),
1156                value_compression: None,
1157                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1158                value_target_size: 128, // Force multiple journal sections
1159                table_partition: "test_table".into(),
1160                table_initial_size: 8,     // Small table to force collisions
1161                table_resize_frequency: 2, // Force resize frequently
1162                table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
1163                table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
1164                codec_config: (),
1165            };
1166            let mut freezer = Freezer::<_, FixedBytes<96>, FixedBytes<256>>::init(
1167                context.with_label("init1"),
1168                cfg.clone(),
1169            )
1170            .await
1171            .expect("Failed to initialize freezer");
1172
1173            // Generate and insert random key-value pairs
1174            let mut pairs = Vec::new();
1175
1176            for _ in 0..num_keys {
1177                // Generate random key
1178                let mut key = [0u8; 96];
1179                context.fill_bytes(&mut key);
1180                let key = FixedBytes::<96>::new(key);
1181
1182                // Generate random value
1183                let mut value = [0u8; 256];
1184                context.fill_bytes(&mut value);
1185                let value = FixedBytes::<256>::new(value);
1186
1187                // Store the key-value pair
1188                freezer
1189                    .put(key.clone(), value.clone())
1190                    .await
1191                    .expect("Failed to put data");
1192                pairs.push((key, value));
1193
1194                // Randomly sync to test resizing
1195                if context.gen_bool(0.1) {
1196                    freezer.sync().await.expect("Failed to sync");
1197                }
1198            }
1199
1200            // Sync data
1201            freezer.sync().await.expect("Failed to sync");
1202
1203            // Verify all pairs can be retrieved
1204            for (key, value) in &pairs {
1205                let retrieved = freezer
1206                    .get(Identifier::Key(key))
1207                    .await
1208                    .expect("Failed to get data")
1209                    .expect("Data not found");
1210                assert_eq!(&retrieved, value);
1211            }
1212
1213            // Test get() on all keys
1214            for (key, _) in &pairs {
1215                assert!(freezer
1216                    .get(Identifier::Key(key))
1217                    .await
1218                    .expect("Failed to check key")
1219                    .is_some());
1220            }
1221
1222            // Check some non-existent keys
1223            for _ in 0..10 {
1224                let mut key = [0u8; 96];
1225                context.fill_bytes(&mut key);
1226                let key = FixedBytes::<96>::new(key);
1227                assert!(freezer
1228                    .get(Identifier::Key(&key))
1229                    .await
1230                    .expect("Failed to check key")
1231                    .is_none());
1232            }
1233
1234            // Close the freezer
1235            let checkpoint = freezer.close().await.expect("Failed to close freezer");
1236
1237            // Reopen the freezer
1238            let mut freezer = Freezer::<_, FixedBytes<96>, FixedBytes<256>>::init_with_checkpoint(
1239                context.with_label("init2"),
1240                cfg.clone(),
1241                Some(checkpoint),
1242            )
1243            .await
1244            .expect("Failed to initialize freezer");
1245
1246            // Verify all pairs are still there after restart
1247            for (key, value) in &pairs {
1248                let retrieved = freezer
1249                    .get(Identifier::Key(key))
1250                    .await
1251                    .expect("Failed to get data")
1252                    .expect("Data not found");
1253                assert_eq!(&retrieved, value);
1254            }
1255
1256            // Add more pairs after restart to test collision handling
1257            for _ in 0..20 {
1258                let mut key = [0u8; 96];
1259                context.fill_bytes(&mut key);
1260                let key = FixedBytes::<96>::new(key);
1261
1262                let mut value = [0u8; 256];
1263                context.fill_bytes(&mut value);
1264                let value = FixedBytes::<256>::new(value);
1265
1266                freezer.put(key, value).await.expect("Failed to put data");
1267            }
1268
1269            // Multiple syncs to test epoch progression
1270            for _ in 0..3 {
1271                freezer.sync().await.expect("Failed to sync");
1272
1273                // Add a few more entries between syncs
1274                for _ in 0..5 {
1275                    let mut key = [0u8; 96];
1276                    context.fill_bytes(&mut key);
1277                    let key = FixedBytes::<96>::new(key);
1278
1279                    let mut value = [0u8; 256];
1280                    context.fill_bytes(&mut value);
1281                    let value = FixedBytes::<256>::new(value);
1282
1283                    freezer.put(key, value).await.expect("Failed to put data");
1284                }
1285            }
1286
1287            // Final sync
1288            freezer.sync().await.expect("Failed to sync");
1289
1290            // Return the auditor state for comparison
1291            context.auditor().state()
1292        })
1293    }
1294
1295    #[test_group("slow")]
1296    #[test_traced]
1297    fn test_determinism() {
1298        let state1 = test_operations_and_restart(1_000);
1299        let state2 = test_operations_and_restart(1_000);
1300        assert_eq!(state1, state2);
1301    }
1302
1303    #[test_traced]
1304    fn test_put_multiple_updates() {
1305        // Initialize the deterministic context
1306        let executor = deterministic::Runner::default();
1307        executor.start(|context| async move {
1308            // Initialize the freezer
1309            let cfg = Config {
1310                key_partition: "test_key_index".into(),
1311                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1312                key_page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1313                value_partition: "test_value_journal".into(),
1314                value_compression: None,
1315                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1316                value_target_size: DEFAULT_VALUE_TARGET_SIZE,
1317                table_partition: "test_table".into(),
1318                table_initial_size: DEFAULT_TABLE_INITIAL_SIZE,
1319                table_resize_frequency: DEFAULT_TABLE_RESIZE_FREQUENCY,
1320                table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
1321                table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
1322                codec_config: (),
1323            };
1324            let mut freezer = Freezer::<_, FixedBytes<64>, i32>::init(context.clone(), cfg.clone())
1325                .await
1326                .expect("Failed to initialize freezer");
1327
1328            let key = test_key("key1");
1329
1330            freezer
1331                .put(key.clone(), 1)
1332                .await
1333                .expect("Failed to put data");
1334            freezer
1335                .put(key.clone(), 2)
1336                .await
1337                .expect("Failed to put data");
1338            freezer.sync().await.expect("Failed to sync");
1339            assert_eq!(
1340                freezer
1341                    .get(Identifier::Key(&key))
1342                    .await
1343                    .expect("Failed to get data")
1344                    .unwrap(),
1345                2
1346            );
1347        });
1348    }
1349}