Skip to main content

commonware_storage/ordinal/
mod.rs

1//! A persistent index that maps sparse indices to [commonware_utils::Array]s.
2//!
3//! [Ordinal] is a collection of [commonware_runtime::Blob]s containing ordered records of fixed size.
4//! Because records are fixed size, file position corresponds directly to index. Unlike
5//! [crate::journal::contiguous::fixed::Journal], [Ordinal] supports out-of-order insertion.
6//!
7//! # Design
8//!
9//! [Ordinal] is a collection of [commonware_runtime::Blob]s where:
10//! - Each record: `[V][crc32(V)]` where V is an [commonware_utils::Array]
11//! - Index N is at file offset: `N * RECORD_SIZE`
12//! - A [crate::rmap::RMap] tracks which indices have been written (and which are missing)
13//!
14//! # File Organization
15//!
16//! Records are grouped into blobs to avoid having too many files:
17//!
18//! ```text
19//! Blob 0: indices 0-999
20//! Blob 1: indices 1000-1999
21//! ...
22//! ```
23//!
24//! # Format
25//!
26//! [Ordinal] stores values in the following format:
27//!
28//! ```text
29//! +---+---+---+---+---+---+---+---+---+---+---+---+---+
30//! | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 |10 |11 |12 |
31//! +---+---+---+---+---+---+---+---+---+---+---+---+---+
32//! |          Value (Fixed Size)       |     CRC32     |
33//! +---+---+---+---+---+---+---+---+---+---+---+---+---+
34//! ```
35//!
36//! # Performance Characteristics
37//!
38//! - **Writes**: O(1) - direct offset calculation
39//! - **Reads**: O(1) - direct offset calculation
40//! - **Has**: O(1) - in-memory lookup (via [crate::rmap::RMap])
41//! - **Next Gap**: O(log n) - in-memory range query (via [crate::rmap::RMap])
42//! - **Restart**: O(n) where n is the number of existing records (to rebuild [crate::rmap::RMap])
43//!
44//! # Atomicity
45//!
46//! [Ordinal] eagerly writes all new data to [commonware_runtime::Blob]s. New data, however, is not
47//! synced until [Ordinal::sync] is called. As a result, data is not guaranteed to be atomically
48//! persisted (i.e. shutdown before [Ordinal::sync] may lead to some writes being lost).
49//!
50//! _If you want atomicity for sparse writes, pair [commonware_utils::bitmap::BitMap] and
51//! [crate::metadata::Metadata] with [Ordinal] (use bits to indicate which items have been atomically
52//! written)._
53//!
54//! # Recovery
55//!
56//! On restart, [Ordinal] validates all records using their CRC32 and rebuilds the in-memory
57//! [crate::rmap::RMap]. Invalid records (corrupted or empty) are excluded from the rebuilt index.
58//!
59//! # Example
60//!
61//! ```rust
62//! use commonware_runtime::{Spawner, Runner, deterministic};
63//! use commonware_storage::ordinal::{Ordinal, Config};
64//! use commonware_utils::{sequence::FixedBytes, NZUsize, NZU64};
65//!
66//! let executor = deterministic::Runner::default();
67//! executor.start(|context| async move {
68//!     // Create a store for 32-byte values
69//!     let cfg = Config {
70//!         partition: "ordinal-store".into(),
71//!         items_per_blob: NZU64!(10000),
72//!         write_buffer: NZUsize!(4096),
73//!         replay_buffer: NZUsize!(1024 * 1024),
74//!     };
75//!     let mut store = Ordinal::<_, FixedBytes<32>>::init(context, cfg).await.unwrap();
76//!
77//!     // Put values at specific indices
78//!     let value1 = FixedBytes::new([1u8; 32]);
79//!     let value2 = FixedBytes::new([2u8; 32]);
80//!     store.put(0, value1).await.unwrap();
81//!     store.put(5, value2).await.unwrap();
82//!
83//!     // Sync to disk
84//!     store.sync().await.unwrap();
85//!
86//!     // Check for gaps
87//!     let (current_end, next_start) = store.next_gap(0);
88//!     assert_eq!(current_end, Some(0));
89//!     assert_eq!(next_start, Some(5));
90//!
91//!     // Sync the store
92//!     store.sync().await.unwrap();
93//! });
94//! ```
95
96mod storage;
97
98use std::num::{NonZeroU64, NonZeroUsize};
99pub use storage::Ordinal;
100use thiserror::Error;
101
102/// Errors that can occur when interacting with the [Ordinal].
103#[derive(Debug, Error)]
104pub enum Error {
105    #[error("runtime error: {0}")]
106    Runtime(#[from] commonware_runtime::Error),
107    #[error("codec error: {0}")]
108    Codec(#[from] commonware_codec::Error),
109    #[error("invalid blob name: {0}")]
110    InvalidBlobName(String),
111    #[error("invalid record: {0}")]
112    InvalidRecord(u64),
113    #[error("missing record at {0}")]
114    MissingRecord(u64),
115}
116
/// Configuration for [Ordinal] storage.
///
/// Every field is a plain standard-library type, so the configuration can be cloned and
/// printed (via `Debug`) for logging or test diagnostics.
#[derive(Clone, Debug)]
pub struct Config {
    /// The [commonware_runtime::Storage] partition to use for storing the index.
    pub partition: String,

    /// The maximum number of items to store in each index blob.
    ///
    /// Non-zero by construction, so a blob always holds at least one item.
    pub items_per_blob: NonZeroU64,

    /// The size of the write buffer to use when writing to the index.
    pub write_buffer: NonZeroUsize,

    /// The size of the read buffer to use on restart.
    pub replay_buffer: NonZeroUsize,
}
132
133#[cfg(test)]
134mod tests {
135    use super::*;
136    use commonware_codec::{FixedSize, Read, ReadExt, Write};
137    use commonware_cryptography::Crc32;
138    use commonware_macros::{test_group, test_traced};
139    use commonware_runtime::{deterministic, Blob, Buf, BufMut, Metrics, Runner, Storage};
140    use commonware_utils::{bitmap::BitMap, hex, sequence::FixedBytes, NZUsize, NZU64};
141    use rand::RngCore;
142    use std::collections::BTreeMap;
143
144    const DEFAULT_ITEMS_PER_BLOB: u64 = 1000;
145    const DEFAULT_WRITE_BUFFER: usize = 4096;
146    const DEFAULT_REPLAY_BUFFER: usize = 1024 * 1024;
147
148    #[test_traced]
149    fn test_put_get() {
150        // Initialize the deterministic context
151        let executor = deterministic::Runner::default();
152        executor.start(|context| async move {
153            // Initialize the store
154            let cfg = Config {
155                partition: "test-ordinal".into(),
156                items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
157                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
158                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
159            };
160            let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
161                .await
162                .expect("Failed to initialize store");
163
164            let value = FixedBytes::new([42u8; 32]);
165
166            // Check index doesn't exist
167            assert!(!store.has(0));
168
169            // Put the value at index 0
170            store
171                .put(0, value.clone())
172                .await
173                .expect("Failed to put data");
174
175            // Check index exists
176            assert!(store.has(0));
177
178            // Get the value back (before sync)
179            let retrieved = store
180                .get(0)
181                .await
182                .expect("Failed to get data")
183                .expect("Data not found");
184            assert_eq!(retrieved, value);
185
186            // Force a sync
187            store.sync().await.expect("Failed to sync data");
188
189            // Check metrics
190            let buffer = context.encode();
191            assert!(buffer.contains("gets_total 1"), "{}", buffer);
192            assert!(buffer.contains("puts_total 1"), "{}", buffer);
193            assert!(buffer.contains("has_total 2"), "{}", buffer);
194            assert!(buffer.contains("syncs_total 1"), "{}", buffer);
195            assert!(buffer.contains("pruned_total 0"), "{}", buffer);
196
197            // Get the value back (after sync)
198            let retrieved = store
199                .get(0)
200                .await
201                .expect("Failed to get data")
202                .expect("Data not found");
203            assert_eq!(retrieved, value);
204        });
205    }
206
207    #[test_traced]
208    fn test_concurrent_sync_does_not_report_success_while_flush_fails() {
209        let executor = deterministic::Runner::default();
210        executor.start(|context| async move {
211            let cfg = Config {
212                partition: "test-ordinal".into(),
213                items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
214                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
215                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
216            };
217            let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
218                .await
219                .expect("Failed to initialize store");
220
221            store
222                .put(0, FixedBytes::new([42u8; 32]))
223                .await
224                .expect("Failed to put data");
225
226            // Force flush failure by removing the underlying blob before sync.
227            let section = 0u64.to_be_bytes();
228            context
229                .remove(&cfg.partition, Some(&section))
230                .await
231                .expect("Failed to remove blob");
232
233            // Both concurrent sync calls must observe the in-flight durability failure.
234            let (first, second) = futures::future::join(store.sync(), store.sync()).await;
235            assert!(first.is_err(), "first sync unexpectedly succeeded");
236            assert!(second.is_err(), "second sync unexpectedly succeeded");
237        });
238    }
239
240    #[test_traced]
241    fn test_multiple_indices() {
242        // Initialize the deterministic context
243        let executor = deterministic::Runner::default();
244        executor.start(|context| async move {
245            // Initialize the store
246            let cfg = Config {
247                partition: "test-ordinal".into(),
248                items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
249                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
250                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
251            };
252            let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
253                .await
254                .expect("Failed to initialize store");
255
256            // Insert multiple values at different indices
257            let indices = vec![
258                (0u64, FixedBytes::new([0u8; 32])),
259                (5u64, FixedBytes::new([5u8; 32])),
260                (10u64, FixedBytes::new([10u8; 32])),
261                (100u64, FixedBytes::new([100u8; 32])),
262                (1000u64, FixedBytes::new([200u8; 32])), // Different blob
263            ];
264
265            for (index, value) in &indices {
266                store
267                    .put(*index, value.clone())
268                    .await
269                    .expect("Failed to put data");
270            }
271
272            // Sync to disk
273            store.sync().await.expect("Failed to sync");
274
275            // Retrieve all values and verify
276            for (index, value) in &indices {
277                let retrieved = store
278                    .get(*index)
279                    .await
280                    .expect("Failed to get data")
281                    .expect("Data not found");
282                assert_eq!(&retrieved, value);
283            }
284        });
285    }
286
287    #[test_traced]
288    fn test_sparse_indices() {
289        // Initialize the deterministic context
290        let executor = deterministic::Runner::default();
291        executor.start(|context| async move {
292            // Initialize the store
293            let cfg = Config {
294                partition: "test-ordinal".into(),
295                items_per_blob: NZU64!(100), // Smaller blobs for testing
296                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
297                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
298            };
299            let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
300                .await
301                .expect("Failed to initialize store");
302
303            // Insert sparse values
304            let indices = vec![
305                (0u64, FixedBytes::new([0u8; 32])),
306                (99u64, FixedBytes::new([99u8; 32])), // End of first blob
307                (100u64, FixedBytes::new([100u8; 32])), // Start of second blob
308                (500u64, FixedBytes::new([200u8; 32])), // Start of sixth blob
309            ];
310
311            for (index, value) in &indices {
312                store
313                    .put(*index, value.clone())
314                    .await
315                    .expect("Failed to put data");
316            }
317
318            // Check that intermediate indices don't exist
319            assert!(!store.has(1));
320            assert!(!store.has(50));
321            assert!(!store.has(101));
322            assert!(!store.has(499));
323
324            // Sync and verify
325            store.sync().await.expect("Failed to sync");
326
327            for (index, value) in &indices {
328                let retrieved = store
329                    .get(*index)
330                    .await
331                    .expect("Failed to get data")
332                    .expect("Data not found");
333                assert_eq!(&retrieved, value);
334            }
335        });
336    }
337
338    #[test_traced]
339    fn test_next_gap() {
340        // Initialize the deterministic context
341        let executor = deterministic::Runner::default();
342        executor.start(|context| async move {
343            // Initialize the store
344            let cfg = Config {
345                partition: "test-ordinal".into(),
346                items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
347                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
348                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
349            };
350            let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
351                .await
352                .expect("Failed to initialize store");
353
354            // Insert values with gaps
355            store.put(1, FixedBytes::new([1u8; 32])).await.unwrap();
356            store.put(10, FixedBytes::new([10u8; 32])).await.unwrap();
357            store.put(11, FixedBytes::new([11u8; 32])).await.unwrap();
358            store.put(14, FixedBytes::new([14u8; 32])).await.unwrap();
359
360            // Check gaps
361            let (current_end, start_next) = store.next_gap(0);
362            assert!(current_end.is_none());
363            assert_eq!(start_next, Some(1));
364
365            let (current_end, start_next) = store.next_gap(1);
366            assert_eq!(current_end, Some(1));
367            assert_eq!(start_next, Some(10));
368
369            let (current_end, start_next) = store.next_gap(10);
370            assert_eq!(current_end, Some(11));
371            assert_eq!(start_next, Some(14));
372
373            let (current_end, start_next) = store.next_gap(11);
374            assert_eq!(current_end, Some(11));
375            assert_eq!(start_next, Some(14));
376
377            let (current_end, start_next) = store.next_gap(12);
378            assert!(current_end.is_none());
379            assert_eq!(start_next, Some(14));
380
381            let (current_end, start_next) = store.next_gap(14);
382            assert_eq!(current_end, Some(14));
383            assert!(start_next.is_none());
384        });
385    }
386
387    #[test_traced]
388    fn test_missing_items() {
389        // Initialize the deterministic context
390        let executor = deterministic::Runner::default();
391        executor.start(|context| async move {
392            // Initialize the store
393            let cfg = Config {
394                partition: "test-ordinal".into(),
395                items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
396                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
397                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
398            };
399            let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
400                .await
401                .expect("Failed to initialize store");
402
403            // Test 1: Empty store - should return no items
404            assert_eq!(store.missing_items(0, 5), Vec::<u64>::new());
405            assert_eq!(store.missing_items(100, 10), Vec::<u64>::new());
406
407            // Test 2: Insert values with gaps
408            store.put(1, FixedBytes::new([1u8; 32])).await.unwrap();
409            store.put(2, FixedBytes::new([2u8; 32])).await.unwrap();
410            store.put(5, FixedBytes::new([5u8; 32])).await.unwrap();
411            store.put(6, FixedBytes::new([6u8; 32])).await.unwrap();
412            store.put(10, FixedBytes::new([10u8; 32])).await.unwrap();
413
414            // Test 3: Find missing items from the beginning
415            assert_eq!(store.missing_items(0, 5), vec![0, 3, 4, 7, 8]);
416            assert_eq!(store.missing_items(0, 6), vec![0, 3, 4, 7, 8, 9]);
417            assert_eq!(store.missing_items(0, 7), vec![0, 3, 4, 7, 8, 9]);
418
419            // Test 4: Find missing items from within a gap
420            assert_eq!(store.missing_items(3, 3), vec![3, 4, 7]);
421            assert_eq!(store.missing_items(4, 2), vec![4, 7]);
422
423            // Test 5: Find missing items from within a range
424            assert_eq!(store.missing_items(1, 3), vec![3, 4, 7]);
425            assert_eq!(store.missing_items(2, 4), vec![3, 4, 7, 8]);
426            assert_eq!(store.missing_items(5, 2), vec![7, 8]);
427
428            // Test 6: Find missing items after the last range (no more gaps)
429            assert_eq!(store.missing_items(11, 5), Vec::<u64>::new());
430            assert_eq!(store.missing_items(100, 10), Vec::<u64>::new());
431
432            // Test 7: Large gap scenario
433            store.put(1000, FixedBytes::new([100u8; 32])).await.unwrap();
434
435            // Gap between 10 and 1000
436            let items = store.missing_items(11, 10);
437            assert_eq!(items, vec![11, 12, 13, 14, 15, 16, 17, 18, 19, 20]);
438
439            // Request more items than available in gap
440            let items = store.missing_items(990, 15);
441            assert_eq!(
442                items,
443                vec![990, 991, 992, 993, 994, 995, 996, 997, 998, 999]
444            );
445
446            // Test 8: After syncing (data should remain consistent)
447            store.sync().await.unwrap();
448            assert_eq!(store.missing_items(0, 5), vec![0, 3, 4, 7, 8]);
449            assert_eq!(store.missing_items(3, 3), vec![3, 4, 7]);
450
451            // Test 9: Cross-blob boundary scenario
452            store.put(9999, FixedBytes::new([99u8; 32])).await.unwrap();
453            store
454                .put(10001, FixedBytes::new([101u8; 32]))
455                .await
456                .unwrap();
457
458            // Find missing items across blob boundary (10000 is the boundary)
459            let items = store.missing_items(9998, 5);
460            assert_eq!(items, vec![9998, 10000]);
461        });
462    }
463
464    #[test_traced]
465    fn test_restart() {
466        // Initialize the deterministic context
467        let executor = deterministic::Runner::default();
468        executor.start(|context| async move {
469            let cfg = Config {
470                partition: "test-ordinal".into(),
471                items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
472                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
473                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
474            };
475
476            // Insert data and close
477            {
478                let mut store =
479                    Ordinal::<_, FixedBytes<32>>::init(context.with_label("first"), cfg.clone())
480                        .await
481                        .expect("Failed to initialize store");
482
483                let values = vec![
484                    (0u64, FixedBytes::new([0u8; 32])),
485                    (100u64, FixedBytes::new([100u8; 32])),
486                    (1000u64, FixedBytes::new([200u8; 32])),
487                ];
488
489                for (index, value) in &values {
490                    store
491                        .put(*index, value.clone())
492                        .await
493                        .expect("Failed to put data");
494                }
495
496                store.sync().await.expect("Failed to sync store");
497            }
498
499            // Reopen and verify data persisted
500            {
501                let store =
502                    Ordinal::<_, FixedBytes<32>>::init(context.with_label("second"), cfg.clone())
503                        .await
504                        .expect("Failed to initialize store");
505
506                let values = vec![
507                    (0u64, FixedBytes::new([0u8; 32])),
508                    (100u64, FixedBytes::new([100u8; 32])),
509                    (1000u64, FixedBytes::new([200u8; 32])),
510                ];
511
512                for (index, value) in &values {
513                    let retrieved = store
514                        .get(*index)
515                        .await
516                        .expect("Failed to get data")
517                        .expect("Data not found");
518                    assert_eq!(&retrieved, value);
519                }
520
521                // Check gaps are preserved
522                let (current_end, start_next) = store.next_gap(0);
523                assert_eq!(current_end, Some(0));
524                assert_eq!(start_next, Some(100));
525            }
526        });
527    }
528
529    #[test_traced]
530    fn test_invalid_record() {
531        // Initialize the deterministic context
532        let executor = deterministic::Runner::default();
533        executor.start(|context| async move {
534            let cfg = Config {
535                partition: "test-ordinal".into(),
536                items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
537                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
538                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
539            };
540
541            // Create store with data
542            {
543                let mut store =
544                    Ordinal::<_, FixedBytes<32>>::init(context.with_label("first"), cfg.clone())
545                        .await
546                        .expect("Failed to initialize store");
547
548                store
549                    .put(0, FixedBytes::new([42u8; 32]))
550                    .await
551                    .expect("Failed to put data");
552                store.sync().await.expect("Failed to sync store");
553            }
554
555            // Corrupt the data
556            {
557                let (blob, _) = context
558                    .open("test-ordinal", &0u64.to_be_bytes())
559                    .await
560                    .unwrap();
561                // Corrupt the CRC by changing a byte
562                blob.write_at(32, vec![0xFF]).await.unwrap();
563                blob.sync().await.unwrap();
564            }
565
566            // Reopen and try to read corrupted data
567            {
568                let store =
569                    Ordinal::<_, FixedBytes<32>>::init(context.with_label("second"), cfg.clone())
570                        .await
571                        .expect("Failed to initialize store");
572
573                // Reading corrupt record will return empty
574                let result = store.get(0).await.unwrap();
575                assert!(result.is_none());
576
577                // The index should not be in the intervals after restart with corrupted data
578                assert!(!store.has(0));
579            }
580        });
581    }
582
583    #[test_traced]
584    fn test_get_nonexistent() {
585        // Initialize the deterministic context
586        let executor = deterministic::Runner::default();
587        executor.start(|context| async move {
588            // Initialize the store
589            let cfg = Config {
590                partition: "test-ordinal".into(),
591                items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
592                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
593                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
594            };
595            let store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
596                .await
597                .expect("Failed to initialize store");
598
599            // Attempt to get an index that doesn't exist
600            let retrieved = store.get(999).await.expect("Failed to get data");
601            assert!(retrieved.is_none());
602
603            // Check has returns false
604            assert!(!store.has(999));
605        });
606    }
607
608    #[test_traced]
609    fn test_destroy() {
610        // Initialize the deterministic context
611        let executor = deterministic::Runner::default();
612        executor.start(|context| async move {
613            let cfg = Config {
614                partition: "test-ordinal".into(),
615                items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
616                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
617                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
618            };
619
620            // Create store with data
621            {
622                let mut store =
623                    Ordinal::<_, FixedBytes<32>>::init(context.with_label("first"), cfg.clone())
624                        .await
625                        .expect("Failed to initialize store");
626
627                store
628                    .put(0, FixedBytes::new([0u8; 32]))
629                    .await
630                    .expect("Failed to put data");
631                store
632                    .put(1000, FixedBytes::new([100u8; 32]))
633                    .await
634                    .expect("Failed to put data");
635
636                // Destroy the store
637                store.destroy().await.expect("Failed to destroy store");
638            }
639
640            // Try to create a new store - it should be empty
641            {
642                let store =
643                    Ordinal::<_, FixedBytes<32>>::init(context.with_label("second"), cfg.clone())
644                        .await
645                        .expect("Failed to initialize store");
646
647                // Should not find any data
648                assert!(store.get(0).await.unwrap().is_none());
649                assert!(store.get(1000).await.unwrap().is_none());
650                assert!(!store.has(0));
651                assert!(!store.has(1000));
652            }
653        });
654    }
655
656    #[test_traced]
657    fn test_partial_record_write() {
658        // Initialize the deterministic context
659        let executor = deterministic::Runner::default();
660        executor.start(|context| async move {
661            let cfg = Config {
662                partition: "test-ordinal".into(),
663                items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
664                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
665                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
666            };
667
668            // Create store with data
669            {
670                let mut store =
671                    Ordinal::<_, FixedBytes<32>>::init(context.with_label("first"), cfg.clone())
672                        .await
673                        .expect("Failed to initialize store");
674
675                store
676                    .put(0, FixedBytes::new([42u8; 32]))
677                    .await
678                    .expect("Failed to put data");
679                store
680                    .put(1, FixedBytes::new([43u8; 32]))
681                    .await
682                    .expect("Failed to put data");
683                store.sync().await.expect("Failed to sync store");
684            }
685
686            // Corrupt by writing partial record (only value, no CRC)
687            {
688                let (blob, _) = context
689                    .open("test-ordinal", &0u64.to_be_bytes())
690                    .await
691                    .unwrap();
692                // Overwrite second record with partial data (32 bytes instead of 36)
693                blob.write_at(36, vec![0xFF; 32]).await.unwrap();
694                blob.sync().await.unwrap();
695            }
696
697            // Reopen and verify it handles partial write gracefully
698            {
699                let store =
700                    Ordinal::<_, FixedBytes<32>>::init(context.with_label("second"), cfg.clone())
701                        .await
702                        .expect("Failed to initialize store");
703
704                // First record should be fine
705                assert_eq!(
706                    store.get(0).await.unwrap().unwrap(),
707                    FixedBytes::new([42u8; 32])
708                );
709
710                // Second record should be removed due to partial write
711                assert!(!store.has(1));
712                assert!(store.get(1).await.unwrap().is_none());
713
714                // Store should still be functional
715                let mut store_mut = store;
716                store_mut.put(1, FixedBytes::new([44u8; 32])).await.unwrap();
717                assert_eq!(
718                    store_mut.get(1).await.unwrap().unwrap(),
719                    FixedBytes::new([44u8; 32])
720                );
721            }
722        });
723    }
724
725    #[test_traced]
726    fn test_corrupted_value() {
727        // Initialize the deterministic context
728        let executor = deterministic::Runner::default();
729        executor.start(|context| async move {
730            let cfg = Config {
731                partition: "test-ordinal".into(),
732                items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
733                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
734                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
735            };
736
737            // Create store with data
738            {
739                let mut store =
740                    Ordinal::<_, FixedBytes<32>>::init(context.with_label("first"), cfg.clone())
741                        .await
742                        .expect("Failed to initialize store");
743
744                store
745                    .put(0, FixedBytes::new([42u8; 32]))
746                    .await
747                    .expect("Failed to put data");
748                store
749                    .put(1, FixedBytes::new([43u8; 32]))
750                    .await
751                    .expect("Failed to put data");
752                store.sync().await.expect("Failed to sync store");
753            }
754
755            // Corrupt the value portion of a record
756            {
757                let (blob, _) = context
758                    .open("test-ordinal", &0u64.to_be_bytes())
759                    .await
760                    .unwrap();
761                // Corrupt some bytes in the value of the first record
762                blob.write_at(10, hex!("0xFFFFFFFF").to_vec())
763                    .await
764                    .unwrap();
765                blob.sync().await.unwrap();
766            }
767
768            // Reopen and verify it detects corruption
769            {
770                let store =
771                    Ordinal::<_, FixedBytes<32>>::init(context.with_label("second"), cfg.clone())
772                        .await
773                        .expect("Failed to initialize store");
774
775                // First record should be detected as corrupted (CRC mismatch)
776                assert!(!store.has(0));
777
778                // Second record should still be valid
779                assert!(store.has(1));
780                assert_eq!(
781                    store.get(1).await.unwrap().unwrap(),
782                    FixedBytes::new([43u8; 32])
783                );
784            }
785        });
786    }
787
788    #[test_traced]
789    fn test_crc_corruptions() {
790        // Initialize the deterministic context
791        let executor = deterministic::Runner::default();
792        executor.start(|context| async move {
793            let cfg = Config {
794                partition: "test-ordinal".into(),
795                items_per_blob: NZU64!(10), // Small blob size for testing
796                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
797                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
798            };
799
800            // Create store with data across multiple blobs
801            {
802                let mut store =
803                    Ordinal::<_, FixedBytes<32>>::init(context.with_label("first"), cfg.clone())
804                        .await
805                        .expect("Failed to initialize store");
806
807                // Add values across 2 blobs
808                store.put(0, FixedBytes::new([0u8; 32])).await.unwrap();
809                store.put(5, FixedBytes::new([5u8; 32])).await.unwrap();
810                store.put(10, FixedBytes::new([10u8; 32])).await.unwrap();
811                store.put(15, FixedBytes::new([15u8; 32])).await.unwrap();
812                store.sync().await.expect("Failed to sync store");
813            }
814
815            // Corrupt CRCs in different blobs
816            {
817                // Corrupt CRC in first blob
818                let (blob, _) = context
819                    .open("test-ordinal", &0u64.to_be_bytes())
820                    .await
821                    .unwrap();
822                blob.write_at(32, vec![0xFF]).await.unwrap(); // Corrupt CRC of index 0
823                blob.sync().await.unwrap();
824
825                // Corrupt value in second blob (which will invalidate CRC)
826                let (blob, _) = context
827                    .open("test-ordinal", &1u64.to_be_bytes())
828                    .await
829                    .unwrap();
830                blob.write_at(5, vec![0xFF; 4]).await.unwrap(); // Corrupt value of index 10
831                blob.sync().await.unwrap();
832            }
833
834            // Reopen and verify handling of CRC corruptions
835            {
836                let store =
837                    Ordinal::<_, FixedBytes<32>>::init(context.with_label("second"), cfg.clone())
838                        .await
839                        .expect("Failed to initialize store");
840
841                // Corrupted records should not be present
842                assert!(!store.has(0)); // CRC corrupted
843                assert!(!store.has(10)); // Value corrupted (CRC mismatch)
844
845                // Valid records should still be accessible
846                assert!(store.has(5));
847                assert!(store.has(15));
848                assert_eq!(
849                    store.get(5).await.unwrap().unwrap(),
850                    FixedBytes::new([5u8; 32])
851                );
852                assert_eq!(
853                    store.get(15).await.unwrap().unwrap(),
854                    FixedBytes::new([15u8; 32])
855                );
856            }
857        });
858    }
859
860    #[test_traced]
861    fn test_extra_bytes_in_blob() {
862        // Initialize the deterministic context
863        let executor = deterministic::Runner::default();
864        executor.start(|context| async move {
865            let cfg = Config {
866                partition: "test-ordinal".into(),
867                items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
868                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
869                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
870            };
871
872            // Create store with data
873            {
874                let mut store =
875                    Ordinal::<_, FixedBytes<32>>::init(context.with_label("first"), cfg.clone())
876                        .await
877                        .expect("Failed to initialize store");
878
879                store
880                    .put(0, FixedBytes::new([42u8; 32]))
881                    .await
882                    .expect("Failed to put data");
883                store
884                    .put(1, FixedBytes::new([43u8; 32]))
885                    .await
886                    .expect("Failed to put data");
887                store.sync().await.expect("Failed to sync store");
888            }
889
890            // Add extra bytes at the end of blob
891            {
892                let (blob, size) = context
893                    .open("test-ordinal", &0u64.to_be_bytes())
894                    .await
895                    .unwrap();
896                // Add garbage data that forms a complete but invalid record
897                // This avoids partial record issues
898                let mut garbage = vec![0xFF; 32]; // Invalid value
899                let invalid_crc = 0xDEADBEEFu32;
900                garbage.extend_from_slice(&invalid_crc.to_be_bytes());
901                assert_eq!(garbage.len(), 36); // Full record size
902                blob.write_at(size, garbage).await.unwrap();
903                blob.sync().await.unwrap();
904            }
905
906            // Reopen and verify it handles extra bytes
907            {
908                let store =
909                    Ordinal::<_, FixedBytes<32>>::init(context.with_label("second"), cfg.clone())
910                        .await
911                        .expect("Failed to initialize store");
912
913                // Original records should still be valid
914                assert!(store.has(0));
915                assert!(store.has(1));
916                assert_eq!(
917                    store.get(0).await.unwrap().unwrap(),
918                    FixedBytes::new([42u8; 32])
919                );
920                assert_eq!(
921                    store.get(1).await.unwrap().unwrap(),
922                    FixedBytes::new([43u8; 32])
923                );
924
925                // Store should still be functional
926                let mut store_mut = store;
927                store_mut.put(2, FixedBytes::new([44u8; 32])).await.unwrap();
928                assert_eq!(
929                    store_mut.get(2).await.unwrap().unwrap(),
930                    FixedBytes::new([44u8; 32])
931                );
932            }
933        });
934    }
935
936    #[test_traced]
937    fn test_zero_filled_records() {
938        // Initialize the deterministic context
939        let executor = deterministic::Runner::default();
940        executor.start(|context| async move {
941            let cfg = Config {
942                partition: "test-ordinal".into(),
943                items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
944                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
945                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
946            };
947
948            // Create blob with zero-filled space
949            {
950                let (blob, _) = context
951                    .open("test-ordinal", &0u64.to_be_bytes())
952                    .await
953                    .unwrap();
954
955                // Write zeros for several record positions
956                let zeros = vec![0u8; 36 * 5]; // 5 records worth of zeros
957                blob.write_at(0, zeros).await.unwrap();
958
959                // Write a valid record after the zeros
960                let mut valid_record = vec![44u8; 32];
961                let crc = Crc32::checksum(&valid_record);
962                valid_record.extend_from_slice(&crc.to_be_bytes());
963                blob.write_at(36 * 5, valid_record).await.unwrap();
964
965                blob.sync().await.unwrap();
966            }
967
968            // Initialize store and verify it handles zero-filled records
969            {
970                let store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
971                    .await
972                    .expect("Failed to initialize store");
973
974                // Zero-filled positions should not be considered valid
975                for i in 0..5 {
976                    assert!(!store.has(i));
977                }
978
979                // The valid record should be found
980                assert!(store.has(5));
981                assert_eq!(
982                    store.get(5).await.unwrap().unwrap(),
983                    FixedBytes::new([44u8; 32])
984                );
985            }
986        });
987    }
988
989    fn test_operations_and_restart(num_values: usize) -> String {
990        // Initialize the deterministic context
991        let executor = deterministic::Runner::default();
992        executor.start(|mut context| async move {
993            let cfg = Config {
994                partition: "test-ordinal".into(),
995                items_per_blob: NZU64!(100), // Smaller blobs to test multiple blob handling
996                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
997                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
998            };
999
1000            // Initialize the store
1001            let mut store =
1002                Ordinal::<_, FixedBytes<128>>::init(context.with_label("first"), cfg.clone())
1003                    .await
1004                    .expect("Failed to initialize store");
1005
1006            // Generate and insert random values at various indices
1007            let mut values = Vec::new();
1008            let mut rng_index = 0u64;
1009
1010            for _ in 0..num_values {
1011                // Generate a pseudo-random index (sparse to test gaps)
1012                let mut index_bytes = [0u8; 8];
1013                context.fill_bytes(&mut index_bytes);
1014                let index_offset = u64::from_be_bytes(index_bytes) % 1000;
1015                let index = rng_index + index_offset;
1016                rng_index = index + 1;
1017
1018                // Generate random value
1019                let mut value = [0u8; 128];
1020                context.fill_bytes(&mut value);
1021                let value = FixedBytes::<128>::new(value);
1022
1023                store
1024                    .put(index, value.clone())
1025                    .await
1026                    .expect("Failed to put data");
1027                values.push((index, value));
1028            }
1029
1030            // Sync data
1031            store.sync().await.expect("Failed to sync");
1032
1033            // Verify all values can be retrieved
1034            for (index, value) in &values {
1035                let retrieved = store
1036                    .get(*index)
1037                    .await
1038                    .expect("Failed to get data")
1039                    .expect("Data not found");
1040                assert_eq!(&retrieved, value);
1041            }
1042
1043            // Test next_gap on various indices
1044            for i in 0..10 {
1045                let _ = store.next_gap(i * 100);
1046            }
1047
1048            // Sync and drop the store
1049            store.sync().await.expect("Failed to sync store");
1050            drop(store);
1051
1052            // Reopen the store
1053            let mut store = Ordinal::<_, FixedBytes<128>>::init(context.with_label("second"), cfg)
1054                .await
1055                .expect("Failed to initialize store");
1056
1057            // Verify all values are still there after restart
1058            for (index, value) in &values {
1059                let retrieved = store
1060                    .get(*index)
1061                    .await
1062                    .expect("Failed to get data")
1063                    .expect("Data not found");
1064                assert_eq!(&retrieved, value);
1065            }
1066
1067            // Add more values after restart
1068            for _ in 0..10 {
1069                let mut index_bytes = [0u8; 8];
1070                context.fill_bytes(&mut index_bytes);
1071                let index = u64::from_be_bytes(index_bytes) % 10000;
1072
1073                let mut value = [0u8; 128];
1074                context.fill_bytes(&mut value);
1075                let value = FixedBytes::<128>::new(value);
1076
1077                store.put(index, value).await.expect("Failed to put data");
1078            }
1079
1080            // Final sync
1081            store.sync().await.expect("Failed to sync");
1082
1083            // Return the auditor state for comparison
1084            context.auditor().state()
1085        })
1086    }
1087
1088    #[test_group("slow")]
1089    #[test_traced]
1090    fn test_determinism() {
1091        let state1 = test_operations_and_restart(100);
1092        let state2 = test_operations_and_restart(100);
1093        assert_eq!(state1, state2);
1094    }
1095
1096    #[test_traced]
1097    fn test_prune_basic() {
1098        // Initialize the deterministic context
1099        let executor = deterministic::Runner::default();
1100        executor.start(|context| async move {
1101            let cfg = Config {
1102                partition: "test-ordinal".into(),
1103                items_per_blob: NZU64!(100), // Small blobs to test multiple blob handling
1104                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1105                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1106            };
1107
1108            let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1109                .await
1110                .expect("Failed to initialize store");
1111
1112            // Insert data across multiple blobs
1113            let values = vec![
1114                (0u64, FixedBytes::new([0u8; 32])),     // Blob 0
1115                (50u64, FixedBytes::new([50u8; 32])),   // Blob 0
1116                (100u64, FixedBytes::new([100u8; 32])), // Blob 1
1117                (150u64, FixedBytes::new([150u8; 32])), // Blob 1
1118                (200u64, FixedBytes::new([200u8; 32])), // Blob 2
1119                (300u64, FixedBytes::new([44u8; 32])),  // Blob 3
1120            ];
1121
1122            for (index, value) in &values {
1123                store
1124                    .put(*index, value.clone())
1125                    .await
1126                    .expect("Failed to put data");
1127            }
1128            store.sync().await.unwrap();
1129
1130            // Verify all values exist
1131            for (index, value) in &values {
1132                assert_eq!(store.get(*index).await.unwrap().unwrap(), *value);
1133            }
1134
1135            // Prune up to index 150 (should remove blob 0 only)
1136            store.prune(150).await.unwrap();
1137            let buffer = context.encode();
1138            assert!(buffer.contains("pruned_total 1"));
1139
1140            // Verify pruned data is gone
1141            assert!(!store.has(0));
1142            assert!(!store.has(50));
1143            assert!(store.get(0).await.unwrap().is_none());
1144            assert!(store.get(50).await.unwrap().is_none());
1145
1146            // Verify remaining data is still there
1147            assert!(store.has(100));
1148            assert!(store.has(150));
1149            assert!(store.has(200));
1150            assert!(store.has(300));
1151            assert_eq!(store.get(100).await.unwrap().unwrap(), values[2].1);
1152            assert_eq!(store.get(150).await.unwrap().unwrap(), values[3].1);
1153            assert_eq!(store.get(200).await.unwrap().unwrap(), values[4].1);
1154            assert_eq!(store.get(300).await.unwrap().unwrap(), values[5].1);
1155
1156            // Prune more aggressively - up to index 250 (should remove blob 1)
1157            store.prune(250).await.unwrap();
1158            let buffer = context.encode();
1159            assert!(buffer.contains("pruned_total 2"));
1160
1161            // Verify more data is pruned
1162            assert!(!store.has(100));
1163            assert!(!store.has(150));
1164            assert!(store.get(100).await.unwrap().is_none());
1165            assert!(store.get(150).await.unwrap().is_none());
1166
1167            // Verify remaining data
1168            assert!(store.has(200));
1169            assert!(store.has(300));
1170            assert_eq!(store.get(200).await.unwrap().unwrap(), values[4].1);
1171            assert_eq!(store.get(300).await.unwrap().unwrap(), values[5].1);
1172        });
1173    }
1174
1175    #[test_traced]
1176    fn test_prune_with_gaps() {
1177        // Initialize the deterministic context
1178        let executor = deterministic::Runner::default();
1179        executor.start(|context| async move {
1180            let cfg = Config {
1181                partition: "test-ordinal".into(),
1182                items_per_blob: NZU64!(100),
1183                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1184                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1185            };
1186
1187            let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1188                .await
1189                .expect("Failed to initialize store");
1190
1191            // Insert sparse data with gaps
1192            store.put(5, FixedBytes::new([5u8; 32])).await.unwrap();
1193            store.put(105, FixedBytes::new([105u8; 32])).await.unwrap();
1194            store.put(305, FixedBytes::new([49u8; 32])).await.unwrap();
1195            store.sync().await.unwrap();
1196
1197            // Check gaps before pruning
1198            let (current_end, next_start) = store.next_gap(0);
1199            assert!(current_end.is_none());
1200            assert_eq!(next_start, Some(5));
1201
1202            let (current_end, next_start) = store.next_gap(5);
1203            assert_eq!(current_end, Some(5));
1204            assert_eq!(next_start, Some(105));
1205
1206            // Prune up to index 150 (should remove blob 0)
1207            store.prune(150).await.unwrap();
1208
1209            // Verify pruned data is gone
1210            assert!(!store.has(5));
1211            assert!(store.get(5).await.unwrap().is_none());
1212
1213            // Verify remaining data and gaps
1214            assert!(store.has(105));
1215            assert!(store.has(305));
1216
1217            let (current_end, next_start) = store.next_gap(0);
1218            assert!(current_end.is_none());
1219            assert_eq!(next_start, Some(105));
1220
1221            let (current_end, next_start) = store.next_gap(105);
1222            assert_eq!(current_end, Some(105));
1223            assert_eq!(next_start, Some(305));
1224        });
1225    }
1226
1227    #[test_traced]
1228    fn test_prune_no_op() {
1229        // Initialize the deterministic context
1230        let executor = deterministic::Runner::default();
1231        executor.start(|context| async move {
1232            let cfg = Config {
1233                partition: "test-ordinal".into(),
1234                items_per_blob: NZU64!(100),
1235                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1236                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1237            };
1238
1239            let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1240                .await
1241                .expect("Failed to initialize store");
1242
1243            // Insert data
1244            store.put(100, FixedBytes::new([100u8; 32])).await.unwrap();
1245            store.put(200, FixedBytes::new([200u8; 32])).await.unwrap();
1246            store.sync().await.unwrap();
1247
1248            // Try to prune before any data - should be no-op
1249            store.prune(50).await.unwrap();
1250
1251            // Verify no data was actually pruned
1252            assert!(store.has(100));
1253            assert!(store.has(200));
1254            let buffer = context.encode();
1255            assert!(buffer.contains("pruned_total 0"));
1256
1257            // Try to prune exactly at blob boundary - should be no-op
1258            store.prune(100).await.unwrap();
1259
1260            // Verify still no data pruned
1261            assert!(store.has(100));
1262            assert!(store.has(200));
1263            let buffer = context.encode();
1264            assert!(buffer.contains("pruned_total 0"));
1265        });
1266    }
1267
1268    #[test_traced]
1269    fn test_prune_empty_store() {
1270        // Initialize the deterministic context
1271        let executor = deterministic::Runner::default();
1272        executor.start(|context| async move {
1273            let cfg = Config {
1274                partition: "test-ordinal".into(),
1275                items_per_blob: NZU64!(100),
1276                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1277                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1278            };
1279
1280            let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1281                .await
1282                .expect("Failed to initialize store");
1283
1284            // Try to prune empty store
1285            store.prune(1000).await.unwrap();
1286
1287            // Store should still be functional
1288            store.put(0, FixedBytes::new([0u8; 32])).await.unwrap();
1289            assert!(store.has(0));
1290        });
1291    }
1292
1293    #[test_traced]
1294    fn test_prune_after_restart() {
1295        // Initialize the deterministic context
1296        let executor = deterministic::Runner::default();
1297        executor.start(|context| async move {
1298            let cfg = Config {
1299                partition: "test-ordinal".into(),
1300                items_per_blob: NZU64!(100),
1301                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1302                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1303            };
1304
1305            // Create store and add data
1306            {
1307                let mut store =
1308                    Ordinal::<_, FixedBytes<32>>::init(context.with_label("first"), cfg.clone())
1309                        .await
1310                        .expect("Failed to initialize store");
1311
1312                store.put(0, FixedBytes::new([0u8; 32])).await.unwrap();
1313                store.put(100, FixedBytes::new([100u8; 32])).await.unwrap();
1314                store.put(200, FixedBytes::new([200u8; 32])).await.unwrap();
1315                store.sync().await.unwrap();
1316            }
1317
1318            // Reopen and prune
1319            {
1320                let mut store =
1321                    Ordinal::<_, FixedBytes<32>>::init(context.with_label("second"), cfg.clone())
1322                        .await
1323                        .expect("Failed to initialize store");
1324
1325                // Verify data is there
1326                assert!(store.has(0));
1327                assert!(store.has(100));
1328                assert!(store.has(200));
1329
1330                // Prune up to index 150
1331                store.prune(150).await.unwrap();
1332
1333                // Verify pruning worked
1334                assert!(!store.has(0));
1335                assert!(store.has(100));
1336                assert!(store.has(200));
1337
1338                store.sync().await.unwrap();
1339            }
1340
1341            // Reopen again and verify pruning persisted
1342            {
1343                let store =
1344                    Ordinal::<_, FixedBytes<32>>::init(context.with_label("third"), cfg.clone())
1345                        .await
1346                        .expect("Failed to initialize store");
1347
1348                assert!(!store.has(0));
1349                assert!(store.has(100));
1350                assert!(store.has(200));
1351
1352                // Check gaps
1353                let (current_end, next_start) = store.next_gap(0);
1354                assert!(current_end.is_none());
1355                assert_eq!(next_start, Some(100));
1356            }
1357        });
1358    }
1359
1360    #[test_traced]
1361    fn test_prune_multiple_operations() {
1362        // Initialize the deterministic context
1363        let executor = deterministic::Runner::default();
1364        executor.start(|context| async move {
1365            let cfg = Config {
1366                partition: "test-ordinal".into(),
1367                items_per_blob: NZU64!(50), // Smaller blobs for more granular testing
1368                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1369                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1370            };
1371
1372            let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1373                .await
1374                .expect("Failed to initialize store");
1375
1376            // Insert data across many blobs
1377            let mut values = Vec::new();
1378            for i in 0..10 {
1379                let index = i * 50 + 25; // Middle of each blob
1380                let value = FixedBytes::new([i as u8; 32]);
1381                store.put(index, value.clone()).await.unwrap();
1382                values.push((index, value));
1383            }
1384            store.sync().await.unwrap();
1385
1386            // Prune incrementally
1387            for i in 1..5 {
1388                let prune_index = i * 50 + 10;
1389                store.prune(prune_index).await.unwrap();
1390
1391                // Verify appropriate data is pruned
1392                for (index, _) in &values {
1393                    if *index < prune_index {
1394                        assert!(!store.has(*index), "Index {index} should be pruned");
1395                    } else {
1396                        assert!(store.has(*index), "Index {index} should not be pruned");
1397                    }
1398                }
1399            }
1400
1401            // Check final state
1402            let buffer = context.encode();
1403            assert!(buffer.contains("pruned_total 4"));
1404
1405            // Verify remaining data
1406            for i in 4..10 {
1407                let index = i * 50 + 25;
1408                assert!(store.has(index));
1409                assert_eq!(
1410                    store.get(index).await.unwrap().unwrap(),
1411                    values[i as usize].1
1412                );
1413            }
1414        });
1415    }
1416
1417    #[test_traced]
1418    fn test_prune_blob_boundaries() {
1419        // Initialize the deterministic context
1420        let executor = deterministic::Runner::default();
1421        executor.start(|context| async move {
1422            let cfg = Config {
1423                partition: "test-ordinal".into(),
1424                items_per_blob: NZU64!(100),
1425                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1426                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1427            };
1428
1429            let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1430                .await
1431                .expect("Failed to initialize store");
1432
1433            // Insert data at blob boundaries
1434            store.put(0, FixedBytes::new([0u8; 32])).await.unwrap(); // Start of blob 0
1435            store.put(99, FixedBytes::new([99u8; 32])).await.unwrap(); // End of blob 0
1436            store.put(100, FixedBytes::new([100u8; 32])).await.unwrap(); // Start of blob 1
1437            store.put(199, FixedBytes::new([199u8; 32])).await.unwrap(); // End of blob 1
1438            store.put(200, FixedBytes::new([200u8; 32])).await.unwrap(); // Start of blob 2
1439            store.sync().await.unwrap();
1440
1441            // Test various pruning points around boundaries
1442
1443            // Prune exactly at blob boundary (100) - should prune blob 0
1444            store.prune(100).await.unwrap();
1445            assert!(!store.has(0));
1446            assert!(!store.has(99));
1447            assert!(store.has(100));
1448            assert!(store.has(199));
1449            assert!(store.has(200));
1450
1451            // Prune just before next boundary (199) - should not prune blob 1
1452            store.prune(199).await.unwrap();
1453            assert!(store.has(100));
1454            assert!(store.has(199));
1455            assert!(store.has(200));
1456
1457            // Prune exactly at next boundary (200) - should prune blob 1
1458            store.prune(200).await.unwrap();
1459            assert!(!store.has(100));
1460            assert!(!store.has(199));
1461            assert!(store.has(200));
1462
1463            let buffer = context.encode();
1464            assert!(buffer.contains("pruned_total 2"));
1465        });
1466    }
1467
1468    #[test_traced]
1469    fn test_prune_non_contiguous_sections() {
1470        // Initialize the deterministic context
1471        let executor = deterministic::Runner::default();
1472        executor.start(|context| async move {
1473            let cfg = Config {
1474                partition: "test-ordinal".into(),
1475                items_per_blob: NZU64!(100),
1476                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1477                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1478            };
1479
1480            let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1481                .await
1482                .expect("Failed to initialize store");
1483
1484            // Insert data in non-contiguous sections (0, 2, 5, 7)
1485            store.put(0, FixedBytes::new([0u8; 32])).await.unwrap(); // Section 0
1486            store.put(250, FixedBytes::new([50u8; 32])).await.unwrap(); // Section 2 (250/100 = 2)
1487            store.put(500, FixedBytes::new([44u8; 32])).await.unwrap(); // Section 5 (500/100 = 5)
1488            store.put(750, FixedBytes::new([45u8; 32])).await.unwrap(); // Section 7 (750/100 = 7)
1489            store.sync().await.unwrap();
1490
1491            // Verify all data exists initially
1492            assert!(store.has(0));
1493            assert!(store.has(250));
1494            assert!(store.has(500));
1495            assert!(store.has(750));
1496
1497            // Prune up to section 3 (index 300) - should remove sections 0 and 2
1498            store.prune(300).await.unwrap();
1499
1500            // Verify correct data was pruned
1501            assert!(!store.has(0)); // Section 0 pruned
1502            assert!(!store.has(250)); // Section 2 pruned
1503            assert!(store.has(500)); // Section 5 remains
1504            assert!(store.has(750)); // Section 7 remains
1505
1506            let buffer = context.encode();
1507            assert!(buffer.contains("pruned_total 2"));
1508
1509            // Prune up to section 6 (index 600) - should remove section 5
1510            store.prune(600).await.unwrap();
1511
1512            // Verify section 5 was pruned
1513            assert!(!store.has(500)); // Section 5 pruned
1514            assert!(store.has(750)); // Section 7 remains
1515
1516            let buffer = context.encode();
1517            assert!(buffer.contains("pruned_total 3"));
1518
1519            // Prune everything - should remove section 7
1520            store.prune(1000).await.unwrap();
1521
1522            // Verify all data is gone
1523            assert!(!store.has(750)); // Section 7 pruned
1524
1525            let buffer = context.encode();
1526            assert!(buffer.contains("pruned_total 4"));
1527        });
1528    }
1529
1530    #[test_traced]
1531    fn test_prune_removes_correct_pending() {
1532        // Initialize the deterministic context
1533        let executor = deterministic::Runner::default();
1534        executor.start(|context| async move {
1535            let cfg = Config {
1536                partition: "test-ordinal".into(),
1537                items_per_blob: NZU64!(100),
1538                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1539                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1540            };
1541            let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1542                .await
1543                .expect("Failed to initialize store");
1544
1545            // Insert and sync some data in blob 0
1546            store.put(5, FixedBytes::new([5u8; 32])).await.unwrap();
1547            store.sync().await.unwrap();
1548
1549            // Add pending entries to blob 0 and blob 1
1550            store.put(10, FixedBytes::new([10u8; 32])).await.unwrap(); // blob 0
1551            store.put(110, FixedBytes::new([110u8; 32])).await.unwrap(); // blob 1
1552
1553            // Verify all data is visible before pruning
1554            assert!(store.has(5));
1555            assert!(store.has(10));
1556            assert!(store.has(110));
1557
1558            // Prune up to index 100, which should remove blob 0 (indices 0-99).
1559            store.prune(150).await.unwrap();
1560
1561            // Verify that synced and pending entries in blob 0 are removed.
1562            assert!(!store.has(5));
1563            assert!(!store.has(10));
1564
1565            // Verify that the pending entry in blob 1 remains.
1566            assert!(store.has(110));
1567            assert_eq!(
1568                store.get(110).await.unwrap().unwrap(),
1569                FixedBytes::new([110u8; 32])
1570            );
1571
1572            // Sync the remaining pending entry and verify it's still there.
1573            store.sync().await.unwrap();
1574            assert!(store.has(110));
1575            assert_eq!(
1576                store.get(110).await.unwrap().unwrap(),
1577                FixedBytes::new([110u8; 32])
1578            );
1579        });
1580    }
1581
1582    #[test_traced]
1583    fn test_init_with_bits_none() {
1584        // Initialize the deterministic context
1585        let executor = deterministic::Runner::default();
1586        executor.start(|context| async move {
1587            let cfg = Config {
1588                partition: "test-ordinal".into(),
1589                items_per_blob: NZU64!(10), // Small blob size for testing
1590                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1591                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1592            };
1593
1594            // Create store with data across multiple sections
1595            {
1596                let mut store =
1597                    Ordinal::<_, FixedBytes<32>>::init(context.with_label("first"), cfg.clone())
1598                        .await
1599                        .expect("Failed to initialize store");
1600
1601                // Section 0 (indices 0-9)
1602                store.put(0, FixedBytes::new([0u8; 32])).await.unwrap();
1603                store.put(5, FixedBytes::new([5u8; 32])).await.unwrap();
1604                store.put(9, FixedBytes::new([9u8; 32])).await.unwrap();
1605
1606                // Section 1 (indices 10-19)
1607                store.put(10, FixedBytes::new([10u8; 32])).await.unwrap();
1608                store.put(15, FixedBytes::new([15u8; 32])).await.unwrap();
1609
1610                // Section 2 (indices 20-29)
1611                store.put(25, FixedBytes::new([25u8; 32])).await.unwrap();
1612
1613                store.sync().await.unwrap();
1614            }
1615
1616            // Reinitialize with bits = None (should behave like regular init)
1617            {
1618                let store = Ordinal::<_, FixedBytes<32>>::init_with_bits(
1619                    context.with_label("second"),
1620                    cfg.clone(),
1621                    None,
1622                )
1623                .await
1624                .expect("Failed to initialize store with bits");
1625
1626                // All records should be available
1627                assert!(store.has(0));
1628                assert!(store.has(5));
1629                assert!(store.has(9));
1630                assert!(store.has(10));
1631                assert!(store.has(15));
1632                assert!(store.has(25));
1633
1634                // Non-existent records should not be available
1635                assert!(!store.has(1));
1636                assert!(!store.has(11));
1637                assert!(!store.has(20));
1638
1639                // Verify values
1640                assert_eq!(
1641                    store.get(0).await.unwrap().unwrap(),
1642                    FixedBytes::new([0u8; 32])
1643                );
1644                assert_eq!(
1645                    store.get(15).await.unwrap().unwrap(),
1646                    FixedBytes::new([15u8; 32])
1647                );
1648            }
1649        });
1650    }
1651
1652    #[test_traced]
1653    fn test_init_with_bits_empty_hashmap() {
1654        // Initialize the deterministic context
1655        let executor = deterministic::Runner::default();
1656        executor.start(|context| async move {
1657            let cfg = Config {
1658                partition: "test-ordinal".into(),
1659                items_per_blob: NZU64!(10),
1660                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1661                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1662            };
1663
1664            // Create store with data
1665            {
1666                let mut store =
1667                    Ordinal::<_, FixedBytes<32>>::init(context.with_label("first"), cfg.clone())
1668                        .await
1669                        .expect("Failed to initialize store");
1670
1671                store.put(0, FixedBytes::new([0u8; 32])).await.unwrap();
1672                store.put(10, FixedBytes::new([10u8; 32])).await.unwrap();
1673                store.put(20, FixedBytes::new([20u8; 32])).await.unwrap();
1674
1675                store.sync().await.unwrap();
1676            }
1677
1678            // Reinitialize with empty HashMap - should skip all sections
1679            {
1680                let bits: BTreeMap<u64, &Option<BitMap>> = BTreeMap::new();
1681                let store = Ordinal::<_, FixedBytes<32>>::init_with_bits(
1682                    context.with_label("second"),
1683                    cfg.clone(),
1684                    Some(bits),
1685                )
1686                .await
1687                .expect("Failed to initialize store with bits");
1688
1689                // No records should be available since no sections were in the bits map
1690                assert!(!store.has(0));
1691                assert!(!store.has(10));
1692                assert!(!store.has(20));
1693            }
1694        });
1695    }
1696
1697    #[test_traced]
1698    fn test_init_with_bits_selective_sections() {
1699        // Initialize the deterministic context
1700        let executor = deterministic::Runner::default();
1701        executor.start(|context| async move {
1702            let cfg = Config {
1703                partition: "test-ordinal".into(),
1704                items_per_blob: NZU64!(10),
1705                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1706                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1707            };
1708
1709            // Create store with data in multiple sections
1710            {
1711                let mut store =
1712                    Ordinal::<_, FixedBytes<32>>::init(context.with_label("first"), cfg.clone())
1713                        .await
1714                        .expect("Failed to initialize store");
1715
1716                // Section 0 (indices 0-9)
1717                for i in 0..10 {
1718                    store.put(i, FixedBytes::new([i as u8; 32])).await.unwrap();
1719                }
1720
1721                // Section 1 (indices 10-19)
1722                for i in 10..20 {
1723                    store.put(i, FixedBytes::new([i as u8; 32])).await.unwrap();
1724                }
1725
1726                // Section 2 (indices 20-29)
1727                for i in 20..30 {
1728                    store.put(i, FixedBytes::new([i as u8; 32])).await.unwrap();
1729                }
1730
1731                store.sync().await.unwrap();
1732            }
1733
1734            // Reinitialize with bits for only section 1
1735            {
1736                let mut bits_map: BTreeMap<u64, &Option<BitMap>> = BTreeMap::new();
1737
1738                // Create a BitMap that marks indices 12, 15, and 18 as present
1739                let mut bitmap = BitMap::zeroes(10);
1740                bitmap.set(2, true); // Index 12 (offset 2 in section 1)
1741                bitmap.set(5, true); // Index 15 (offset 5 in section 1)
1742                bitmap.set(8, true); // Index 18 (offset 8 in section 1)
1743                let bitmap_option = Some(bitmap);
1744
1745                bits_map.insert(1, &bitmap_option);
1746
1747                let store = Ordinal::<_, FixedBytes<32>>::init_with_bits(
1748                    context.with_label("second"),
1749                    cfg.clone(),
1750                    Some(bits_map),
1751                )
1752                .await
1753                .expect("Failed to initialize store with bits");
1754
1755                // Only specified indices from section 1 should be available
1756                assert!(store.has(12));
1757                assert!(store.has(15));
1758                assert!(store.has(18));
1759
1760                // Other indices from section 1 should not be available
1761                assert!(!store.has(10));
1762                assert!(!store.has(11));
1763                assert!(!store.has(13));
1764                assert!(!store.has(14));
1765                assert!(!store.has(16));
1766                assert!(!store.has(17));
1767                assert!(!store.has(19));
1768
1769                // All indices from sections 0 and 2 should not be available
1770                for i in 0..10 {
1771                    assert!(!store.has(i));
1772                }
1773                for i in 20..30 {
1774                    assert!(!store.has(i));
1775                }
1776
1777                // Verify the available values
1778                assert_eq!(
1779                    store.get(12).await.unwrap().unwrap(),
1780                    FixedBytes::new([12u8; 32])
1781                );
1782                assert_eq!(
1783                    store.get(15).await.unwrap().unwrap(),
1784                    FixedBytes::new([15u8; 32])
1785                );
1786                assert_eq!(
1787                    store.get(18).await.unwrap().unwrap(),
1788                    FixedBytes::new([18u8; 32])
1789                );
1790            }
1791        });
1792    }
1793
1794    #[test_traced]
1795    fn test_init_with_bits_none_option_all_records_exist() {
1796        // Initialize the deterministic context
1797        let executor = deterministic::Runner::default();
1798        executor.start(|context| async move {
1799            let cfg = Config {
1800                partition: "test-ordinal".into(),
1801                items_per_blob: NZU64!(5),
1802                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1803                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1804            };
1805
1806            // Create store with all records in a section
1807            {
1808                let mut store =
1809                    Ordinal::<_, FixedBytes<32>>::init(context.with_label("first"), cfg.clone())
1810                        .await
1811                        .expect("Failed to initialize store");
1812
1813                // Fill section 1 completely (indices 5-9)
1814                for i in 5..10 {
1815                    store.put(i, FixedBytes::new([i as u8; 32])).await.unwrap();
1816                }
1817
1818                store.sync().await.unwrap();
1819            }
1820
1821            // Reinitialize with None option for section 1 (expects all records)
1822            {
1823                let mut bits_map: BTreeMap<u64, &Option<BitMap>> = BTreeMap::new();
1824                let none_option: Option<BitMap> = None;
1825                bits_map.insert(1, &none_option);
1826
1827                let store = Ordinal::<_, FixedBytes<32>>::init_with_bits(
1828                    context.with_label("second"),
1829                    cfg.clone(),
1830                    Some(bits_map),
1831                )
1832                .await
1833                .expect("Failed to initialize store with bits");
1834
1835                // All records in section 1 should be available
1836                for i in 5..10 {
1837                    assert!(store.has(i));
1838                    assert_eq!(
1839                        store.get(i).await.unwrap().unwrap(),
1840                        FixedBytes::new([i as u8; 32])
1841                    );
1842                }
1843            }
1844        });
1845    }
1846
1847    #[test_traced]
1848    #[should_panic(expected = "Failed to initialize store with bits: MissingRecord(6)")]
1849    fn test_init_with_bits_none_option_missing_record_panics() {
1850        // Initialize the deterministic context
1851        let executor = deterministic::Runner::default();
1852        executor.start(|context| async move {
1853            let cfg = Config {
1854                partition: "test-ordinal".into(),
1855                items_per_blob: NZU64!(5),
1856                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1857                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1858            };
1859
1860            // Create store with missing record in a section
1861            {
1862                let mut store =
1863                    Ordinal::<_, FixedBytes<32>>::init(context.with_label("first"), cfg.clone())
1864                        .await
1865                        .expect("Failed to initialize store");
1866
1867                // Fill section 1 partially (skip index 6)
1868                store.put(5, FixedBytes::new([5u8; 32])).await.unwrap();
1869                // Skip index 6
1870                store.put(7, FixedBytes::new([7u8; 32])).await.unwrap();
1871                store.put(8, FixedBytes::new([8u8; 32])).await.unwrap();
1872                store.put(9, FixedBytes::new([9u8; 32])).await.unwrap();
1873
1874                store.sync().await.unwrap();
1875            }
1876
1877            // Reinitialize with None option for section 1 (expects all records)
1878            // This should panic because index 6 is missing
1879            {
1880                let mut bits_map: BTreeMap<u64, &Option<BitMap>> = BTreeMap::new();
1881                let none_option: Option<BitMap> = None;
1882                bits_map.insert(1, &none_option);
1883
1884                let _store = Ordinal::<_, FixedBytes<32>>::init_with_bits(
1885                    context.with_label("second"),
1886                    cfg.clone(),
1887                    Some(bits_map),
1888                )
1889                .await
1890                .expect("Failed to initialize store with bits");
1891            }
1892        });
1893    }
1894
1895    #[test_traced]
1896    fn test_init_with_bits_mixed_sections() {
1897        // Initialize the deterministic context
1898        let executor = deterministic::Runner::default();
1899        executor.start(|context| async move {
1900            let cfg = Config {
1901                partition: "test-ordinal".into(),
1902                items_per_blob: NZU64!(5),
1903                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1904                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1905            };
1906
1907            // Create store with data in multiple sections
1908            {
1909                let mut store =
1910                    Ordinal::<_, FixedBytes<32>>::init(context.with_label("first"), cfg.clone())
1911                        .await
1912                        .expect("Failed to initialize store");
1913
1914                // Section 0: indices 0-4 (fill completely)
1915                for i in 0..5 {
1916                    store.put(i, FixedBytes::new([i as u8; 32])).await.unwrap();
1917                }
1918
1919                // Section 1: indices 5-9 (fill partially)
1920                store.put(5, FixedBytes::new([5u8; 32])).await.unwrap();
1921                store.put(7, FixedBytes::new([7u8; 32])).await.unwrap();
1922                store.put(9, FixedBytes::new([9u8; 32])).await.unwrap();
1923
1924                // Section 2: indices 10-14 (fill completely)
1925                for i in 10..15 {
1926                    store.put(i, FixedBytes::new([i as u8; 32])).await.unwrap();
1927                }
1928
1929                store.sync().await.unwrap();
1930            }
1931
1932            // Reinitialize with mixed bits configuration
1933            {
1934                let mut bits_map: BTreeMap<u64, &Option<BitMap>> = BTreeMap::new();
1935
1936                // Section 0: None option (expects all records)
1937                let none_option: Option<BitMap> = None;
1938                bits_map.insert(0, &none_option);
1939
1940                // Section 1: BitMap with specific indices
1941                let mut bitmap1 = BitMap::zeroes(5);
1942                bitmap1.set(0, true); // Index 5
1943                bitmap1.set(2, true); // Index 7
1944                                      // Note: not setting bit for index 9, so it should be ignored
1945                let bitmap1_option = Some(bitmap1);
1946                bits_map.insert(1, &bitmap1_option);
1947
1948                // Section 2: Not in map, should be skipped entirely
1949
1950                let store = Ordinal::<_, FixedBytes<32>>::init_with_bits(
1951                    context.with_label("second"),
1952                    cfg.clone(),
1953                    Some(bits_map),
1954                )
1955                .await
1956                .expect("Failed to initialize store with bits");
1957
1958                // All records from section 0 should be available
1959                for i in 0..5 {
1960                    assert!(store.has(i));
1961                    assert_eq!(
1962                        store.get(i).await.unwrap().unwrap(),
1963                        FixedBytes::new([i as u8; 32])
1964                    );
1965                }
1966
1967                // Only specified records from section 1 should be available
1968                assert!(store.has(5));
1969                assert!(store.has(7));
1970                assert!(!store.has(6));
1971                assert!(!store.has(8));
1972                assert!(!store.has(9)); // Not set in bitmap
1973
1974                // No records from section 2 should be available
1975                for i in 10..15 {
1976                    assert!(!store.has(i));
1977                }
1978            }
1979        });
1980    }
1981
1982    #[test_traced]
1983    #[should_panic(expected = "Failed to initialize store with bits: MissingRecord(2)")]
1984    fn test_init_with_bits_corrupted_records() {
1985        // Initialize the deterministic context
1986        let executor = deterministic::Runner::default();
1987        executor.start(|context| async move {
1988            let cfg = Config {
1989                partition: "test-ordinal".into(),
1990                items_per_blob: NZU64!(5),
1991                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1992                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1993            };
1994
1995            // Create store with data and corrupt one record
1996            {
1997                let mut store =
1998                    Ordinal::<_, FixedBytes<32>>::init(context.with_label("first"), cfg.clone())
1999                        .await
2000                        .expect("Failed to initialize store");
2001
2002                // Section 0: indices 0-4
2003                for i in 0..5 {
2004                    store.put(i, FixedBytes::new([i as u8; 32])).await.unwrap();
2005                }
2006
2007                store.sync().await.unwrap();
2008            }
2009
2010            // Corrupt record at index 2
2011            {
2012                let (blob, _) = context
2013                    .open("test-ordinal", &0u64.to_be_bytes())
2014                    .await
2015                    .unwrap();
2016                // Corrupt the CRC of record at index 2
2017                let offset = 2 * 36 + 32; // 2 * record_size + value_size
2018                blob.write_at(offset, vec![0xFF]).await.unwrap();
2019                blob.sync().await.unwrap();
2020            }
2021
2022            // Reinitialize with bits that include the corrupted record
2023            {
2024                let mut bits_map: BTreeMap<u64, &Option<BitMap>> = BTreeMap::new();
2025
2026                // Create a BitMap that includes the corrupted record
2027                let mut bitmap = BitMap::zeroes(5);
2028                bitmap.set(0, true); // Index 0
2029                bitmap.set(2, true); // Index 2 (corrupted) - this will cause a panic
2030                bitmap.set(4, true); // Index 4
2031                let bitmap_option = Some(bitmap);
2032                bits_map.insert(0, &bitmap_option);
2033
2034                let _store = Ordinal::<_, FixedBytes<32>>::init_with_bits(
2035                    context.with_label("second"),
2036                    cfg.clone(),
2037                    Some(bits_map),
2038                )
2039                .await
2040                .expect("Failed to initialize store with bits");
2041            }
2042        });
2043    }
2044
    /// A dummy value that will fail parsing if the value is 0.
    ///
    /// Writing never fails (a zero can be encoded); only reading rejects a zero payload. Tests
    /// use this to persist a record that cannot be decoded again later.
    #[derive(Debug, PartialEq, Eq)]
    pub struct DummyValue {
        /// The wrapped integer. A value of 0 is rejected when decoding.
        pub value: u64,
    }
2050
2051    impl Write for DummyValue {
2052        fn write(&self, buf: &mut impl BufMut) {
2053            self.value.write(buf);
2054        }
2055    }
2056
2057    impl Read for DummyValue {
2058        type Cfg = ();
2059
2060        fn read_cfg(buf: &mut impl Buf, _: &Self::Cfg) -> Result<Self, commonware_codec::Error> {
2061            let value = u64::read(buf)?;
2062            if value == 0 {
2063                return Err(commonware_codec::Error::Invalid(
2064                    "DummyValue",
2065                    "value must be non-zero",
2066                ));
2067            }
2068            Ok(Self { value })
2069        }
2070    }
2071
2072    impl FixedSize for DummyValue {
2073        const SIZE: usize = u64::SIZE;
2074    }
2075
2076    #[test_traced]
2077    fn test_init_skip_unparseable_record() {
2078        // Initialize the deterministic context
2079        let executor = deterministic::Runner::default();
2080        executor.start(|context| async move {
2081            let cfg = Config {
2082                partition: "test-ordinal".into(),
2083                items_per_blob: NZU64!(1),
2084                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
2085                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
2086            };
2087
2088            // Create store with valid records
2089            {
2090                let mut store =
2091                    Ordinal::<_, DummyValue>::init(context.with_label("first"), cfg.clone())
2092                        .await
2093                        .expect("Failed to initialize store");
2094
2095                // Add records at indices 1, 2, 4
2096                store.put(1, DummyValue { value: 1 }).await.unwrap();
2097                store.put(2, DummyValue { value: 0 }).await.unwrap(); // will fail parsing
2098                store.put(4, DummyValue { value: 4 }).await.unwrap();
2099
2100                store.sync().await.unwrap();
2101            }
2102
2103            // Reinitialize - should skip the unparseable record but continue processing
2104            {
2105                let store =
2106                    Ordinal::<_, DummyValue>::init(context.with_label("second"), cfg.clone())
2107                        .await
2108                        .expect("Failed to initialize store");
2109
2110                // Record 0 should be available
2111                assert!(store.has(1), "Record 1 should be available");
2112                assert_eq!(
2113                    store.get(1).await.unwrap().unwrap(),
2114                    DummyValue { value: 1 },
2115                    "Record 0 should have correct value"
2116                );
2117
2118                // Record 2 should NOT be available (unparseable)
2119                assert!(
2120                    !store.has(2),
2121                    "Record 2 should not be available (unparseable)"
2122                );
2123
2124                // This tests that we didn't exit early when encountering the unparseable record
2125                assert!(
2126                    store.has(4),
2127                    "Record 4 should be available - we should not exit early on unparseable record"
2128                );
2129                assert_eq!(
2130                    store.get(4).await.unwrap().unwrap(),
2131                    DummyValue { value: 4 },
2132                    "Record 4 should have correct value"
2133                );
2134            }
2135        });
2136    }
2137}