Skip to main content

commonware_storage/freezer/
mod.rs

1//! An immutable key-value store optimized for minimal memory usage and write amplification.
2//!
3//! [Freezer] is a key-value store designed for permanent storage where data is written once and never
4//! modified. Meant for resource-constrained environments, [Freezer] exclusively employs disk-resident
5//! data structures to serve queries and avoids ever rewriting (i.e. compacting) inserted data.
6//!
7//! As a byproduct of the mechanisms used to satisfy these constraints, [Freezer] consistently provides
8//! low latency access to recently added data (regardless of how much data has been stored) at the expense
9//! of a logarithmic increase in latency for old data (increasing with the number of items stored).
10//!
11//! # Format
12//!
13//! The [Freezer] uses a three-level architecture:
14//! 1. An extendible hash table (written in a single [commonware_runtime::Blob]) that maps keys to locations
15//! 2. A key index journal ([crate::journal::segmented::fixed]) that stores keys and collision chain pointers
16//! 3. A value journal ([crate::journal::segmented::glob]) that stores the actual values
17//!
18//! These journals are combined via [crate::journal::segmented::oversized], which coordinates
19//! crash recovery between them.
20//!
21//! ```text
22//! +-----------------------------------------------------------------+
23//! |                           Hash Table                            |
24//! |  +---------+---------+---------+---------+---------+---------+  |
25//! |  | Entry 0 | Entry 1 | Entry 2 | Entry 3 | Entry 4 |   ...   |  |
26//! |  +----+----+----+----+----+----+----+----+----+----+---------+  |
27//! +-------|---------|---------|---------|---------|---------|-------+
28//!         |         |         |         |         |         |
29//!         v         v         v         v         v         v
30//! +-----------------------------------------------------------------+
31//! |                      Key Index Journal                          |
32//! |  Section 0: [Entry 0][Entry 1][Entry 2]...                      |
33//! |  Section 1: [Entry 10][Entry 11][Entry 12]...                   |
34//! |  Section N: [Entry 100][Entry 101][Entry 102]...                |
35//! +-------|---------|---------|---------|---------|---------|-------+
36//!         |         |         |         |         |         |
37//!         v         v         v         v         v         v
38//! +-----------------------------------------------------------------+
39//! |                        Value Journal                            |
40//! |  Section 0: [Value 0][Value 1][Value 2]...                      |
41//! |  Section 1: [Value 10][Value 11][Value 12]...                   |
42//! |  Section N: [Value 100][Value 101][Value 102]...                |
43//! +-----------------------------------------------------------------+
44//! ```
45//!
46//! The table uses two fixed-size slots per entry to ensure consistency during updates. Each slot
47//! contains an epoch number that monotonically increases with each sync operation. During reads,
48//! the slot with the higher epoch is selected (provided it's not greater than the last committed
49//! epoch), ensuring consistency even if the system crashed during a write.
50//!
51//! ```text
52//! +-------------------------------------+
53//! |          Hash Table Entry           |
54//! +-------------------------------------+
55//! |     Slot 0      |      Slot 1       |
56//! +-----------------+-------------------+
57//! | epoch:    u64   | epoch:    u64     |
58//! | section:  u64   | section:  u64     |
59//! | offset:   u32   | offset:   u32     |
60//! | added:    u8    | added:    u8      |
61//! +-----------------+-------------------+
62//! | CRC32:    u32   | CRC32:    u32     |
63//! +-----------------+-------------------+
64//! ```
65//!
66//! The key index journal stores fixed-size entries containing a key, a pointer to the value in the
67//! value journal, and an optional pointer to the next entry in the collision chain (for keys that
68//! hash to the same table index).
69//!
70//! ```text
71//! +-------------------------------------+
72//! |        Key Index Entry              |
73//! +-------------------------------------+
74//! | Key:           Array                |
75//! | Value Offset:  u64                  |
76//! | Value Size:    u32                  |
77//! | Next:          Option<(u64, u32)>   |
78//! +-------------------------------------+
79//! ```
80//!
81//! The value journal stores the actual encoded values at the offsets referenced by the key index entries.
82//!
83//! # Traversing Conflicts
84//!
85//! When multiple keys hash to the same table index, they form a linked list within the key index
86//! journal. Each key index entry points to its value in the value journal:
87//!
88//! ```text
89//! Hash Table:
90//! [Index 42]         +-------------------+
91//!                    | section: 2        |
92//!                    | offset: 768       |
93//!                    +---------+---------+
94//!                              |
95//! Key Index Journal:           v
96//! [Section 2]        +-----------------------+
97//!                    | Key: "foo"            |
98//!                    | ValOff: 100           |
99//!                    | ValSize: 20           |
100//!                    | Next: (1, 512) -------+---+
101//!                    +-----------------------+   |
102//!                                                v
103//! [Section 1]        +-----------------------+
104//!                    | Key: "bar"            |
105//!                    | ValOff: 50            |
106//!                    | ValSize: 20           |
107//!                    | Next: (0, 256) -------+---+
108//!                    +-----------------------+   |
109//!                                                v
110//! [Section 0]        +-----------------------+
111//!                    | Key: "baz"            |
112//!                    | ValOff: 0             |
113//!                    | ValSize: 20           |
114//!                    | Next: None            |
115//!                    +-----------------------+
116//!
117//! Value Journal:
118//! [Section 0]        [Value: 126 @ offset 0 ]
119//! [Section 1]        [Value: 84  @ offset 50]
120//! [Section 2]        [Value: 42  @ offset 100]
121//! ```
122//!
123//! New entries are prepended to the chain, becoming the new head. During lookup, the chain
124//! is traversed until a matching key is found. The `added` field in the table entry tracks
125//! insertions since the last resize, triggering table growth when 50% of entries have had
126//! `table_resize_frequency` items added (since the last resize).
127//!
128//! # Extendible Hashing
129//!
130//! The [Freezer] uses bit-based indexing to grow the on-disk hash table without rehashing existing entries:
131//!
132//! ```text
133//! Initial state (table_size=4, using 2 bits of hash):
134//! Hash: 0b...00 -> Index 0
135//! Hash: 0b...01 -> Index 1
136//! Hash: 0b...10 -> Index 2
137//! Hash: 0b...11 -> Index 3
138//!
139//! After resize (table_size=8, using 3 bits of hash):
140//! Hash: 0b...000 -> Index 0 -+
141//! ...                        |
142//! Hash: 0b...100 -> Index 4 -+- Both map to old Index 0
143//! Hash: 0b...001 -> Index 1 -+
144//! ...                        |
145//! Hash: 0b...101 -> Index 5 -+- Both map to old Index 1
146//! ```
147//!
148//! When the table doubles in size:
149//! 1. Each entry at index `i` splits into two entries: `i` and `i + old_size`
150//! 2. The existing chain head is copied to both locations with `added=0`
151//! 3. Future insertions will naturally distribute between the two entries based on their hash
152//!
153//! This approach ensures that entries inserted before a resize remain discoverable after the resize,
154//! as the lookup algorithm checks the appropriate entry based on the current table size. As more and more
155//! items are added (and resizes occur), the latency for fetching old data will increase logarithmically
156//! (with the number of items stored).
157//!
158//! To prevent a "stall" during a single resize, the table is resized incrementally across multiple sync calls.
159//! Each sync will process up to `table_resize_chunk_size` entries until the resize is complete. If there is
160//! an ongoing resize when closing the [Freezer], the resize will be completed before closing.
161//!
162//! # Example
163//!
164//! ```rust
165//! use commonware_runtime::{Spawner, Runner, deterministic, buffer::paged::CacheRef};
166//! use commonware_storage::freezer::{Freezer, Config, Identifier};
167//! use commonware_utils::{sequence::FixedBytes, NZUsize, NZU16};
168//!
169//! let executor = deterministic::Runner::default();
170//! executor.start(|context| async move {
171//!     // Create a freezer
172//!     let cfg = Config {
173//!         key_partition: "freezer-key-index".into(),
174//!         key_write_buffer: NZUsize!(1024 * 1024), // 1MB
175//!         key_page_cache: CacheRef::from_pooler(&context, NZU16!(1024), NZUsize!(10)),
176//!         value_partition: "freezer-value-journal".into(),
177//!         value_compression: Some(3),
178//!         value_write_buffer: NZUsize!(1024 * 1024), // 1MB
179//!         value_target_size: 100 * 1024 * 1024, // 100MB
180//!         table_partition: "freezer-table".into(),
181//!         table_initial_size: 65_536, // ~3MB initial table size
182//!         table_resize_frequency: 4, // Force resize once 4 writes to the same entry occur
183//!         table_resize_chunk_size: 16_384, // ~1MB of table entries rewritten per sync
184//!         table_replay_buffer: NZUsize!(1024 * 1024), // 1MB
185//!         codec_config: (),
186//!     };
187//!     let mut freezer = Freezer::<_, FixedBytes<32>, i32>::init(context, cfg).await.unwrap();
188//!
189//!     // Put a key-value pair
190//!     let key = FixedBytes::new([1u8; 32]);
191//!     freezer.put(key.clone(), 42).await.unwrap();
192//!
193//!     // Sync to disk
194//!     freezer.sync().await.unwrap();
195//!
196//!     // Get the value
197//!     let value = freezer.get(Identifier::Key(&key)).await.unwrap().unwrap();
198//!     assert_eq!(value, 42);
199//!
200//!     // Close the freezer
201//!     freezer.close().await.unwrap();
202//! });
203//! ```
204
205#[cfg(test)]
206mod conformance;
207mod storage;
208use commonware_runtime::buffer::paged::CacheRef;
209use commonware_utils::Array;
210use std::num::NonZeroUsize;
211pub use storage::{Checkpoint, Cursor, Freezer};
212use thiserror::Error;
213
214/// Subject of a [Freezer::get] operation.
215pub enum Identifier<'a, K: Array> {
216    Cursor(Cursor),
217    Key(&'a K),
218}
219
220/// Errors that can occur when interacting with the [Freezer].
221#[derive(Debug, Error)]
222pub enum Error {
223    #[error("runtime error: {0}")]
224    Runtime(#[from] commonware_runtime::Error),
225    #[error("journal error: {0}")]
226    Journal(#[from] crate::journal::Error),
227    #[error("codec error: {0}")]
228    Codec(#[from] commonware_codec::Error),
229}
230
231/// Configuration for [Freezer].
232#[derive(Clone)]
233pub struct Config<C> {
234    /// The [commonware_runtime::Storage] partition for the key index journal.
235    pub key_partition: String,
236
237    /// The size of the write buffer for the key index journal.
238    pub key_write_buffer: NonZeroUsize,
239
240    /// The page cache for the key index journal.
241    pub key_page_cache: CacheRef,
242
243    /// The [commonware_runtime::Storage] partition for the value journal.
244    pub value_partition: String,
245
246    /// The compression level for the value journal.
247    pub value_compression: Option<u8>,
248
249    /// The size of the write buffer for the value journal.
250    pub value_write_buffer: NonZeroUsize,
251
252    /// The target size of each value journal section before creating a new one.
253    pub value_target_size: u64,
254
255    /// The [commonware_runtime::Storage] partition to use for storing the table.
256    pub table_partition: String,
257
258    /// The initial number of items in the table.
259    pub table_initial_size: u32,
260
261    /// The number of items that must be added to 50% of table entries since the last resize before
262    /// the table is resized again.
263    pub table_resize_frequency: u8,
264
265    /// The number of items to move during each resize operation (many may be required to complete a resize).
266    pub table_resize_chunk_size: u32,
267
268    /// The size of the read buffer to use when scanning the table (e.g., during recovery or resize).
269    pub table_replay_buffer: NonZeroUsize,
270
271    /// The codec configuration to use for the value stored in the freezer.
272    pub codec_config: C,
273}
274
275#[cfg(test)]
276mod tests {
277    use super::*;
278    use commonware_codec::DecodeExt;
279    use commonware_formatting::hex;
280    use commonware_macros::{test_group, test_traced};
281    use commonware_runtime::{deterministic, Blob, Metrics as _, Runner, Storage, Supervisor as _};
282    use commonware_utils::{sequence::FixedBytes, NZUsize, NZU16};
283    use rand::{Rng, RngCore};
284    use std::num::NonZeroU16;
285
286    fn test_key(key: &str) -> FixedBytes<64> {
287        let mut buf = [0u8; 64];
288        let key = key.as_bytes();
289        assert!(key.len() <= buf.len());
290        buf[..key.len()].copy_from_slice(key);
291        FixedBytes::decode(buf.as_ref()).unwrap()
292    }
293
294    const DEFAULT_WRITE_BUFFER: usize = 1024;
295    const DEFAULT_VALUE_TARGET_SIZE: u64 = 10 * 1024 * 1024;
296    const DEFAULT_TABLE_INITIAL_SIZE: u32 = 256;
297    const DEFAULT_TABLE_RESIZE_FREQUENCY: u8 = 4;
298    const DEFAULT_TABLE_RESIZE_CHUNK_SIZE: u32 = 128; // force multiple chunks
299    const DEFAULT_TABLE_REPLAY_BUFFER: usize = 64 * 1024; // 64KB
300    const PAGE_SIZE: NonZeroU16 = NZU16!(1024);
301    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);
302
303    fn test_put_get(compression: Option<u8>) {
304        // Initialize the deterministic context
305        let executor = deterministic::Runner::default();
306        executor.start(|context| async move {
307            // Initialize the freezer
308            let cfg = Config {
309                key_partition: "test-key-index".into(),
310                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
311                key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
312                value_partition: "test-value-journal".into(),
313                value_compression: compression,
314                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
315                value_target_size: DEFAULT_VALUE_TARGET_SIZE,
316                table_partition: "test-table".into(),
317                table_initial_size: DEFAULT_TABLE_INITIAL_SIZE,
318                table_resize_frequency: DEFAULT_TABLE_RESIZE_FREQUENCY,
319                table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
320                table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
321                codec_config: (),
322            };
323            let mut freezer =
324                Freezer::<_, FixedBytes<64>, i32>::init(context.child("storage"), cfg.clone())
325                    .await
326                    .expect("Failed to initialize freezer");
327
328            let key = test_key("testkey");
329            let data = 42;
330
331            // Check key doesn't exist
332            let value = freezer
333                .get(Identifier::Key(&key))
334                .await
335                .expect("Failed to check key");
336            assert!(value.is_none());
337
338            // Put the key-data pair
339            freezer
340                .put(key.clone(), data)
341                .await
342                .expect("Failed to put data");
343
344            // Get the data back
345            let value = freezer
346                .get(Identifier::Key(&key))
347                .await
348                .expect("Failed to get data")
349                .expect("Data not found");
350            assert_eq!(value, data);
351
352            // Check metrics
353            let buffer = context.encode();
354            assert!(buffer.contains("gets_total 2"), "{}", buffer);
355            assert!(buffer.contains("puts_total 1"), "{}", buffer);
356            assert!(buffer.contains("unnecessary_reads_total 0"), "{}", buffer);
357
358            // Force a sync
359            freezer.sync().await.expect("Failed to sync data");
360        });
361    }
362
363    #[test_traced]
364    fn test_put_get_no_compression() {
365        test_put_get(None);
366    }
367
368    #[test_traced]
369    fn test_put_get_compression() {
370        test_put_get(Some(3));
371    }
372
373    #[test_traced]
374    fn test_multiple_keys() {
375        // Initialize the deterministic context
376        let executor = deterministic::Runner::default();
377        executor.start(|context| async move {
378            // Initialize the freezer
379            let cfg = Config {
380                key_partition: "test-key-index".into(),
381                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
382                key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
383                value_partition: "test-value-journal".into(),
384                value_compression: None,
385                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
386                value_target_size: DEFAULT_VALUE_TARGET_SIZE,
387                table_partition: "test-table".into(),
388                table_initial_size: DEFAULT_TABLE_INITIAL_SIZE,
389                table_resize_frequency: DEFAULT_TABLE_RESIZE_FREQUENCY,
390                table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
391                table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
392                codec_config: (),
393            };
394            let mut freezer =
395                Freezer::<_, FixedBytes<64>, i32>::init(context.child("storage"), cfg.clone())
396                    .await
397                    .expect("Failed to initialize freezer");
398
399            // Insert multiple keys
400            let keys = vec![
401                (test_key("key1"), 1),
402                (test_key("key2"), 2),
403                (test_key("key3"), 3),
404                (test_key("key4"), 4),
405                (test_key("key5"), 5),
406            ];
407
408            for (key, data) in &keys {
409                freezer
410                    .put(key.clone(), *data)
411                    .await
412                    .expect("Failed to put data");
413            }
414
415            // Retrieve all keys and verify
416            for (key, data) in &keys {
417                let retrieved = freezer
418                    .get(Identifier::Key(key))
419                    .await
420                    .expect("Failed to get data")
421                    .expect("Data not found");
422                assert_eq!(retrieved, *data);
423            }
424        });
425    }
426
427    #[test_traced]
428    fn test_collision_handling() {
429        // Initialize the deterministic context
430        let executor = deterministic::Runner::default();
431        executor.start(|context| async move {
432            // Initialize the freezer with a very small table to force collisions
433            let cfg = Config {
434                key_partition: "test-key-index".into(),
435                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
436                key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
437                value_partition: "test-value-journal".into(),
438                value_compression: None,
439                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
440                value_target_size: DEFAULT_VALUE_TARGET_SIZE,
441                table_partition: "test-table".into(),
442                table_initial_size: 4, // Very small to force collisions
443                table_resize_frequency: DEFAULT_TABLE_RESIZE_FREQUENCY,
444                table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
445                table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
446                codec_config: (),
447            };
448            let mut freezer =
449                Freezer::<_, FixedBytes<64>, i32>::init(context.child("storage"), cfg.clone())
450                    .await
451                    .expect("Failed to initialize freezer");
452
453            // Insert multiple keys that will likely collide
454            let keys = vec![
455                (test_key("key1"), 1),
456                (test_key("key2"), 2),
457                (test_key("key3"), 3),
458                (test_key("key4"), 4),
459                (test_key("key5"), 5),
460                (test_key("key6"), 6),
461                (test_key("key7"), 7),
462                (test_key("key8"), 8),
463            ];
464
465            for (key, data) in &keys {
466                freezer
467                    .put(key.clone(), *data)
468                    .await
469                    .expect("Failed to put data");
470            }
471
472            // Sync to disk
473            freezer.sync().await.expect("Failed to sync");
474
475            // Retrieve all keys and verify they can still be found
476            for (key, data) in &keys {
477                let retrieved = freezer
478                    .get(Identifier::Key(key))
479                    .await
480                    .expect("Failed to get data")
481                    .expect("Data not found");
482                assert_eq!(retrieved, *data);
483            }
484
485            // Check metrics
486            let buffer = context.encode();
487            assert!(buffer.contains("gets_total 8"), "{}", buffer);
488            assert!(buffer.contains("unnecessary_reads_total 5"), "{}", buffer);
489        });
490    }
491
492    #[test_traced]
493    fn test_restart() {
494        // Initialize the deterministic context
495        let executor = deterministic::Runner::default();
496        executor.start(|context| async move {
497            let cfg = Config {
498                key_partition: "test-key-index".into(),
499                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
500                key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
501                value_partition: "test-value-journal".into(),
502                value_compression: None,
503                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
504                value_target_size: DEFAULT_VALUE_TARGET_SIZE,
505                table_partition: "test-table".into(),
506                table_initial_size: DEFAULT_TABLE_INITIAL_SIZE,
507                table_resize_frequency: DEFAULT_TABLE_RESIZE_FREQUENCY,
508                table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
509                table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
510                codec_config: (),
511            };
512
513            // Insert data and close the freezer
514            let checkpoint = {
515                let mut freezer =
516                    Freezer::<_, FixedBytes<64>, i32>::init(context.child("first"), cfg.clone())
517                        .await
518                        .expect("Failed to initialize freezer");
519
520                let keys = vec![
521                    (test_key("persist1"), 100),
522                    (test_key("persist2"), 200),
523                    (test_key("persist3"), 300),
524                ];
525
526                for (key, data) in &keys {
527                    freezer
528                        .put(key.clone(), *data)
529                        .await
530                        .expect("Failed to put data");
531                }
532
533                freezer.close().await.expect("Failed to close freezer")
534            };
535
536            // Reopen and verify data persisted
537            {
538                let freezer = Freezer::<_, FixedBytes<64>, i32>::init_with_checkpoint(
539                    context.child("second"),
540                    cfg.clone(),
541                    Some(checkpoint),
542                )
543                .await
544                .expect("Failed to initialize freezer");
545
546                let keys = vec![
547                    (test_key("persist1"), 100),
548                    (test_key("persist2"), 200),
549                    (test_key("persist3"), 300),
550                ];
551
552                for (key, data) in &keys {
553                    let retrieved = freezer
554                        .get(Identifier::Key(key))
555                        .await
556                        .expect("Failed to get data")
557                        .expect("Data not found");
558                    assert_eq!(retrieved, *data);
559                }
560            }
561        });
562    }
563
564    #[test_traced]
565    fn test_crash_consistency() {
566        // Initialize the deterministic context
567        let executor = deterministic::Runner::default();
568        executor.start(|context| async move {
569            let cfg = Config {
570                key_partition: "test-key-index".into(),
571                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
572                key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
573                value_partition: "test-value-journal".into(),
574                value_compression: None,
575                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
576                value_target_size: DEFAULT_VALUE_TARGET_SIZE,
577                table_partition: "test-table".into(),
578                table_initial_size: DEFAULT_TABLE_INITIAL_SIZE,
579                table_resize_frequency: DEFAULT_TABLE_RESIZE_FREQUENCY,
580                table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
581                table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
582                codec_config: (),
583            };
584
585            // First, create some committed data and close the freezer
586            let checkpoint = {
587                let mut freezer =
588                    Freezer::<_, FixedBytes<64>, i32>::init(context.child("first"), cfg.clone())
589                        .await
590                        .expect("Failed to initialize freezer");
591
592                freezer
593                    .put(test_key("committed1"), 1)
594                    .await
595                    .expect("Failed to put data");
596                freezer
597                    .put(test_key("committed2"), 2)
598                    .await
599                    .expect("Failed to put data");
600
601                // Sync to ensure data is committed
602                freezer.sync().await.expect("Failed to sync");
603
604                // Add more data but don't sync (simulating crash)
605                freezer
606                    .put(test_key("uncommitted1"), 3)
607                    .await
608                    .expect("Failed to put data");
609                freezer
610                    .put(test_key("uncommitted2"), 4)
611                    .await
612                    .expect("Failed to put data");
613
614                // Close without syncing to simulate crash
615                freezer.close().await.expect("Failed to close")
616            };
617
618            // Reopen and verify only committed data is present
619            {
620                let freezer = Freezer::<_, FixedBytes<64>, i32>::init_with_checkpoint(
621                    context.child("second"),
622                    cfg.clone(),
623                    Some(checkpoint),
624                )
625                .await
626                .expect("Failed to initialize freezer");
627
628                // Committed data should be present
629                assert_eq!(
630                    freezer
631                        .get(Identifier::Key(&test_key("committed1")))
632                        .await
633                        .unwrap(),
634                    Some(1)
635                );
636                assert_eq!(
637                    freezer
638                        .get(Identifier::Key(&test_key("committed2")))
639                        .await
640                        .unwrap(),
641                    Some(2)
642                );
643
644                // Uncommitted data might or might not be present depending on implementation
645                // But if present, it should be correct
646                if let Some(val) = freezer
647                    .get(Identifier::Key(&test_key("uncommitted1")))
648                    .await
649                    .unwrap()
650                {
651                    assert_eq!(val, 3);
652                }
653                if let Some(val) = freezer
654                    .get(Identifier::Key(&test_key("uncommitted2")))
655                    .await
656                    .unwrap()
657                {
658                    assert_eq!(val, 4);
659                }
660            }
661        });
662    }
663
664    #[test_traced]
665    fn test_destroy() {
666        // Initialize the deterministic context
667        let executor = deterministic::Runner::default();
668        executor.start(|context| async move {
669            // Initialize the freezer
670            let cfg = Config {
671                key_partition: "test-key-index".into(),
672                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
673                key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
674                value_partition: "test-value-journal".into(),
675                value_compression: None,
676                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
677                value_target_size: DEFAULT_VALUE_TARGET_SIZE,
678                table_partition: "test-table".into(),
679                table_initial_size: DEFAULT_TABLE_INITIAL_SIZE,
680                table_resize_frequency: DEFAULT_TABLE_RESIZE_FREQUENCY,
681                table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
682                table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
683                codec_config: (),
684            };
685            {
686                let mut freezer =
687                    Freezer::<_, FixedBytes<64>, i32>::init(context.child("first"), cfg.clone())
688                        .await
689                        .expect("Failed to initialize freezer");
690
691                freezer
692                    .put(test_key("destroy1"), 1)
693                    .await
694                    .expect("Failed to put data");
695                freezer
696                    .put(test_key("destroy2"), 2)
697                    .await
698                    .expect("Failed to put data");
699
700                // Destroy the freezer
701                freezer.destroy().await.expect("Failed to destroy freezer");
702            }
703
704            // Try to create a new freezer - it should be empty
705            {
706                let freezer =
707                    Freezer::<_, FixedBytes<64>, i32>::init(context.child("second"), cfg.clone())
708                        .await
709                        .expect("Failed to initialize freezer");
710
711                // Should not find any data
712                assert!(freezer
713                    .get(Identifier::Key(&test_key("destroy1")))
714                    .await
715                    .unwrap()
716                    .is_none());
717                assert!(freezer
718                    .get(Identifier::Key(&test_key("destroy2")))
719                    .await
720                    .unwrap()
721                    .is_none());
722            }
723        });
724    }
725
726    #[test_traced]
727    fn test_partial_table_entry_write() {
728        // Initialize the deterministic context
729        let executor = deterministic::Runner::default();
730        executor.start(|context| async move {
731            // Initialize the freezer
732            let cfg = Config {
733                key_partition: "test-key-index".into(),
734                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
735                key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
736                value_partition: "test-value-journal".into(),
737                value_compression: None,
738                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
739                value_target_size: DEFAULT_VALUE_TARGET_SIZE,
740                table_partition: "test-table".into(),
741                table_initial_size: DEFAULT_TABLE_INITIAL_SIZE,
742                table_resize_frequency: DEFAULT_TABLE_RESIZE_FREQUENCY,
743                table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
744                table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
745                codec_config: (),
746            };
747            let checkpoint = {
748                let mut freezer =
749                    Freezer::<_, FixedBytes<64>, i32>::init(context.child("first"), cfg.clone())
750                        .await
751                        .expect("Failed to initialize freezer");
752
753                freezer.put(test_key("key1"), 42).await.unwrap();
754                freezer.sync().await.unwrap();
755                freezer.close().await.unwrap()
756            };
757
758            // Corrupt the table by writing partial entry
759            {
760                let (blob, _) = context.open(&cfg.table_partition, b"table").await.unwrap();
761                // Write incomplete table entry (only 10 bytes instead of 24)
762                blob.write_at_sync(0, vec![0xFF; 10]).await.unwrap();
763            }
764
765            // Reopen and verify it handles the corruption
766            {
767                let freezer = Freezer::<_, FixedBytes<64>, i32>::init_with_checkpoint(
768                    context.child("second"),
769                    cfg.clone(),
770                    Some(checkpoint),
771                )
772                .await
773                .expect("Failed to initialize freezer");
774
775                // The key should still be retrievable from journal if table is corrupted
776                // but the table entry is zeroed out
777                let result = freezer
778                    .get(Identifier::Key(&test_key("key1")))
779                    .await
780                    .unwrap();
781                assert!(result.is_none() || result == Some(42));
782            }
783        });
784    }
785
786    #[test_traced]
787    fn test_table_entry_invalid_crc() {
788        // Initialize the deterministic context
789        let executor = deterministic::Runner::default();
790        executor.start(|context| async move {
791            let cfg = Config {
792                key_partition: "test-key-index".into(),
793                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
794                key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
795                value_partition: "test-value-journal".into(),
796                value_compression: None,
797                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
798                value_target_size: DEFAULT_VALUE_TARGET_SIZE,
799                table_partition: "test-table".into(),
800                table_initial_size: DEFAULT_TABLE_INITIAL_SIZE,
801                table_resize_frequency: DEFAULT_TABLE_RESIZE_FREQUENCY,
802                table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
803                table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
804                codec_config: (),
805            };
806
807            // Create freezer with data
808            let checkpoint = {
809                let mut freezer =
810                    Freezer::<_, FixedBytes<64>, i32>::init(context.child("first"), cfg.clone())
811                        .await
812                        .expect("Failed to initialize freezer");
813
814                freezer.put(test_key("key1"), 42).await.unwrap();
815                freezer.sync().await.unwrap();
816                freezer.close().await.unwrap()
817            };
818
819            // Corrupt the CRC in the index entry
820            {
821                let (blob, _) = context.open(&cfg.table_partition, b"table").await.unwrap();
822                // Read the first entry
823                let entry_data = blob.read_at(0, 24).await.unwrap();
824                let mut corrupted = entry_data.coalesce();
825                // Corrupt the CRC (last 4 bytes of the entry)
826                corrupted.as_mut()[20] ^= 0xFF;
827                blob.write_at_sync(0, corrupted).await.unwrap();
828            }
829
830            // Reopen and verify it handles invalid CRC
831            {
832                let freezer = Freezer::<_, FixedBytes<64>, i32>::init_with_checkpoint(
833                    context.child("second"),
834                    cfg.clone(),
835                    Some(checkpoint),
836                )
837                .await
838                .expect("Failed to initialize freezer");
839
840                // With invalid CRC, the entry should be treated as invalid
841                let result = freezer
842                    .get(Identifier::Key(&test_key("key1")))
843                    .await
844                    .unwrap();
845                // The freezer should still work but may not find the key due to invalid table entry
846                assert!(result.is_none() || result == Some(42));
847            }
848        });
849    }
850
851    #[test_traced]
852    fn test_table_extra_bytes() {
853        // Initialize the deterministic context
854        let executor = deterministic::Runner::default();
855        executor.start(|context| async move {
856            let cfg = Config {
857                key_partition: "test-key-index".into(),
858                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
859                key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
860                value_partition: "test-value-journal".into(),
861                value_compression: None,
862                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
863                value_target_size: DEFAULT_VALUE_TARGET_SIZE,
864                table_partition: "test-table".into(),
865                table_initial_size: DEFAULT_TABLE_INITIAL_SIZE,
866                table_resize_frequency: DEFAULT_TABLE_RESIZE_FREQUENCY,
867                table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
868                table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
869                codec_config: (),
870            };
871
872            // Create freezer with data
873            let checkpoint = {
874                let mut freezer =
875                    Freezer::<_, FixedBytes<64>, i32>::init(context.child("first"), cfg.clone())
876                        .await
877                        .expect("Failed to initialize freezer");
878
879                freezer.put(test_key("key1"), 42).await.unwrap();
880                freezer.sync().await.unwrap();
881                freezer.close().await.unwrap()
882            };
883
884            // Add extra bytes to the table blob
885            {
886                let (blob, size) = context.open(&cfg.table_partition, b"table").await.unwrap();
887                // Append garbage data
888                blob.write_at_sync(size, hex!("0xdeadbeef").to_vec())
889                    .await
890                    .unwrap();
891            }
892
893            // Reopen and verify it handles extra bytes gracefully
894            {
895                let freezer = Freezer::<_, FixedBytes<64>, i32>::init_with_checkpoint(
896                    context.child("second"),
897                    cfg.clone(),
898                    Some(checkpoint),
899                )
900                .await
901                .expect("Failed to initialize freezer");
902
903                // Should still be able to read the key
904                assert_eq!(
905                    freezer
906                        .get(Identifier::Key(&test_key("key1")))
907                        .await
908                        .unwrap(),
909                    Some(42)
910                );
911
912                // And write new data
913                let mut freezer_mut = freezer;
914                freezer_mut.put(test_key("key2"), 43).await.unwrap();
915                assert_eq!(
916                    freezer_mut
917                        .get(Identifier::Key(&test_key("key2")))
918                        .await
919                        .unwrap(),
920                    Some(43)
921                );
922            }
923        });
924    }
925
926    #[test_traced]
927    fn test_indexing_across_resizes() {
928        // Initialize the deterministic context
929        let executor = deterministic::Runner::default();
930        executor.start(|context| async move {
931            // Initialize the freezer
932            let cfg = Config {
933                key_partition: "test-key-index".into(),
934                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
935                key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
936                value_partition: "test-value-journal".into(),
937                value_compression: None,
938                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
939                value_target_size: DEFAULT_VALUE_TARGET_SIZE,
940                table_partition: "test-table".into(),
941                table_initial_size: 2, // Very small initial size to force multiple resizes
942                table_resize_frequency: 2, // Resize after 2 items per entry
943                table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
944                table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
945                codec_config: (),
946            };
947            let mut freezer =
948                Freezer::<_, FixedBytes<64>, i32>::init(context.child("first"), cfg.clone())
949                    .await
950                    .expect("Failed to initialize freezer");
951
952            // Insert many keys to force multiple table resizes
953            // Table will grow from 2 -> 4 -> 8 -> 16 -> 32 -> 64 -> 128 -> 256 -> 512 -> 1024
954            let mut keys = Vec::new();
955            for i in 0..1000 {
956                let key = test_key(&format!("key{i}"));
957                keys.push((key.clone(), i));
958
959                // Force sync to ensure resize occurs ASAP
960                freezer.put(key, i).await.expect("Failed to put data");
961                freezer.sync().await.expect("Failed to sync");
962            }
963
964            // Verify all keys can still be found after multiple resizes
965            for (key, value) in &keys {
966                let retrieved = freezer
967                    .get(Identifier::Key(key))
968                    .await
969                    .expect("Failed to get data")
970                    .expect("Data not found");
971                assert_eq!(retrieved, *value, "Value mismatch for key after resizes");
972            }
973
974            // Verify metrics show resize operations occurred. Must be checked
975            // before closing: dropping the freezer drops its Registered metric
976            // handles, which unregisters the metrics.
977            let buffer = context.encode();
978            assert!(buffer.contains("first_resizes_total 8"), "{}", buffer);
979
980            // Close and reopen to verify persistence
981            let checkpoint = freezer.close().await.expect("Failed to close");
982            let freezer = Freezer::<_, FixedBytes<64>, i32>::init_with_checkpoint(
983                context.child("second"),
984                cfg.clone(),
985                Some(checkpoint),
986            )
987            .await
988            .expect("Failed to reinitialize freezer");
989
990            // Verify all keys can still be found after restart
991            for (key, value) in &keys {
992                let retrieved = freezer
993                    .get(Identifier::Key(key))
994                    .await
995                    .expect("Failed to get data")
996                    .expect("Data not found");
997                assert_eq!(retrieved, *value, "Value mismatch for key after restart");
998            }
999        });
1000    }
1001
1002    #[test_traced]
1003    fn test_insert_during_resize() {
1004        let executor = deterministic::Runner::default();
1005        executor.start(|context| async move {
1006            let cfg = Config {
1007                key_partition: "test-key-index".into(),
1008                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1009                key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1010                value_partition: "test-value-journal".into(),
1011                value_compression: None,
1012                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1013                value_target_size: DEFAULT_VALUE_TARGET_SIZE,
1014                table_partition: "test-table".into(),
1015                table_initial_size: 2,
1016                table_resize_frequency: 1,
1017                table_resize_chunk_size: 1, // Process one at a time
1018                table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
1019                codec_config: (),
1020            };
1021            let mut freezer =
1022                Freezer::<_, FixedBytes<64>, i32>::init(context.child("storage"), cfg.clone())
1023                    .await
1024                    .unwrap();
1025
1026            // Insert keys to trigger resize
1027            // key0 -> entry 0, key2 -> entry 1
1028            freezer.put(test_key("key0"), 0).await.unwrap();
1029            freezer.put(test_key("key2"), 1).await.unwrap();
1030            freezer.sync().await.unwrap(); // should start resize
1031
1032            // Verify resize started
1033            assert!(freezer.resizing().is_some());
1034
1035            // Insert during resize (to first entry)
1036            // key6 -> entry 0
1037            freezer.put(test_key("key6"), 2).await.unwrap();
1038            assert!(context.encode().contains("unnecessary_writes_total 1"));
1039            assert_eq!(freezer.resizable(), 3);
1040
1041            // Insert another key (to unmodified entry)
1042            // key3 -> entry 1
1043            freezer.put(test_key("key3"), 3).await.unwrap();
1044            assert!(context.encode().contains("unnecessary_writes_total 1"));
1045            assert_eq!(freezer.resizable(), 3);
1046
1047            // Verify resize completed
1048            freezer.sync().await.unwrap();
1049            assert!(freezer.resizing().is_none());
1050            assert_eq!(freezer.resizable(), 2);
1051
1052            // More inserts
1053            // key4 -> entry 1, key7 -> entry 0
1054            freezer.put(test_key("key4"), 4).await.unwrap();
1055            freezer.put(test_key("key7"), 5).await.unwrap();
1056            freezer.sync().await.unwrap();
1057
1058            // Another resize should've started
1059            assert!(freezer.resizing().is_some());
1060
1061            // Verify all can be retrieved during resize
1062            let keys = ["key0", "key2", "key6", "key3", "key4", "key7"];
1063            for (i, k) in keys.iter().enumerate() {
1064                assert_eq!(
1065                    freezer.get(Identifier::Key(&test_key(k))).await.unwrap(),
1066                    Some(i as i32)
1067                );
1068            }
1069
1070            // Sync until resize completes
1071            while freezer.resizing().is_some() {
1072                freezer.sync().await.unwrap();
1073            }
1074
1075            // Ensure no entries are considered resizable
1076            assert_eq!(freezer.resizable(), 0);
1077        });
1078    }
1079
1080    #[test_traced]
1081    fn test_resize_after_startup() {
1082        let executor = deterministic::Runner::default();
1083        executor.start(|context| async move {
1084            let cfg = Config {
1085                key_partition: "test-key-index".into(),
1086                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1087                key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1088                value_partition: "test-value-journal".into(),
1089                value_compression: None,
1090                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1091                value_target_size: DEFAULT_VALUE_TARGET_SIZE,
1092                table_partition: "test-table".into(),
1093                table_initial_size: 2,
1094                table_resize_frequency: 1,
1095                table_resize_chunk_size: 1, // Process one at a time
1096                table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
1097                codec_config: (),
1098            };
1099
1100            // Create freezer and then shutdown uncleanly
1101            let checkpoint = {
1102                let mut freezer =
1103                    Freezer::<_, FixedBytes<64>, i32>::init(context.child("first"), cfg.clone())
1104                        .await
1105                        .unwrap();
1106
1107                // Insert keys to trigger resize
1108                // key0 -> entry 0, key2 -> entry 1
1109                freezer.put(test_key("key0"), 0).await.unwrap();
1110                freezer.put(test_key("key2"), 1).await.unwrap();
1111                let checkpoint = freezer.sync().await.unwrap();
1112
1113                // Verify resize started
1114                assert!(freezer.resizing().is_some());
1115
1116                checkpoint
1117            };
1118
1119            // Reopen freezer
1120            let mut freezer = Freezer::<_, FixedBytes<64>, i32>::init_with_checkpoint(
1121                context.child("second"),
1122                cfg.clone(),
1123                Some(checkpoint),
1124            )
1125            .await
1126            .unwrap();
1127            assert_eq!(freezer.resizable(), 1);
1128
1129            // Verify resize starts immediately (1 key will have 0 added but 1
1130            // will still have 1)
1131            freezer.sync().await.unwrap();
1132            assert!(freezer.resizing().is_some());
1133
1134            // Run until resize completes
1135            while freezer.resizing().is_some() {
1136                freezer.sync().await.unwrap();
1137            }
1138
1139            // Ensure no entries are considered resizable
1140            assert_eq!(freezer.resizable(), 0);
1141        });
1142    }
1143
1144    fn test_operations_and_restart(num_keys: usize) -> String {
1145        let executor = deterministic::Runner::default();
1146        executor.start(|mut context| async move {
1147            let cfg = Config {
1148                key_partition: "test-key-index".into(),
1149                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1150                key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1151                value_partition: "test-value-journal".into(),
1152                value_compression: None,
1153                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1154                value_target_size: 128, // Force multiple journal sections
1155                table_partition: "test-table".into(),
1156                table_initial_size: 8,     // Small table to force collisions
1157                table_resize_frequency: 2, // Force resize frequently
1158                table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
1159                table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
1160                codec_config: (),
1161            };
1162            let mut freezer = Freezer::<_, FixedBytes<96>, FixedBytes<256>>::init(
1163                context.child("init").with_attribute("index", 1),
1164                cfg.clone(),
1165            )
1166            .await
1167            .expect("Failed to initialize freezer");
1168
1169            // Generate and insert random key-value pairs
1170            let mut pairs = Vec::new();
1171
1172            for _ in 0..num_keys {
1173                // Generate random key
1174                let mut key = [0u8; 96];
1175                context.fill_bytes(&mut key);
1176                let key = FixedBytes::<96>::new(key);
1177
1178                // Generate random value
1179                let mut value = [0u8; 256];
1180                context.fill_bytes(&mut value);
1181                let value = FixedBytes::<256>::new(value);
1182
1183                // Store the key-value pair
1184                freezer
1185                    .put(key.clone(), value.clone())
1186                    .await
1187                    .expect("Failed to put data");
1188                pairs.push((key, value));
1189
1190                // Randomly sync to test resizing
1191                if context.gen_bool(0.1) {
1192                    freezer.sync().await.expect("Failed to sync");
1193                }
1194            }
1195
1196            // Sync data
1197            freezer.sync().await.expect("Failed to sync");
1198
1199            // Verify all pairs can be retrieved
1200            for (key, value) in &pairs {
1201                let retrieved = freezer
1202                    .get(Identifier::Key(key))
1203                    .await
1204                    .expect("Failed to get data")
1205                    .expect("Data not found");
1206                assert_eq!(&retrieved, value);
1207            }
1208
1209            // Test get() on all keys
1210            for (key, _) in &pairs {
1211                assert!(freezer
1212                    .get(Identifier::Key(key))
1213                    .await
1214                    .expect("Failed to check key")
1215                    .is_some());
1216            }
1217
1218            // Check some non-existent keys
1219            for _ in 0..10 {
1220                let mut key = [0u8; 96];
1221                context.fill_bytes(&mut key);
1222                let key = FixedBytes::<96>::new(key);
1223                assert!(freezer
1224                    .get(Identifier::Key(&key))
1225                    .await
1226                    .expect("Failed to check key")
1227                    .is_none());
1228            }
1229
1230            // Close the freezer
1231            let checkpoint = freezer.close().await.expect("Failed to close freezer");
1232
1233            // Reopen the freezer
1234            let mut freezer = Freezer::<_, FixedBytes<96>, FixedBytes<256>>::init_with_checkpoint(
1235                context.child("init").with_attribute("index", 2),
1236                cfg.clone(),
1237                Some(checkpoint),
1238            )
1239            .await
1240            .expect("Failed to initialize freezer");
1241
1242            // Verify all pairs are still there after restart
1243            for (key, value) in &pairs {
1244                let retrieved = freezer
1245                    .get(Identifier::Key(key))
1246                    .await
1247                    .expect("Failed to get data")
1248                    .expect("Data not found");
1249                assert_eq!(&retrieved, value);
1250            }
1251
1252            // Add more pairs after restart to test collision handling
1253            for _ in 0..20 {
1254                let mut key = [0u8; 96];
1255                context.fill_bytes(&mut key);
1256                let key = FixedBytes::<96>::new(key);
1257
1258                let mut value = [0u8; 256];
1259                context.fill_bytes(&mut value);
1260                let value = FixedBytes::<256>::new(value);
1261
1262                freezer.put(key, value).await.expect("Failed to put data");
1263            }
1264
1265            // Multiple syncs to test epoch progression
1266            for _ in 0..3 {
1267                freezer.sync().await.expect("Failed to sync");
1268
1269                // Add a few more entries between syncs
1270                for _ in 0..5 {
1271                    let mut key = [0u8; 96];
1272                    context.fill_bytes(&mut key);
1273                    let key = FixedBytes::<96>::new(key);
1274
1275                    let mut value = [0u8; 256];
1276                    context.fill_bytes(&mut value);
1277                    let value = FixedBytes::<256>::new(value);
1278
1279                    freezer.put(key, value).await.expect("Failed to put data");
1280                }
1281            }
1282
1283            // Final sync
1284            freezer.sync().await.expect("Failed to sync");
1285
1286            // Return the auditor state for comparison
1287            context.auditor().state()
1288        })
1289    }
1290
1291    #[test_group("slow")]
1292    #[test_traced]
1293    fn test_determinism() {
1294        let state1 = test_operations_and_restart(1_000);
1295        let state2 = test_operations_and_restart(1_000);
1296        assert_eq!(state1, state2);
1297    }
1298
1299    #[test_traced]
1300    fn test_put_multiple_updates() {
1301        // Initialize the deterministic context
1302        let executor = deterministic::Runner::default();
1303        executor.start(|context| async move {
1304            // Initialize the freezer
1305            let cfg = Config {
1306                key_partition: "test-key-index".into(),
1307                key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1308                key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1309                value_partition: "test-value-journal".into(),
1310                value_compression: None,
1311                value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1312                value_target_size: DEFAULT_VALUE_TARGET_SIZE,
1313                table_partition: "test-table".into(),
1314                table_initial_size: DEFAULT_TABLE_INITIAL_SIZE,
1315                table_resize_frequency: DEFAULT_TABLE_RESIZE_FREQUENCY,
1316                table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
1317                table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
1318                codec_config: (),
1319            };
1320            let mut freezer =
1321                Freezer::<_, FixedBytes<64>, i32>::init(context.child("storage"), cfg.clone())
1322                    .await
1323                    .expect("Failed to initialize freezer");
1324
1325            let key = test_key("key1");
1326
1327            freezer
1328                .put(key.clone(), 1)
1329                .await
1330                .expect("Failed to put data");
1331            freezer
1332                .put(key.clone(), 2)
1333                .await
1334                .expect("Failed to put data");
1335            freezer.sync().await.expect("Failed to sync");
1336            assert_eq!(
1337                freezer
1338                    .get(Identifier::Key(&key))
1339                    .await
1340                    .expect("Failed to get data")
1341                    .unwrap(),
1342                2
1343            );
1344        });
1345    }
1346}