Skip to main content

commonware_storage/ordinal/
mod.rs

1//! A persistent index that maps sparse indices to [commonware_utils::Array]s.
2//!
3//! [Ordinal] is a collection of [commonware_runtime::Blob]s containing ordered records of fixed size.
4//! Because records are fixed size, file position corresponds directly to index. Unlike
5//! [crate::journal::contiguous::fixed::Journal], [Ordinal] supports out-of-order insertion.
6//!
7//! # Design
8//!
9//! [Ordinal] is a collection of [commonware_runtime::Blob]s where:
10//! - Each record: `[V][crc32(V)]` where V is an [commonware_utils::Array]
//! - Index N is stored in blob `N / items_per_blob`, at file offset `(N % items_per_blob) * RECORD_SIZE` within that blob
12//! - A [crate::rmap::RMap] tracks which indices have been written (and which are missing)
13//!
14//! # File Organization
15//!
16//! Records are grouped into blobs to avoid having too many files:
17//!
18//! ```text
19//! Blob 0: indices 0-999
20//! Blob 1: indices 1000-1999
21//! ...
22//! ```
23//!
24//! # Format
25//!
26//! [Ordinal] stores values in the following format:
27//!
28//! ```text
29//! +---+---+---+---+---+---+---+---+---+---+---+---+---+
30//! | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 |10 |11 |12 |
31//! +---+---+---+---+---+---+---+---+---+---+---+---+---+
32//! |          Value (Fixed Size)       |     CRC32     |
33//! +---+---+---+---+---+---+---+---+---+---+---+---+---+
34//! ```
35//!
36//! # Performance Characteristics
37//!
38//! - **Writes**: O(1) - direct offset calculation
39//! - **Reads**: O(1) - direct offset calculation
40//! - **Has**: O(1) - in-memory lookup (via [crate::rmap::RMap])
41//! - **Next Gap**: O(log n) - in-memory range query (via [crate::rmap::RMap])
42//! - **Restart**: O(n) where n is the number of existing records (to rebuild [crate::rmap::RMap])
43//!
44//! # Atomicity
45//!
46//! [Ordinal] eagerly writes all new data to [commonware_runtime::Blob]s. New data, however, is not
47//! synced until [Ordinal::sync] is called. As a result, data is not guaranteed to be atomically
48//! persisted (i.e. shutdown before [Ordinal::sync] may lead to some writes being lost).
49//!
50//! _If you want atomicity for sparse writes, pair [commonware_utils::bitmap::BitMap] and
51//! [crate::metadata::Metadata] with [Ordinal] (use bits to indicate which items have been atomically
52//! written)._
53//!
54//! # Recovery
55//!
56//! On restart, [Ordinal] validates all records using their CRC32 and rebuilds the in-memory
57//! [crate::rmap::RMap]. Invalid records (corrupted or empty) are excluded from the rebuilt index.
58//!
59//! # Example
60//!
61//! ```rust
62//! use commonware_runtime::{Spawner, Runner, deterministic};
63//! use commonware_storage::ordinal::{Ordinal, Config};
64//! use commonware_utils::{sequence::FixedBytes, NZUsize, NZU64};
65//!
66//! let executor = deterministic::Runner::default();
67//! executor.start(|context| async move {
68//!     // Create a store for 32-byte values
69//!     let cfg = Config {
70//!         partition: "ordinal-store".into(),
71//!         items_per_blob: NZU64!(10000),
72//!         write_buffer: NZUsize!(4096),
73//!         replay_buffer: NZUsize!(1024 * 1024),
74//!     };
75//!     let mut store = Ordinal::<_, FixedBytes<32>>::init(context, cfg).await.unwrap();
76//!
77//!     // Put values at specific indices
78//!     let value1 = FixedBytes::new([1u8; 32]);
79//!     let value2 = FixedBytes::new([2u8; 32]);
80//!     store.put(0, value1).await.unwrap();
81//!     store.put(5, value2).await.unwrap();
82//!
83//!     // Sync to disk
84//!     store.sync().await.unwrap();
85//!
86//!     // Check for gaps
87//!     let (current_end, next_start) = store.next_gap(0);
88//!     assert_eq!(current_end, Some(0));
89//!     assert_eq!(next_start, Some(5));
90//!
91//!     // Sync the store
92//!     store.sync().await.unwrap();
93//! });
94//! ```
95
96mod storage;
97
98use std::num::{NonZeroU64, NonZeroUsize};
99pub use storage::Ordinal;
100use thiserror::Error;
101
102/// Errors that can occur when interacting with the [Ordinal].
103#[derive(Debug, Error)]
104pub enum Error {
105    #[error("runtime error: {0}")]
106    Runtime(#[from] commonware_runtime::Error),
107    #[error("codec error: {0}")]
108    Codec(#[from] commonware_codec::Error),
109    #[error("invalid blob name: {0}")]
110    InvalidBlobName(String),
111    #[error("invalid record: {0}")]
112    InvalidRecord(u64),
113    #[error("missing record at {0}")]
114    MissingRecord(u64),
115}
116
/// Configuration for [Ordinal] storage.
///
/// Every sizing field uses a `NonZero*` type, so a zero-sized blob or buffer
/// cannot be expressed. The struct is `Clone` (callers pass `cfg.clone()` to
/// `init`) and `Debug` (so a configuration can be logged or shown in test
/// failures).
#[derive(Clone, Debug)]
pub struct Config {
    /// The [commonware_runtime::Storage] partition to use for storing the index.
    pub partition: String,

    /// The maximum number of items to store in each index blob.
    pub items_per_blob: NonZeroU64,

    /// The size of the write buffer to use when writing to the index.
    pub write_buffer: NonZeroUsize,

    /// The size of the read buffer to use on restart.
    pub replay_buffer: NonZeroUsize,
}
132
133#[cfg(test)]
134mod tests {
135    use super::*;
136    use commonware_codec::{FixedSize, Read, ReadExt, Write};
137    use commonware_cryptography::Crc32;
138    use commonware_formatting::hex;
139    use commonware_macros::{test_group, test_traced};
140    use commonware_runtime::{
141        deterministic, Blob, Buf, BufMut, Metrics as _, Runner, Storage, Supervisor as _,
142    };
143    use commonware_utils::{bitmap::BitMap, sequence::FixedBytes, NZUsize, NZU64};
144    use rand::RngCore;
145    use std::collections::BTreeMap;
146
147    const DEFAULT_ITEMS_PER_BLOB: u64 = 1000;
148    const DEFAULT_WRITE_BUFFER: usize = 4096;
149    const DEFAULT_REPLAY_BUFFER: usize = 1024 * 1024;
150
151    #[test_traced]
152    fn test_put_get() {
153        // Initialize the deterministic context
154        let executor = deterministic::Runner::default();
155        executor.start(|context| async move {
156            // Initialize the store
157            let cfg = Config {
158                partition: "test-ordinal".into(),
159                items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
160                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
161                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
162            };
163            let mut store =
164                Ordinal::<_, FixedBytes<32>>::init(context.child("storage"), cfg.clone())
165                    .await
166                    .expect("Failed to initialize store");
167
168            let value = FixedBytes::new([42u8; 32]);
169
170            // Check index doesn't exist
171            assert!(!store.has(0));
172
173            // Put the value at index 0
174            store
175                .put(0, value.clone())
176                .await
177                .expect("Failed to put data");
178
179            // Check index exists
180            assert!(store.has(0));
181
182            // Get the value back (before sync)
183            let retrieved = store
184                .get(0)
185                .await
186                .expect("Failed to get data")
187                .expect("Data not found");
188            assert_eq!(retrieved, value);
189
190            // Force a sync
191            store.sync().await.expect("Failed to sync data");
192
193            // Check metrics
194            let buffer = context.encode();
195            assert!(buffer.contains("gets_total 1"), "{}", buffer);
196            assert!(buffer.contains("puts_total 1"), "{}", buffer);
197            assert!(buffer.contains("has_total 2"), "{}", buffer);
198            assert!(buffer.contains("syncs_total 1"), "{}", buffer);
199            assert!(buffer.contains("pruned_total 0"), "{}", buffer);
200
201            // Get the value back (after sync)
202            let retrieved = store
203                .get(0)
204                .await
205                .expect("Failed to get data")
206                .expect("Data not found");
207            assert_eq!(retrieved, value);
208        });
209    }
210
211    #[test_traced]
212    fn test_concurrent_sync_does_not_report_success_while_flush_fails() {
213        let executor = deterministic::Runner::default();
214        executor.start(|context| async move {
215            let cfg = Config {
216                partition: "test-ordinal".into(),
217                items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
218                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
219                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
220            };
221            let mut store =
222                Ordinal::<_, FixedBytes<32>>::init(context.child("storage"), cfg.clone())
223                    .await
224                    .expect("Failed to initialize store");
225
226            store
227                .put(0, FixedBytes::new([42u8; 32]))
228                .await
229                .expect("Failed to put data");
230
231            // Force flush failure by removing the underlying blob before sync.
232            let section = 0u64.to_be_bytes();
233            context
234                .remove(&cfg.partition, Some(&section))
235                .await
236                .expect("Failed to remove blob");
237
238            // Both concurrent sync calls must observe the in-flight durability failure.
239            let (first, second) = futures::future::join(store.sync(), store.sync()).await;
240            assert!(first.is_err(), "first sync unexpectedly succeeded");
241            assert!(second.is_err(), "second sync unexpectedly succeeded");
242        });
243    }
244
245    #[test_traced]
246    fn test_multiple_indices() {
247        // Initialize the deterministic context
248        let executor = deterministic::Runner::default();
249        executor.start(|context| async move {
250            // Initialize the store
251            let cfg = Config {
252                partition: "test-ordinal".into(),
253                items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
254                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
255                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
256            };
257            let mut store =
258                Ordinal::<_, FixedBytes<32>>::init(context.child("storage"), cfg.clone())
259                    .await
260                    .expect("Failed to initialize store");
261
262            // Insert multiple values at different indices
263            let indices = vec![
264                (0u64, FixedBytes::new([0u8; 32])),
265                (5u64, FixedBytes::new([5u8; 32])),
266                (10u64, FixedBytes::new([10u8; 32])),
267                (100u64, FixedBytes::new([100u8; 32])),
268                (1000u64, FixedBytes::new([200u8; 32])), // Different blob
269            ];
270
271            for (index, value) in &indices {
272                store
273                    .put(*index, value.clone())
274                    .await
275                    .expect("Failed to put data");
276            }
277
278            // Sync to disk
279            store.sync().await.expect("Failed to sync");
280
281            // Retrieve all values and verify
282            for (index, value) in &indices {
283                let retrieved = store
284                    .get(*index)
285                    .await
286                    .expect("Failed to get data")
287                    .expect("Data not found");
288                assert_eq!(&retrieved, value);
289            }
290        });
291    }
292
293    #[test_traced]
294    fn test_sparse_indices() {
295        // Initialize the deterministic context
296        let executor = deterministic::Runner::default();
297        executor.start(|context| async move {
298            // Initialize the store
299            let cfg = Config {
300                partition: "test-ordinal".into(),
301                items_per_blob: NZU64!(100), // Smaller blobs for testing
302                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
303                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
304            };
305            let mut store =
306                Ordinal::<_, FixedBytes<32>>::init(context.child("storage"), cfg.clone())
307                    .await
308                    .expect("Failed to initialize store");
309
310            // Insert sparse values
311            let indices = vec![
312                (0u64, FixedBytes::new([0u8; 32])),
313                (99u64, FixedBytes::new([99u8; 32])), // End of first blob
314                (100u64, FixedBytes::new([100u8; 32])), // Start of second blob
315                (500u64, FixedBytes::new([200u8; 32])), // Start of sixth blob
316            ];
317
318            for (index, value) in &indices {
319                store
320                    .put(*index, value.clone())
321                    .await
322                    .expect("Failed to put data");
323            }
324
325            // Check that intermediate indices don't exist
326            assert!(!store.has(1));
327            assert!(!store.has(50));
328            assert!(!store.has(101));
329            assert!(!store.has(499));
330
331            // Sync and verify
332            store.sync().await.expect("Failed to sync");
333
334            for (index, value) in &indices {
335                let retrieved = store
336                    .get(*index)
337                    .await
338                    .expect("Failed to get data")
339                    .expect("Data not found");
340                assert_eq!(&retrieved, value);
341            }
342        });
343    }
344
345    #[test_traced]
346    fn test_next_gap() {
347        // Initialize the deterministic context
348        let executor = deterministic::Runner::default();
349        executor.start(|context| async move {
350            // Initialize the store
351            let cfg = Config {
352                partition: "test-ordinal".into(),
353                items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
354                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
355                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
356            };
357            let mut store =
358                Ordinal::<_, FixedBytes<32>>::init(context.child("storage"), cfg.clone())
359                    .await
360                    .expect("Failed to initialize store");
361
362            // Insert values with gaps
363            store.put(1, FixedBytes::new([1u8; 32])).await.unwrap();
364            store.put(10, FixedBytes::new([10u8; 32])).await.unwrap();
365            store.put(11, FixedBytes::new([11u8; 32])).await.unwrap();
366            store.put(14, FixedBytes::new([14u8; 32])).await.unwrap();
367
368            // Check gaps
369            let (current_end, start_next) = store.next_gap(0);
370            assert!(current_end.is_none());
371            assert_eq!(start_next, Some(1));
372
373            let (current_end, start_next) = store.next_gap(1);
374            assert_eq!(current_end, Some(1));
375            assert_eq!(start_next, Some(10));
376
377            let (current_end, start_next) = store.next_gap(10);
378            assert_eq!(current_end, Some(11));
379            assert_eq!(start_next, Some(14));
380
381            let (current_end, start_next) = store.next_gap(11);
382            assert_eq!(current_end, Some(11));
383            assert_eq!(start_next, Some(14));
384
385            let (current_end, start_next) = store.next_gap(12);
386            assert!(current_end.is_none());
387            assert_eq!(start_next, Some(14));
388
389            let (current_end, start_next) = store.next_gap(14);
390            assert_eq!(current_end, Some(14));
391            assert!(start_next.is_none());
392        });
393    }
394
395    #[test_traced]
396    fn test_missing_items() {
397        // Initialize the deterministic context
398        let executor = deterministic::Runner::default();
399        executor.start(|context| async move {
400            // Initialize the store
401            let cfg = Config {
402                partition: "test-ordinal".into(),
403                items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
404                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
405                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
406            };
407            let mut store =
408                Ordinal::<_, FixedBytes<32>>::init(context.child("storage"), cfg.clone())
409                    .await
410                    .expect("Failed to initialize store");
411
412            // Test 1: Empty store - should return no items
413            assert_eq!(store.missing_items(0, 5), Vec::<u64>::new());
414            assert_eq!(store.missing_items(100, 10), Vec::<u64>::new());
415
416            // Test 2: Insert values with gaps
417            store.put(1, FixedBytes::new([1u8; 32])).await.unwrap();
418            store.put(2, FixedBytes::new([2u8; 32])).await.unwrap();
419            store.put(5, FixedBytes::new([5u8; 32])).await.unwrap();
420            store.put(6, FixedBytes::new([6u8; 32])).await.unwrap();
421            store.put(10, FixedBytes::new([10u8; 32])).await.unwrap();
422
423            // Test 3: Find missing items from the beginning
424            assert_eq!(store.missing_items(0, 5), vec![0, 3, 4, 7, 8]);
425            assert_eq!(store.missing_items(0, 6), vec![0, 3, 4, 7, 8, 9]);
426            assert_eq!(store.missing_items(0, 7), vec![0, 3, 4, 7, 8, 9]);
427
428            // Test 4: Find missing items from within a gap
429            assert_eq!(store.missing_items(3, 3), vec![3, 4, 7]);
430            assert_eq!(store.missing_items(4, 2), vec![4, 7]);
431
432            // Test 5: Find missing items from within a range
433            assert_eq!(store.missing_items(1, 3), vec![3, 4, 7]);
434            assert_eq!(store.missing_items(2, 4), vec![3, 4, 7, 8]);
435            assert_eq!(store.missing_items(5, 2), vec![7, 8]);
436
437            // Test 6: Find missing items after the last range (no more gaps)
438            assert_eq!(store.missing_items(11, 5), Vec::<u64>::new());
439            assert_eq!(store.missing_items(100, 10), Vec::<u64>::new());
440
441            // Test 7: Large gap scenario
442            store.put(1000, FixedBytes::new([100u8; 32])).await.unwrap();
443
444            // Gap between 10 and 1000
445            let items = store.missing_items(11, 10);
446            assert_eq!(items, vec![11, 12, 13, 14, 15, 16, 17, 18, 19, 20]);
447
448            // Request more items than available in gap
449            let items = store.missing_items(990, 15);
450            assert_eq!(
451                items,
452                vec![990, 991, 992, 993, 994, 995, 996, 997, 998, 999]
453            );
454
455            // Test 8: After syncing (data should remain consistent)
456            store.sync().await.unwrap();
457            assert_eq!(store.missing_items(0, 5), vec![0, 3, 4, 7, 8]);
458            assert_eq!(store.missing_items(3, 3), vec![3, 4, 7]);
459
460            // Test 9: Cross-blob boundary scenario
461            store.put(9999, FixedBytes::new([99u8; 32])).await.unwrap();
462            store
463                .put(10001, FixedBytes::new([101u8; 32]))
464                .await
465                .unwrap();
466
467            // Find missing items across blob boundary (10000 is the boundary)
468            let items = store.missing_items(9998, 5);
469            assert_eq!(items, vec![9998, 10000]);
470        });
471    }
472
473    #[test_traced]
474    fn test_restart() {
475        // Initialize the deterministic context
476        let executor = deterministic::Runner::default();
477        executor.start(|context| async move {
478            let cfg = Config {
479                partition: "test-ordinal".into(),
480                items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
481                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
482                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
483            };
484
485            // Insert data and close
486            {
487                let mut store =
488                    Ordinal::<_, FixedBytes<32>>::init(context.child("first"), cfg.clone())
489                        .await
490                        .expect("Failed to initialize store");
491
492                let values = vec![
493                    (0u64, FixedBytes::new([0u8; 32])),
494                    (100u64, FixedBytes::new([100u8; 32])),
495                    (1000u64, FixedBytes::new([200u8; 32])),
496                ];
497
498                for (index, value) in &values {
499                    store
500                        .put(*index, value.clone())
501                        .await
502                        .expect("Failed to put data");
503                }
504
505                store.sync().await.expect("Failed to sync store");
506            }
507
508            // Reopen and verify data persisted
509            {
510                let store =
511                    Ordinal::<_, FixedBytes<32>>::init(context.child("second"), cfg.clone())
512                        .await
513                        .expect("Failed to initialize store");
514
515                let values = vec![
516                    (0u64, FixedBytes::new([0u8; 32])),
517                    (100u64, FixedBytes::new([100u8; 32])),
518                    (1000u64, FixedBytes::new([200u8; 32])),
519                ];
520
521                for (index, value) in &values {
522                    let retrieved = store
523                        .get(*index)
524                        .await
525                        .expect("Failed to get data")
526                        .expect("Data not found");
527                    assert_eq!(&retrieved, value);
528                }
529
530                // Check gaps are preserved
531                let (current_end, start_next) = store.next_gap(0);
532                assert_eq!(current_end, Some(0));
533                assert_eq!(start_next, Some(100));
534            }
535        });
536    }
537
538    #[test_traced]
539    fn test_invalid_record() {
540        // Initialize the deterministic context
541        let executor = deterministic::Runner::default();
542        executor.start(|context| async move {
543            let cfg = Config {
544                partition: "test-ordinal".into(),
545                items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
546                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
547                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
548            };
549
550            // Create store with data
551            {
552                let mut store =
553                    Ordinal::<_, FixedBytes<32>>::init(context.child("first"), cfg.clone())
554                        .await
555                        .expect("Failed to initialize store");
556
557                store
558                    .put(0, FixedBytes::new([42u8; 32]))
559                    .await
560                    .expect("Failed to put data");
561                store.sync().await.expect("Failed to sync store");
562            }
563
564            // Corrupt the data
565            {
566                let (blob, _) = context
567                    .open("test-ordinal", &0u64.to_be_bytes())
568                    .await
569                    .unwrap();
570                // Corrupt the CRC by changing a byte
571                blob.write_at_sync(32, vec![0xFF]).await.unwrap();
572            }
573
574            // Reopen and try to read corrupted data
575            {
576                let store =
577                    Ordinal::<_, FixedBytes<32>>::init(context.child("second"), cfg.clone())
578                        .await
579                        .expect("Failed to initialize store");
580
581                // Reading corrupt record will return empty
582                let result = store.get(0).await.unwrap();
583                assert!(result.is_none());
584
585                // The index should not be in the intervals after restart with corrupted data
586                assert!(!store.has(0));
587            }
588        });
589    }
590
591    #[test_traced]
592    fn test_get_nonexistent() {
593        // Initialize the deterministic context
594        let executor = deterministic::Runner::default();
595        executor.start(|context| async move {
596            // Initialize the store
597            let cfg = Config {
598                partition: "test-ordinal".into(),
599                items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
600                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
601                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
602            };
603            let store = Ordinal::<_, FixedBytes<32>>::init(context.child("storage"), cfg.clone())
604                .await
605                .expect("Failed to initialize store");
606
607            // Attempt to get an index that doesn't exist
608            let retrieved = store.get(999).await.expect("Failed to get data");
609            assert!(retrieved.is_none());
610
611            // Check has returns false
612            assert!(!store.has(999));
613        });
614    }
615
616    #[test_traced]
617    fn test_destroy() {
618        // Initialize the deterministic context
619        let executor = deterministic::Runner::default();
620        executor.start(|context| async move {
621            let cfg = Config {
622                partition: "test-ordinal".into(),
623                items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
624                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
625                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
626            };
627
628            // Create store with data
629            {
630                let mut store =
631                    Ordinal::<_, FixedBytes<32>>::init(context.child("first"), cfg.clone())
632                        .await
633                        .expect("Failed to initialize store");
634
635                store
636                    .put(0, FixedBytes::new([0u8; 32]))
637                    .await
638                    .expect("Failed to put data");
639                store
640                    .put(1000, FixedBytes::new([100u8; 32]))
641                    .await
642                    .expect("Failed to put data");
643
644                // Destroy the store
645                store.destroy().await.expect("Failed to destroy store");
646            }
647
648            // Try to create a new store - it should be empty
649            {
650                let store =
651                    Ordinal::<_, FixedBytes<32>>::init(context.child("second"), cfg.clone())
652                        .await
653                        .expect("Failed to initialize store");
654
655                // Should not find any data
656                assert!(store.get(0).await.unwrap().is_none());
657                assert!(store.get(1000).await.unwrap().is_none());
658                assert!(!store.has(0));
659                assert!(!store.has(1000));
660            }
661        });
662    }
663
664    #[test_traced]
665    fn test_partial_record_write() {
666        // Initialize the deterministic context
667        let executor = deterministic::Runner::default();
668        executor.start(|context| async move {
669            let cfg = Config {
670                partition: "test-ordinal".into(),
671                items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
672                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
673                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
674            };
675
676            // Create store with data
677            {
678                let mut store =
679                    Ordinal::<_, FixedBytes<32>>::init(context.child("first"), cfg.clone())
680                        .await
681                        .expect("Failed to initialize store");
682
683                store
684                    .put(0, FixedBytes::new([42u8; 32]))
685                    .await
686                    .expect("Failed to put data");
687                store
688                    .put(1, FixedBytes::new([43u8; 32]))
689                    .await
690                    .expect("Failed to put data");
691                store.sync().await.expect("Failed to sync store");
692            }
693
694            // Corrupt by writing partial record (only value, no CRC)
695            {
696                let (blob, _) = context
697                    .open("test-ordinal", &0u64.to_be_bytes())
698                    .await
699                    .unwrap();
700                // Overwrite second record with partial data (32 bytes instead of 36)
701                blob.write_at_sync(36, vec![0xFF; 32]).await.unwrap();
702            }
703
704            // Reopen and verify it handles partial write gracefully
705            {
706                let store =
707                    Ordinal::<_, FixedBytes<32>>::init(context.child("second"), cfg.clone())
708                        .await
709                        .expect("Failed to initialize store");
710
711                // First record should be fine
712                assert_eq!(
713                    store.get(0).await.unwrap().unwrap(),
714                    FixedBytes::new([42u8; 32])
715                );
716
717                // Second record should be removed due to partial write
718                assert!(!store.has(1));
719                assert!(store.get(1).await.unwrap().is_none());
720
721                // Store should still be functional
722                let mut store_mut = store;
723                store_mut.put(1, FixedBytes::new([44u8; 32])).await.unwrap();
724                assert_eq!(
725                    store_mut.get(1).await.unwrap().unwrap(),
726                    FixedBytes::new([44u8; 32])
727                );
728            }
729        });
730    }
731
732    #[test_traced]
733    fn test_corrupted_value() {
734        // Initialize the deterministic context
735        let executor = deterministic::Runner::default();
736        executor.start(|context| async move {
737            let cfg = Config {
738                partition: "test-ordinal".into(),
739                items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
740                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
741                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
742            };
743
744            // Create store with data
745            {
746                let mut store =
747                    Ordinal::<_, FixedBytes<32>>::init(context.child("first"), cfg.clone())
748                        .await
749                        .expect("Failed to initialize store");
750
751                store
752                    .put(0, FixedBytes::new([42u8; 32]))
753                    .await
754                    .expect("Failed to put data");
755                store
756                    .put(1, FixedBytes::new([43u8; 32]))
757                    .await
758                    .expect("Failed to put data");
759                store.sync().await.expect("Failed to sync store");
760            }
761
762            // Corrupt the value portion of a record
763            {
764                let (blob, _) = context
765                    .open("test-ordinal", &0u64.to_be_bytes())
766                    .await
767                    .unwrap();
768                // Corrupt some bytes in the value of the first record
769                blob.write_at_sync(10, hex!("0xFFFFFFFF").to_vec())
770                    .await
771                    .unwrap();
772            }
773
774            // Reopen and verify it detects corruption
775            {
776                let store =
777                    Ordinal::<_, FixedBytes<32>>::init(context.child("second"), cfg.clone())
778                        .await
779                        .expect("Failed to initialize store");
780
781                // First record should be detected as corrupted (CRC mismatch)
782                assert!(!store.has(0));
783
784                // Second record should still be valid
785                assert!(store.has(1));
786                assert_eq!(
787                    store.get(1).await.unwrap().unwrap(),
788                    FixedBytes::new([43u8; 32])
789                );
790            }
791        });
792    }
793
794    #[test_traced]
795    fn test_crc_corruptions() {
796        // Initialize the deterministic context
797        let executor = deterministic::Runner::default();
798        executor.start(|context| async move {
799            let cfg = Config {
800                partition: "test-ordinal".into(),
801                items_per_blob: NZU64!(10), // Small blob size for testing
802                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
803                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
804            };
805
806            // Create store with data across multiple blobs
807            {
808                let mut store =
809                    Ordinal::<_, FixedBytes<32>>::init(context.child("first"), cfg.clone())
810                        .await
811                        .expect("Failed to initialize store");
812
813                // Add values across 2 blobs
814                store.put(0, FixedBytes::new([0u8; 32])).await.unwrap();
815                store.put(5, FixedBytes::new([5u8; 32])).await.unwrap();
816                store.put(10, FixedBytes::new([10u8; 32])).await.unwrap();
817                store.put(15, FixedBytes::new([15u8; 32])).await.unwrap();
818                store.sync().await.expect("Failed to sync store");
819            }
820
821            // Corrupt CRCs in different blobs
822            {
823                // Corrupt CRC in first blob
824                let (blob, _) = context
825                    .open("test-ordinal", &0u64.to_be_bytes())
826                    .await
827                    .unwrap();
828                blob.write_at_sync(32, vec![0xFF]).await.unwrap(); // Corrupt CRC of index 0
829
830                // Corrupt value in second blob (which will invalidate CRC)
831                let (blob, _) = context
832                    .open("test-ordinal", &1u64.to_be_bytes())
833                    .await
834                    .unwrap();
835                blob.write_at_sync(5, vec![0xFF; 4]).await.unwrap(); // Corrupt value of index 10
836            }
837
838            // Reopen and verify handling of CRC corruptions
839            {
840                let store =
841                    Ordinal::<_, FixedBytes<32>>::init(context.child("second"), cfg.clone())
842                        .await
843                        .expect("Failed to initialize store");
844
845                // Corrupted records should not be present
846                assert!(!store.has(0)); // CRC corrupted
847                assert!(!store.has(10)); // Value corrupted (CRC mismatch)
848
849                // Valid records should still be accessible
850                assert!(store.has(5));
851                assert!(store.has(15));
852                assert_eq!(
853                    store.get(5).await.unwrap().unwrap(),
854                    FixedBytes::new([5u8; 32])
855                );
856                assert_eq!(
857                    store.get(15).await.unwrap().unwrap(),
858                    FixedBytes::new([15u8; 32])
859                );
860            }
861        });
862    }
863
864    #[test_traced]
865    fn test_extra_bytes_in_blob() {
866        // Initialize the deterministic context
867        let executor = deterministic::Runner::default();
868        executor.start(|context| async move {
869            let cfg = Config {
870                partition: "test-ordinal".into(),
871                items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
872                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
873                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
874            };
875
876            // Create store with data
877            {
878                let mut store =
879                    Ordinal::<_, FixedBytes<32>>::init(context.child("first"), cfg.clone())
880                        .await
881                        .expect("Failed to initialize store");
882
883                store
884                    .put(0, FixedBytes::new([42u8; 32]))
885                    .await
886                    .expect("Failed to put data");
887                store
888                    .put(1, FixedBytes::new([43u8; 32]))
889                    .await
890                    .expect("Failed to put data");
891                store.sync().await.expect("Failed to sync store");
892            }
893
894            // Add extra bytes at the end of blob
895            {
896                let (blob, size) = context
897                    .open("test-ordinal", &0u64.to_be_bytes())
898                    .await
899                    .unwrap();
900                // Add garbage data that forms a complete but invalid record
901                // This avoids partial record issues
902                let mut garbage = vec![0xFF; 32]; // Invalid value
903                let invalid_crc = 0xDEADBEEFu32;
904                garbage.extend_from_slice(&invalid_crc.to_be_bytes());
905                assert_eq!(garbage.len(), 36); // Full record size
906                blob.write_at_sync(size, garbage).await.unwrap();
907            }
908
909            // Reopen and verify it handles extra bytes
910            {
911                let store =
912                    Ordinal::<_, FixedBytes<32>>::init(context.child("second"), cfg.clone())
913                        .await
914                        .expect("Failed to initialize store");
915
916                // Original records should still be valid
917                assert!(store.has(0));
918                assert!(store.has(1));
919                assert_eq!(
920                    store.get(0).await.unwrap().unwrap(),
921                    FixedBytes::new([42u8; 32])
922                );
923                assert_eq!(
924                    store.get(1).await.unwrap().unwrap(),
925                    FixedBytes::new([43u8; 32])
926                );
927
928                // Store should still be functional
929                let mut store_mut = store;
930                store_mut.put(2, FixedBytes::new([44u8; 32])).await.unwrap();
931                assert_eq!(
932                    store_mut.get(2).await.unwrap().unwrap(),
933                    FixedBytes::new([44u8; 32])
934                );
935            }
936        });
937    }
938
939    #[test_traced]
940    fn test_zero_filled_records() {
941        // Initialize the deterministic context
942        let executor = deterministic::Runner::default();
943        executor.start(|context| async move {
944            let cfg = Config {
945                partition: "test-ordinal".into(),
946                items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
947                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
948                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
949            };
950
951            // Create blob with zero-filled space
952            {
953                let (blob, _) = context
954                    .open("test-ordinal", &0u64.to_be_bytes())
955                    .await
956                    .unwrap();
957
958                // Write zeros for several record positions
959                let zeros = vec![0u8; 36 * 5]; // 5 records worth of zeros
960                blob.write_at_sync(0, zeros).await.unwrap();
961
962                // Write a valid record after the zeros
963                let mut valid_record = vec![44u8; 32];
964                let crc = Crc32::checksum(&valid_record);
965                valid_record.extend_from_slice(&crc.to_be_bytes());
966                blob.write_at_sync(36 * 5, valid_record).await.unwrap();
967            }
968
969            // Initialize store and verify it handles zero-filled records
970            {
971                let store =
972                    Ordinal::<_, FixedBytes<32>>::init(context.child("storage"), cfg.clone())
973                        .await
974                        .expect("Failed to initialize store");
975
976                // Zero-filled positions should not be considered valid
977                for i in 0..5 {
978                    assert!(!store.has(i));
979                }
980
981                // The valid record should be found
982                assert!(store.has(5));
983                assert_eq!(
984                    store.get(5).await.unwrap().unwrap(),
985                    FixedBytes::new([44u8; 32])
986                );
987            }
988        });
989    }
990
991    fn test_operations_and_restart(num_values: usize) -> String {
992        // Initialize the deterministic context
993        let executor = deterministic::Runner::default();
994        executor.start(|mut context| async move {
995            let cfg = Config {
996                partition: "test-ordinal".into(),
997                items_per_blob: NZU64!(100), // Smaller blobs to test multiple blob handling
998                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
999                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1000            };
1001
1002            // Initialize the store
1003            let mut store =
1004                Ordinal::<_, FixedBytes<128>>::init(context.child("first"), cfg.clone())
1005                    .await
1006                    .expect("Failed to initialize store");
1007
1008            // Generate and insert random values at various indices
1009            let mut values = Vec::new();
1010            let mut rng_index = 0u64;
1011
1012            for _ in 0..num_values {
1013                // Generate a pseudo-random index (sparse to test gaps)
1014                let mut index_bytes = [0u8; 8];
1015                context.fill_bytes(&mut index_bytes);
1016                let index_offset = u64::from_be_bytes(index_bytes) % 1000;
1017                let index = rng_index + index_offset;
1018                rng_index = index + 1;
1019
1020                // Generate random value
1021                let mut value = [0u8; 128];
1022                context.fill_bytes(&mut value);
1023                let value = FixedBytes::<128>::new(value);
1024
1025                store
1026                    .put(index, value.clone())
1027                    .await
1028                    .expect("Failed to put data");
1029                values.push((index, value));
1030            }
1031
1032            // Sync data
1033            store.sync().await.expect("Failed to sync");
1034
1035            // Verify all values can be retrieved
1036            for (index, value) in &values {
1037                let retrieved = store
1038                    .get(*index)
1039                    .await
1040                    .expect("Failed to get data")
1041                    .expect("Data not found");
1042                assert_eq!(&retrieved, value);
1043            }
1044
1045            // Test next_gap on various indices
1046            for i in 0..10 {
1047                let _ = store.next_gap(i * 100);
1048            }
1049
1050            // Sync and drop the store
1051            store.sync().await.expect("Failed to sync store");
1052            drop(store);
1053
1054            // Reopen the store
1055            let mut store = Ordinal::<_, FixedBytes<128>>::init(context.child("second"), cfg)
1056                .await
1057                .expect("Failed to initialize store");
1058
1059            // Verify all values are still there after restart
1060            for (index, value) in &values {
1061                let retrieved = store
1062                    .get(*index)
1063                    .await
1064                    .expect("Failed to get data")
1065                    .expect("Data not found");
1066                assert_eq!(&retrieved, value);
1067            }
1068
1069            // Add more values after restart
1070            for _ in 0..10 {
1071                let mut index_bytes = [0u8; 8];
1072                context.fill_bytes(&mut index_bytes);
1073                let index = u64::from_be_bytes(index_bytes) % 10000;
1074
1075                let mut value = [0u8; 128];
1076                context.fill_bytes(&mut value);
1077                let value = FixedBytes::<128>::new(value);
1078
1079                store.put(index, value).await.expect("Failed to put data");
1080            }
1081
1082            // Final sync
1083            store.sync().await.expect("Failed to sync");
1084
1085            // Return the auditor state for comparison
1086            context.auditor().state()
1087        })
1088    }
1089
1090    #[test_group("slow")]
1091    #[test_traced]
1092    fn test_determinism() {
1093        let state1 = test_operations_and_restart(100);
1094        let state2 = test_operations_and_restart(100);
1095        assert_eq!(state1, state2);
1096    }
1097
1098    #[test_traced]
1099    fn test_prune_basic() {
1100        // Initialize the deterministic context
1101        let executor = deterministic::Runner::default();
1102        executor.start(|context| async move {
1103            let cfg = Config {
1104                partition: "test-ordinal".into(),
1105                items_per_blob: NZU64!(100), // Small blobs to test multiple blob handling
1106                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1107                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1108            };
1109
1110            let mut store =
1111                Ordinal::<_, FixedBytes<32>>::init(context.child("storage"), cfg.clone())
1112                    .await
1113                    .expect("Failed to initialize store");
1114
1115            // Insert data across multiple blobs
1116            let values = vec![
1117                (0u64, FixedBytes::new([0u8; 32])),     // Blob 0
1118                (50u64, FixedBytes::new([50u8; 32])),   // Blob 0
1119                (100u64, FixedBytes::new([100u8; 32])), // Blob 1
1120                (150u64, FixedBytes::new([150u8; 32])), // Blob 1
1121                (200u64, FixedBytes::new([200u8; 32])), // Blob 2
1122                (300u64, FixedBytes::new([44u8; 32])),  // Blob 3
1123            ];
1124
1125            for (index, value) in &values {
1126                store
1127                    .put(*index, value.clone())
1128                    .await
1129                    .expect("Failed to put data");
1130            }
1131            store.sync().await.unwrap();
1132
1133            // Verify all values exist
1134            for (index, value) in &values {
1135                assert_eq!(store.get(*index).await.unwrap().unwrap(), *value);
1136            }
1137
1138            // Prune up to index 150 (should remove blob 0 only)
1139            store.prune(150).await.unwrap();
1140            let buffer = context.encode();
1141            assert!(buffer.contains("pruned_total 1"));
1142
1143            // Verify pruned data is gone
1144            assert!(!store.has(0));
1145            assert!(!store.has(50));
1146            assert!(store.get(0).await.unwrap().is_none());
1147            assert!(store.get(50).await.unwrap().is_none());
1148
1149            // Verify remaining data is still there
1150            assert!(store.has(100));
1151            assert!(store.has(150));
1152            assert!(store.has(200));
1153            assert!(store.has(300));
1154            assert_eq!(store.get(100).await.unwrap().unwrap(), values[2].1);
1155            assert_eq!(store.get(150).await.unwrap().unwrap(), values[3].1);
1156            assert_eq!(store.get(200).await.unwrap().unwrap(), values[4].1);
1157            assert_eq!(store.get(300).await.unwrap().unwrap(), values[5].1);
1158
1159            // Prune more aggressively - up to index 250 (should remove blob 1)
1160            store.prune(250).await.unwrap();
1161            let buffer = context.encode();
1162            assert!(buffer.contains("pruned_total 2"));
1163
1164            // Verify more data is pruned
1165            assert!(!store.has(100));
1166            assert!(!store.has(150));
1167            assert!(store.get(100).await.unwrap().is_none());
1168            assert!(store.get(150).await.unwrap().is_none());
1169
1170            // Verify remaining data
1171            assert!(store.has(200));
1172            assert!(store.has(300));
1173            assert_eq!(store.get(200).await.unwrap().unwrap(), values[4].1);
1174            assert_eq!(store.get(300).await.unwrap().unwrap(), values[5].1);
1175        });
1176    }
1177
1178    #[test_traced]
1179    fn test_prune_with_gaps() {
1180        // Initialize the deterministic context
1181        let executor = deterministic::Runner::default();
1182        executor.start(|context| async move {
1183            let cfg = Config {
1184                partition: "test-ordinal".into(),
1185                items_per_blob: NZU64!(100),
1186                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1187                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1188            };
1189
1190            let mut store =
1191                Ordinal::<_, FixedBytes<32>>::init(context.child("storage"), cfg.clone())
1192                    .await
1193                    .expect("Failed to initialize store");
1194
1195            // Insert sparse data with gaps
1196            store.put(5, FixedBytes::new([5u8; 32])).await.unwrap();
1197            store.put(105, FixedBytes::new([105u8; 32])).await.unwrap();
1198            store.put(305, FixedBytes::new([49u8; 32])).await.unwrap();
1199            store.sync().await.unwrap();
1200
1201            // Check gaps before pruning
1202            let (current_end, next_start) = store.next_gap(0);
1203            assert!(current_end.is_none());
1204            assert_eq!(next_start, Some(5));
1205
1206            let (current_end, next_start) = store.next_gap(5);
1207            assert_eq!(current_end, Some(5));
1208            assert_eq!(next_start, Some(105));
1209
1210            // Prune up to index 150 (should remove blob 0)
1211            store.prune(150).await.unwrap();
1212
1213            // Verify pruned data is gone
1214            assert!(!store.has(5));
1215            assert!(store.get(5).await.unwrap().is_none());
1216
1217            // Verify remaining data and gaps
1218            assert!(store.has(105));
1219            assert!(store.has(305));
1220
1221            let (current_end, next_start) = store.next_gap(0);
1222            assert!(current_end.is_none());
1223            assert_eq!(next_start, Some(105));
1224
1225            let (current_end, next_start) = store.next_gap(105);
1226            assert_eq!(current_end, Some(105));
1227            assert_eq!(next_start, Some(305));
1228        });
1229    }
1230
1231    #[test_traced]
1232    fn test_prune_no_op() {
1233        // Initialize the deterministic context
1234        let executor = deterministic::Runner::default();
1235        executor.start(|context| async move {
1236            let cfg = Config {
1237                partition: "test-ordinal".into(),
1238                items_per_blob: NZU64!(100),
1239                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1240                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1241            };
1242
1243            let mut store =
1244                Ordinal::<_, FixedBytes<32>>::init(context.child("storage"), cfg.clone())
1245                    .await
1246                    .expect("Failed to initialize store");
1247
1248            // Insert data
1249            store.put(100, FixedBytes::new([100u8; 32])).await.unwrap();
1250            store.put(200, FixedBytes::new([200u8; 32])).await.unwrap();
1251            store.sync().await.unwrap();
1252
1253            // Try to prune before any data - should be no-op
1254            store.prune(50).await.unwrap();
1255
1256            // Verify no data was actually pruned
1257            assert!(store.has(100));
1258            assert!(store.has(200));
1259            let buffer = context.encode();
1260            assert!(buffer.contains("pruned_total 0"));
1261
1262            // Try to prune exactly at blob boundary - should be no-op
1263            store.prune(100).await.unwrap();
1264
1265            // Verify still no data pruned
1266            assert!(store.has(100));
1267            assert!(store.has(200));
1268            let buffer = context.encode();
1269            assert!(buffer.contains("pruned_total 0"));
1270        });
1271    }
1272
1273    #[test_traced]
1274    fn test_prune_empty_store() {
1275        // Initialize the deterministic context
1276        let executor = deterministic::Runner::default();
1277        executor.start(|context| async move {
1278            let cfg = Config {
1279                partition: "test-ordinal".into(),
1280                items_per_blob: NZU64!(100),
1281                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1282                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1283            };
1284
1285            let mut store =
1286                Ordinal::<_, FixedBytes<32>>::init(context.child("storage"), cfg.clone())
1287                    .await
1288                    .expect("Failed to initialize store");
1289
1290            // Try to prune empty store
1291            store.prune(1000).await.unwrap();
1292
1293            // Store should still be functional
1294            store.put(0, FixedBytes::new([0u8; 32])).await.unwrap();
1295            assert!(store.has(0));
1296        });
1297    }
1298
1299    #[test_traced]
1300    fn test_prune_after_restart() {
1301        // Initialize the deterministic context
1302        let executor = deterministic::Runner::default();
1303        executor.start(|context| async move {
1304            let cfg = Config {
1305                partition: "test-ordinal".into(),
1306                items_per_blob: NZU64!(100),
1307                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1308                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1309            };
1310
1311            // Create store and add data
1312            {
1313                let mut store =
1314                    Ordinal::<_, FixedBytes<32>>::init(context.child("first"), cfg.clone())
1315                        .await
1316                        .expect("Failed to initialize store");
1317
1318                store.put(0, FixedBytes::new([0u8; 32])).await.unwrap();
1319                store.put(100, FixedBytes::new([100u8; 32])).await.unwrap();
1320                store.put(200, FixedBytes::new([200u8; 32])).await.unwrap();
1321                store.sync().await.unwrap();
1322            }
1323
1324            // Reopen and prune
1325            {
1326                let mut store =
1327                    Ordinal::<_, FixedBytes<32>>::init(context.child("second"), cfg.clone())
1328                        .await
1329                        .expect("Failed to initialize store");
1330
1331                // Verify data is there
1332                assert!(store.has(0));
1333                assert!(store.has(100));
1334                assert!(store.has(200));
1335
1336                // Prune up to index 150
1337                store.prune(150).await.unwrap();
1338
1339                // Verify pruning worked
1340                assert!(!store.has(0));
1341                assert!(store.has(100));
1342                assert!(store.has(200));
1343
1344                store.sync().await.unwrap();
1345            }
1346
1347            // Reopen again and verify pruning persisted
1348            {
1349                let store = Ordinal::<_, FixedBytes<32>>::init(context.child("third"), cfg.clone())
1350                    .await
1351                    .expect("Failed to initialize store");
1352
1353                assert!(!store.has(0));
1354                assert!(store.has(100));
1355                assert!(store.has(200));
1356
1357                // Check gaps
1358                let (current_end, next_start) = store.next_gap(0);
1359                assert!(current_end.is_none());
1360                assert_eq!(next_start, Some(100));
1361            }
1362        });
1363    }
1364
1365    #[test_traced]
1366    fn test_prune_multiple_operations() {
1367        // Initialize the deterministic context
1368        let executor = deterministic::Runner::default();
1369        executor.start(|context| async move {
1370            let cfg = Config {
1371                partition: "test-ordinal".into(),
1372                items_per_blob: NZU64!(50), // Smaller blobs for more granular testing
1373                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1374                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1375            };
1376
1377            let mut store =
1378                Ordinal::<_, FixedBytes<32>>::init(context.child("storage"), cfg.clone())
1379                    .await
1380                    .expect("Failed to initialize store");
1381
1382            // Insert data across many blobs
1383            let mut values = Vec::new();
1384            for i in 0..10 {
1385                let index = i * 50 + 25; // Middle of each blob
1386                let value = FixedBytes::new([i as u8; 32]);
1387                store.put(index, value.clone()).await.unwrap();
1388                values.push((index, value));
1389            }
1390            store.sync().await.unwrap();
1391
1392            // Prune incrementally
1393            for i in 1..5 {
1394                let prune_index = i * 50 + 10;
1395                store.prune(prune_index).await.unwrap();
1396
1397                // Verify appropriate data is pruned
1398                for (index, _) in &values {
1399                    if *index < prune_index {
1400                        assert!(!store.has(*index), "Index {index} should be pruned");
1401                    } else {
1402                        assert!(store.has(*index), "Index {index} should not be pruned");
1403                    }
1404                }
1405            }
1406
1407            // Check final state
1408            let buffer = context.encode();
1409            assert!(buffer.contains("pruned_total 4"));
1410
1411            // Verify remaining data
1412            for i in 4..10 {
1413                let index = i * 50 + 25;
1414                assert!(store.has(index));
1415                assert_eq!(
1416                    store.get(index).await.unwrap().unwrap(),
1417                    values[i as usize].1
1418                );
1419            }
1420        });
1421    }
1422
1423    #[test_traced]
1424    fn test_prune_blob_boundaries() {
1425        // Initialize the deterministic context
1426        let executor = deterministic::Runner::default();
1427        executor.start(|context| async move {
1428            let cfg = Config {
1429                partition: "test-ordinal".into(),
1430                items_per_blob: NZU64!(100),
1431                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1432                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1433            };
1434
1435            let mut store =
1436                Ordinal::<_, FixedBytes<32>>::init(context.child("storage"), cfg.clone())
1437                    .await
1438                    .expect("Failed to initialize store");
1439
1440            // Insert data at blob boundaries
1441            store.put(0, FixedBytes::new([0u8; 32])).await.unwrap(); // Start of blob 0
1442            store.put(99, FixedBytes::new([99u8; 32])).await.unwrap(); // End of blob 0
1443            store.put(100, FixedBytes::new([100u8; 32])).await.unwrap(); // Start of blob 1
1444            store.put(199, FixedBytes::new([199u8; 32])).await.unwrap(); // End of blob 1
1445            store.put(200, FixedBytes::new([200u8; 32])).await.unwrap(); // Start of blob 2
1446            store.sync().await.unwrap();
1447
1448            // Test various pruning points around boundaries
1449
1450            // Prune exactly at blob boundary (100) - should prune blob 0
1451            store.prune(100).await.unwrap();
1452            assert!(!store.has(0));
1453            assert!(!store.has(99));
1454            assert!(store.has(100));
1455            assert!(store.has(199));
1456            assert!(store.has(200));
1457
1458            // Prune just before next boundary (199) - should not prune blob 1
1459            store.prune(199).await.unwrap();
1460            assert!(store.has(100));
1461            assert!(store.has(199));
1462            assert!(store.has(200));
1463
1464            // Prune exactly at next boundary (200) - should prune blob 1
1465            store.prune(200).await.unwrap();
1466            assert!(!store.has(100));
1467            assert!(!store.has(199));
1468            assert!(store.has(200));
1469
1470            let buffer = context.encode();
1471            assert!(buffer.contains("pruned_total 2"));
1472        });
1473    }
1474
1475    #[test_traced]
1476    fn test_prune_non_contiguous_sections() {
1477        // Initialize the deterministic context
1478        let executor = deterministic::Runner::default();
1479        executor.start(|context| async move {
1480            let cfg = Config {
1481                partition: "test-ordinal".into(),
1482                items_per_blob: NZU64!(100),
1483                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1484                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1485            };
1486
1487            let mut store =
1488                Ordinal::<_, FixedBytes<32>>::init(context.child("storage"), cfg.clone())
1489                    .await
1490                    .expect("Failed to initialize store");
1491
1492            // Insert data in non-contiguous sections (0, 2, 5, 7)
1493            store.put(0, FixedBytes::new([0u8; 32])).await.unwrap(); // Section 0
1494            store.put(250, FixedBytes::new([50u8; 32])).await.unwrap(); // Section 2 (250/100 = 2)
1495            store.put(500, FixedBytes::new([44u8; 32])).await.unwrap(); // Section 5 (500/100 = 5)
1496            store.put(750, FixedBytes::new([45u8; 32])).await.unwrap(); // Section 7 (750/100 = 7)
1497            store.sync().await.unwrap();
1498
1499            // Verify all data exists initially
1500            assert!(store.has(0));
1501            assert!(store.has(250));
1502            assert!(store.has(500));
1503            assert!(store.has(750));
1504
1505            // Prune up to section 3 (index 300) - should remove sections 0 and 2
1506            store.prune(300).await.unwrap();
1507
1508            // Verify correct data was pruned
1509            assert!(!store.has(0)); // Section 0 pruned
1510            assert!(!store.has(250)); // Section 2 pruned
1511            assert!(store.has(500)); // Section 5 remains
1512            assert!(store.has(750)); // Section 7 remains
1513
1514            let buffer = context.encode();
1515            assert!(buffer.contains("pruned_total 2"));
1516
1517            // Prune up to section 6 (index 600) - should remove section 5
1518            store.prune(600).await.unwrap();
1519
1520            // Verify section 5 was pruned
1521            assert!(!store.has(500)); // Section 5 pruned
1522            assert!(store.has(750)); // Section 7 remains
1523
1524            let buffer = context.encode();
1525            assert!(buffer.contains("pruned_total 3"));
1526
1527            // Prune everything - should remove section 7
1528            store.prune(1000).await.unwrap();
1529
1530            // Verify all data is gone
1531            assert!(!store.has(750)); // Section 7 pruned
1532
1533            let buffer = context.encode();
1534            assert!(buffer.contains("pruned_total 4"));
1535        });
1536    }
1537
1538    #[test_traced]
1539    fn test_prune_removes_correct_pending() {
1540        // Checks that pruning drops both synced and not-yet-synced entries of a pruned blob while later blobs survive (deterministic runtime).
1541        let executor = deterministic::Runner::default();
1542        executor.start(|context| async move {
1543            let cfg = Config {
1544                partition: "test-ordinal".into(),
1545                items_per_blob: NZU64!(100),
1546                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1547                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1548            };
1549            let mut store =
1550                Ordinal::<_, FixedBytes<32>>::init(context.child("storage"), cfg.clone())
1551                    .await
1552                    .expect("Failed to initialize store");
1553
1554            // Insert and sync some data in blob 0
1555            store.put(5, FixedBytes::new([5u8; 32])).await.unwrap();
1556            store.sync().await.unwrap();
1557
1558            // Add pending (written but not yet synced) entries to blob 0 and blob 1
1559            store.put(10, FixedBytes::new([10u8; 32])).await.unwrap(); // blob 0
1560            store.put(110, FixedBytes::new([110u8; 32])).await.unwrap(); // blob 1
1561
1562            // Verify all data is visible before pruning
1563            assert!(store.has(5));
1564            assert!(store.has(10));
1565            assert!(store.has(110));
1566
1567            // Prune at index 150. With 100 items per blob the test expects only blob 0 (indices 0-99) to be removed, even though 110 < 150.
1568            store.prune(150).await.unwrap();
1569
1570            // Verify that synced and pending entries in blob 0 are removed.
1571            assert!(!store.has(5));
1572            assert!(!store.has(10));
1573
1574            // Verify that the pending entry in blob 1 remains.
1575            assert!(store.has(110));
1576            assert_eq!(
1577                store.get(110).await.unwrap().unwrap(),
1578                FixedBytes::new([110u8; 32])
1579            );
1580
1581            // Sync the remaining pending entry and verify it's still there.
1582            store.sync().await.unwrap();
1583            assert!(store.has(110));
1584            assert_eq!(
1585                store.get(110).await.unwrap().unwrap(),
1586                FixedBytes::new([110u8; 32])
1587            );
1588        });
1589    }
1590
1591    #[test_traced]
1592    fn test_init_with_bits_none() {
1593        // Checks that init_with_bits(.., None) exposes every previously synced record, like a plain init (deterministic runtime).
1594        let executor = deterministic::Runner::default();
1595        executor.start(|context| async move {
1596            let cfg = Config {
1597                partition: "test-ordinal".into(),
1598                items_per_blob: NZU64!(10), // Small blob size for testing
1599                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1600                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1601            };
1602
1603            // Create store with data across multiple sections; the store is dropped at the end of this scope
1604            {
1605                let mut store =
1606                    Ordinal::<_, FixedBytes<32>>::init(context.child("first"), cfg.clone())
1607                        .await
1608                        .expect("Failed to initialize store");
1609
1610                // Section 0 (indices 0-9)
1611                store.put(0, FixedBytes::new([0u8; 32])).await.unwrap();
1612                store.put(5, FixedBytes::new([5u8; 32])).await.unwrap();
1613                store.put(9, FixedBytes::new([9u8; 32])).await.unwrap();
1614
1615                // Section 1 (indices 10-19)
1616                store.put(10, FixedBytes::new([10u8; 32])).await.unwrap();
1617                store.put(15, FixedBytes::new([15u8; 32])).await.unwrap();
1618
1619                // Section 2 (indices 20-29)
1620                store.put(25, FixedBytes::new([25u8; 32])).await.unwrap();
1621
1622                store.sync().await.unwrap();
1623            }
1624
1625            // Reinitialize on the same partition with bits = None (should behave like regular init)
1626            {
1627                let store = Ordinal::<_, FixedBytes<32>>::init_with_bits(
1628                    context.child("second"),
1629                    cfg.clone(),
1630                    None,
1631                )
1632                .await
1633                .expect("Failed to initialize store with bits");
1634
1635                // All records written before the restart should be available
1636                assert!(store.has(0));
1637                assert!(store.has(5));
1638                assert!(store.has(9));
1639                assert!(store.has(10));
1640                assert!(store.has(15));
1641                assert!(store.has(25));
1642
1643                // Indices that were never written should not be available
1644                assert!(!store.has(1));
1645                assert!(!store.has(11));
1646                assert!(!store.has(20));
1647
1648                // Verify values
1649                assert_eq!(
1650                    store.get(0).await.unwrap().unwrap(),
1651                    FixedBytes::new([0u8; 32])
1652                );
1653                assert_eq!(
1654                    store.get(15).await.unwrap().unwrap(),
1655                    FixedBytes::new([15u8; 32])
1656                );
1657            }
1658        });
1659    }
1660
1661    #[test_traced]
1662    fn test_init_with_bits_empty_hashmap() {
1663        // Checks that init_with_bits with an empty (but present) bits map exposes no records at all (deterministic runtime).
1664        let executor = deterministic::Runner::default();
1665        executor.start(|context| async move {
1666            let cfg = Config {
1667                partition: "test-ordinal".into(),
1668                items_per_blob: NZU64!(10),
1669                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1670                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1671            };
1672
1673            // Create store with one synced record in each of sections 0, 1 and 2
1674            {
1675                let mut store =
1676                    Ordinal::<_, FixedBytes<32>>::init(context.child("first"), cfg.clone())
1677                        .await
1678                        .expect("Failed to initialize store");
1679
1680                store.put(0, FixedBytes::new([0u8; 32])).await.unwrap();
1681                store.put(10, FixedBytes::new([10u8; 32])).await.unwrap();
1682                store.put(20, FixedBytes::new([20u8; 32])).await.unwrap();
1683
1684                store.sync().await.unwrap();
1685            }
1686
1687            // Reinitialize with an empty BTreeMap (Some, not None) - should skip all sections
1688            {
1689                let bits: BTreeMap<u64, &Option<BitMap>> = BTreeMap::new();
1690                let store = Ordinal::<_, FixedBytes<32>>::init_with_bits(
1691                    context.child("second"),
1692                    cfg.clone(),
1693                    Some(bits),
1694                )
1695                .await
1696                .expect("Failed to initialize store with bits");
1697
1698                // No records should be available since no sections were in the bits map
1699                assert!(!store.has(0));
1700                assert!(!store.has(10));
1701                assert!(!store.has(20));
1702            }
1703        });
1704    }
1705
1706    #[test_traced]
1707    fn test_init_with_bits_selective_sections() {
1708        // Checks that init_with_bits only exposes the records whose bits are set in the sections listed in the map (deterministic runtime).
1709        let executor = deterministic::Runner::default();
1710        executor.start(|context| async move {
1711            let cfg = Config {
1712                partition: "test-ordinal".into(),
1713                items_per_blob: NZU64!(10),
1714                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1715                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1716            };
1717
1718            // Create store with three fully populated sections; each value is filled with its index as a byte
1719            {
1720                let mut store =
1721                    Ordinal::<_, FixedBytes<32>>::init(context.child("first"), cfg.clone())
1722                        .await
1723                        .expect("Failed to initialize store");
1724
1725                // Section 0 (indices 0-9)
1726                for i in 0..10 {
1727                    store.put(i, FixedBytes::new([i as u8; 32])).await.unwrap();
1728                }
1729
1730                // Section 1 (indices 10-19)
1731                for i in 10..20 {
1732                    store.put(i, FixedBytes::new([i as u8; 32])).await.unwrap();
1733                }
1734
1735                // Section 2 (indices 20-29)
1736                for i in 20..30 {
1737                    store.put(i, FixedBytes::new([i as u8; 32])).await.unwrap();
1738                }
1739
1740                store.sync().await.unwrap();
1741            }
1742
1743            // Reinitialize with bits for only section 1
1744            {
1745                let mut bits_map: BTreeMap<u64, &Option<BitMap>> = BTreeMap::new();
1746
1747                // Create a BitMap (one bit per slot in the section) that marks indices 12, 15, and 18 as present
1748                let mut bitmap = BitMap::zeroes(10);
1749                bitmap.set(2, true); // Index 12 (offset 2 in section 1)
1750                bitmap.set(5, true); // Index 15 (offset 5 in section 1)
1751                bitmap.set(8, true); // Index 18 (offset 8 in section 1)
1752                let bitmap_option = Some(bitmap);
1753
1754                bits_map.insert(1, &bitmap_option); // key 1 = section 1; the map borrows the option
1755
1756                let store = Ordinal::<_, FixedBytes<32>>::init_with_bits(
1757                    context.child("second"),
1758                    cfg.clone(),
1759                    Some(bits_map),
1760                )
1761                .await
1762                .expect("Failed to initialize store with bits");
1763
1764                // Only specified indices from section 1 should be available
1765                assert!(store.has(12));
1766                assert!(store.has(15));
1767                assert!(store.has(18));
1768
1769                // Other indices from section 1 should not be available, although they exist on disk
1770                assert!(!store.has(10));
1771                assert!(!store.has(11));
1772                assert!(!store.has(13));
1773                assert!(!store.has(14));
1774                assert!(!store.has(16));
1775                assert!(!store.has(17));
1776                assert!(!store.has(19));
1777
1778                // All indices from sections 0 and 2 should not be available
1779                for i in 0..10 {
1780                    assert!(!store.has(i));
1781                }
1782                for i in 20..30 {
1783                    assert!(!store.has(i));
1784                }
1785
1786                // Verify the available values
1787                assert_eq!(
1788                    store.get(12).await.unwrap().unwrap(),
1789                    FixedBytes::new([12u8; 32])
1790                );
1791                assert_eq!(
1792                    store.get(15).await.unwrap().unwrap(),
1793                    FixedBytes::new([15u8; 32])
1794                );
1795                assert_eq!(
1796                    store.get(18).await.unwrap().unwrap(),
1797                    FixedBytes::new([18u8; 32])
1798                );
1799            }
1800        });
1801    }
1802
1803    #[test_traced]
1804    fn test_init_with_bits_none_option_all_records_exist() {
1805        // Checks that a `None` bitmap entry for a section loads every record of that section when the section is completely filled (deterministic runtime).
1806        let executor = deterministic::Runner::default();
1807        executor.start(|context| async move {
1808            let cfg = Config {
1809                partition: "test-ordinal".into(),
1810                items_per_blob: NZU64!(5),
1811                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1812                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1813            };
1814
1815            // Create store with all records in a section
1816            {
1817                let mut store =
1818                    Ordinal::<_, FixedBytes<32>>::init(context.child("first"), cfg.clone())
1819                        .await
1820                        .expect("Failed to initialize store");
1821
1822                // Fill section 1 completely (indices 5-9, since items_per_blob is 5)
1823                for i in 5..10 {
1824                    store.put(i, FixedBytes::new([i as u8; 32])).await.unwrap();
1825                }
1826
1827                store.sync().await.unwrap();
1828            }
1829
1830            // Reinitialize with None option for section 1 (expects all records)
1831            {
1832                let mut bits_map: BTreeMap<u64, &Option<BitMap>> = BTreeMap::new();
1833                let none_option: Option<BitMap> = None;
1834                bits_map.insert(1, &none_option);
1835
1836                let store = Ordinal::<_, FixedBytes<32>>::init_with_bits(
1837                    context.child("second"),
1838                    cfg.clone(),
1839                    Some(bits_map),
1840                )
1841                .await
1842                .expect("Failed to initialize store with bits");
1843
1844                // All records in section 1 should be available with their original values
1845                for i in 5..10 {
1846                    assert!(store.has(i));
1847                    assert_eq!(
1848                        store.get(i).await.unwrap().unwrap(),
1849                        FixedBytes::new([i as u8; 32])
1850                    );
1851                }
1852            }
1853        });
1854    }
1855
1856    #[test_traced]
1857    #[should_panic(expected = "Failed to initialize store with bits: MissingRecord(6)")]
1858    fn test_init_with_bits_none_option_missing_record_panics() {
1859        // Checks that a `None` bitmap entry for a section with a gap makes init_with_bits fail with MissingRecord for the missing index (deterministic runtime).
1860        let executor = deterministic::Runner::default();
1861        executor.start(|context| async move {
1862            let cfg = Config {
1863                partition: "test-ordinal".into(),
1864                items_per_blob: NZU64!(5),
1865                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1866                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1867            };
1868
1869            // Create store with missing record in a section
1870            {
1871                let mut store =
1872                    Ordinal::<_, FixedBytes<32>>::init(context.child("first"), cfg.clone())
1873                        .await
1874                        .expect("Failed to initialize store");
1875
1876                // Fill section 1 (indices 5-9) partially (skip index 6)
1877                store.put(5, FixedBytes::new([5u8; 32])).await.unwrap();
1878                // Skip index 6
1879                store.put(7, FixedBytes::new([7u8; 32])).await.unwrap();
1880                store.put(8, FixedBytes::new([8u8; 32])).await.unwrap();
1881                store.put(9, FixedBytes::new([9u8; 32])).await.unwrap();
1882
1883                store.sync().await.unwrap();
1884            }
1885
1886            // Reinitialize with None option for section 1 (expects all records)
1887            // The `.expect` below should panic because index 6 is missing; `should_panic` matches its message
1888            {
1889                let mut bits_map: BTreeMap<u64, &Option<BitMap>> = BTreeMap::new();
1890                let none_option: Option<BitMap> = None;
1891                bits_map.insert(1, &none_option);
1892
1893                let _store = Ordinal::<_, FixedBytes<32>>::init_with_bits(
1894                    context.child("second"),
1895                    cfg.clone(),
1896                    Some(bits_map),
1897                )
1898                .await
1899                .expect("Failed to initialize store with bits");
1900            }
1901        });
1902    }
1903
1904    #[test_traced]
1905    fn test_init_with_bits_mixed_sections() {
1906        // Checks init_with_bits when one section uses `None` (all records), one uses an explicit bitmap, and one is absent from the map (deterministic runtime).
1907        let executor = deterministic::Runner::default();
1908        executor.start(|context| async move {
1909            let cfg = Config {
1910                partition: "test-ordinal".into(),
1911                items_per_blob: NZU64!(5),
1912                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1913                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1914            };
1915
1916            // Create store with data in multiple sections
1917            {
1918                let mut store =
1919                    Ordinal::<_, FixedBytes<32>>::init(context.child("first"), cfg.clone())
1920                        .await
1921                        .expect("Failed to initialize store");
1922
1923                // Section 0: indices 0-4 (fill completely)
1924                for i in 0..5 {
1925                    store.put(i, FixedBytes::new([i as u8; 32])).await.unwrap();
1926                }
1927
1928                // Section 1: indices 5-9 (fill partially: 5, 7 and 9 only)
1929                store.put(5, FixedBytes::new([5u8; 32])).await.unwrap();
1930                store.put(7, FixedBytes::new([7u8; 32])).await.unwrap();
1931                store.put(9, FixedBytes::new([9u8; 32])).await.unwrap();
1932
1933                // Section 2: indices 10-14 (fill completely)
1934                for i in 10..15 {
1935                    store.put(i, FixedBytes::new([i as u8; 32])).await.unwrap();
1936                }
1937
1938                store.sync().await.unwrap();
1939            }
1940
1941            // Reinitialize with mixed bits configuration
1942            {
1943                let mut bits_map: BTreeMap<u64, &Option<BitMap>> = BTreeMap::new();
1944
1945                // Section 0: None option (expects all records)
1946                let none_option: Option<BitMap> = None;
1947                bits_map.insert(0, &none_option);
1948
1949                // Section 1: BitMap with specific indices (bit position = offset within the section)
1950                let mut bitmap1 = BitMap::zeroes(5);
1951                bitmap1.set(0, true); // Index 5
1952                bitmap1.set(2, true); // Index 7
1953                                      // Note: not setting bit for index 9, so it should be ignored even though it was written
1954                let bitmap1_option = Some(bitmap1);
1955                bits_map.insert(1, &bitmap1_option);
1956
1957                // Section 2: Not in map, should be skipped entirely
1958
1959                let store = Ordinal::<_, FixedBytes<32>>::init_with_bits(
1960                    context.child("second"),
1961                    cfg.clone(),
1962                    Some(bits_map),
1963                )
1964                .await
1965                .expect("Failed to initialize store with bits");
1966
1967                // All records from section 0 should be available
1968                for i in 0..5 {
1969                    assert!(store.has(i));
1970                    assert_eq!(
1971                        store.get(i).await.unwrap().unwrap(),
1972                        FixedBytes::new([i as u8; 32])
1973                    );
1974                }
1975
1976                // Only specified records from section 1 should be available
1977                assert!(store.has(5));
1978                assert!(store.has(7));
1979                assert!(!store.has(6));
1980                assert!(!store.has(8));
1981                assert!(!store.has(9)); // Not set in bitmap
1982
1983                // No records from section 2 should be available
1984                for i in 10..15 {
1985                    assert!(!store.has(i));
1986                }
1987            }
1988        });
1989    }
1990
1991    #[test_traced]
1992    #[should_panic(expected = "Failed to initialize store with bits: MissingRecord(2)")]
1993    fn test_init_with_bits_corrupted_records() {
1994        // Checks that a record required by the bitmap but failing its CRC check makes init_with_bits fail with MissingRecord (deterministic runtime).
1995        let executor = deterministic::Runner::default();
1996        executor.start(|context| async move {
1997            let cfg = Config {
1998                partition: "test-ordinal".into(),
1999                items_per_blob: NZU64!(5),
2000                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
2001                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
2002            };
2003
2004            // Create store with data; one record is corrupted on disk afterwards
2005            {
2006                let mut store =
2007                    Ordinal::<_, FixedBytes<32>>::init(context.child("first"), cfg.clone())
2008                        .await
2009                        .expect("Failed to initialize store");
2010
2011                // Section 0: indices 0-4
2012                for i in 0..5 {
2013                    store.put(i, FixedBytes::new([i as u8; 32])).await.unwrap();
2014                }
2015
2016                store.sync().await.unwrap();
2017            }
2018
2019            // Corrupt record at index 2 by opening section 0's blob directly (blob name is the big-endian section number)
2020            {
2021                let (blob, _) = context
2022                    .open("test-ordinal", &0u64.to_be_bytes())
2023                    .await
2024                    .unwrap();
2025                // Corrupt the CRC of record at index 2 by overwriting its first CRC byte with 0xFF
2026                let offset = 2 * 36 + 32; // 2 * record_size (32-byte value + 4-byte CRC32 = 36) + value_size (32)
2027                blob.write_at_sync(offset, vec![0xFF]).await.unwrap();
2028            }
2029
2030            // Reinitialize with bits that include the corrupted record
2031            {
2032                let mut bits_map: BTreeMap<u64, &Option<BitMap>> = BTreeMap::new();
2033
2034                // Create a BitMap that includes the corrupted record
2035                let mut bitmap = BitMap::zeroes(5);
2036                bitmap.set(0, true); // Index 0
2037                bitmap.set(2, true); // Index 2 (corrupted) - this will cause a panic
2038                bitmap.set(4, true); // Index 4
2039                let bitmap_option = Some(bitmap);
2040                bits_map.insert(0, &bitmap_option);
2041
2042                let _store = Ordinal::<_, FixedBytes<32>>::init_with_bits(
2043                    context.child("second"),
2044                    cfg.clone(),
2045                    Some(bits_map),
2046                )
2047                .await
2048                .expect("Failed to initialize store with bits");
2049            }
2050        });
2051    }
2052
2053    /// A test-only value wrapping a `u64` whose decoding fails when the value is 0 (encoding a 0 still succeeds).
2054    #[derive(Debug, PartialEq, Eq)]
2055    pub struct DummyValue {
2056        pub value: u64, // the wrapped integer; 0 is the value rejected on read
2057    }
2058
2059    impl Write for DummyValue {
2060        fn write(&self, buf: &mut impl BufMut) {
2061            self.value.write(buf); // delegate to the u64 encoding; no validation here, so 0 can be written
2062        }
2063    }
2064
2065    impl Read for DummyValue {
2066        type Cfg = (); // decoding takes no configuration
2067
2068        fn read_cfg(buf: &mut impl Buf, _: &Self::Cfg) -> Result<Self, commonware_codec::Error> {
2069            let value = u64::read(buf)?; // propagate any error from decoding the underlying u64
2070            if value == 0 {
2071                return Err(commonware_codec::Error::Invalid( // reject 0 so a written record can fail to parse
2072                    "DummyValue",
2073                    "value must be non-zero",
2074                ));
2075            }
2076            Ok(Self { value })
2077        }
2078    }
2079
2080    impl FixedSize for DummyValue {
2081        const SIZE: usize = u64::SIZE; // encoded size equals that of the wrapped u64
2082    }
2083
2084    #[test_traced]
2085    fn test_init_skip_unparseable_record() {
2086        // Checks that replay on init skips a record whose value fails to decode and still loads the records after it (deterministic runtime).
2087        let executor = deterministic::Runner::default();
2088        executor.start(|context| async move {
2089            let cfg = Config {
2090                partition: "test-ordinal".into(),
2091                items_per_blob: NZU64!(1),
2092                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
2093                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
2094            };
2095
2096            // Create store with records, one of which cannot be decoded when read back
2097            {
2098                let mut store = Ordinal::<_, DummyValue>::init(context.child("first"), cfg.clone())
2099                    .await
2100                    .expect("Failed to initialize store");
2101
2102                // Add records at indices 1, 2, 4 (items_per_blob is 1, so each index gets its own blob)
2103                store.put(1, DummyValue { value: 1 }).await.unwrap();
2104                store.put(2, DummyValue { value: 0 }).await.unwrap(); // will fail parsing
2105                store.put(4, DummyValue { value: 4 }).await.unwrap();
2106
2107                store.sync().await.unwrap();
2108            }
2109
2110            // Reinitialize - should skip the unparseable record but continue processing
2111            {
2112                let store = Ordinal::<_, DummyValue>::init(context.child("second"), cfg.clone())
2113                    .await
2114                    .expect("Failed to initialize store");
2115
2116                // Record 1 should be available
2117                assert!(store.has(1), "Record 1 should be available");
2118                assert_eq!(
2119                    store.get(1).await.unwrap().unwrap(),
2120                    DummyValue { value: 1 },
2121                    "Record 1 should have correct value"
2122                );
2123
2124                // Record 2 should NOT be available (unparseable)
2125                assert!(
2126                    !store.has(2),
2127                    "Record 2 should not be available (unparseable)"
2128                );
2129
2130                // This tests that we didn't exit early when encountering the unparseable record
2131                assert!(
2132                    store.has(4),
2133                    "Record 4 should be available - we should not exit early on unparseable record"
2134                );
2135                assert_eq!(
2136                    store.get(4).await.unwrap().unwrap(),
2137                    DummyValue { value: 4 },
2138                    "Record 4 should have correct value"
2139                );
2140            }
2141        });
2142    }
2143}