commonware_storage/ordinal/
mod.rs

1//! A persistent index that maps sparse indices to [commonware_utils::Array]s.
2//!
3//! [Ordinal] is a collection of [commonware_runtime::Blob]s containing ordered records of fixed size.
4//! Because records are fixed size, file position corresponds directly to index. Unlike
5//! [crate::journal::fixed::Journal], [Ordinal] supports out-of-order insertion.
6//!
7//! # Design
8//!
9//! [Ordinal] is a collection of [commonware_runtime::Blob]s where:
10//! - Each record: `[V][crc32(V)]` where V is an [commonware_utils::Array]
11//! - Index N is at file offset: `N * RECORD_SIZE`
12//! - A [crate::rmap::RMap] tracks which indices have been written (and which are missing)
13//!
14//! # File Organization
15//!
16//! Records are grouped into blobs to avoid having too many files:
17//!
18//! ```text
19//! Blob 0: indices 0-999
20//! Blob 1: indices 1000-1999
21//! ...
22//! ```
23//!
24//! # Format
25//!
26//! [Ordinal] stores values in the following format:
27//!
28//! ```text
29//! +---+---+---+---+---+---+---+---+---+---+---+---+---+
30//! | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 |10 |11 |12 |
31//! +---+---+---+---+---+---+---+---+---+---+---+---+---+
32//! |          Value (Fixed Size)       |     CRC32     |
33//! +---+---+---+---+---+---+---+---+---+---+---+---+---+
34//! ```
35//!
36//! # Performance Characteristics
37//!
38//! - **Writes**: O(1) - direct offset calculation
39//! - **Reads**: O(1) - direct offset calculation
40//! - **Has**: O(1) - in-memory lookup (via [crate::rmap::RMap])
41//! - **Next Gap**: O(log n) - in-memory range query (via [crate::rmap::RMap])
42//! - **Restart**: O(n) where n is the number of existing records (to rebuild [crate::rmap::RMap])
43//!
44//! # Atomicity
45//!
46//! [Ordinal] eagerly writes all new data to [commonware_runtime::Blob]s. New data, however, is not
47//! synced until [Ordinal::sync] is called. As a result, data is not guaranteed to be atomically
48//! persisted (i.e. shutdown before [Ordinal::sync] may lead to some writes being lost).
49//!
50//! _If you want atomicity for sparse writes, pair [commonware_utils::BitVec] and
51//! [crate::metadata::Metadata] with [Ordinal] (use bits to indicate which items have been atomically
52//! written)._
53//!
54//! # Recovery
55//!
56//! On restart, [Ordinal] validates all records using their CRC32 and rebuilds the in-memory
57//! [crate::rmap::RMap]. Invalid records (corrupted or empty) are excluded from the rebuilt index.
58//!
59//! # Example
60//!
61//! ```rust
62//! use commonware_runtime::{Spawner, Runner, deterministic};
63//! use commonware_storage::ordinal::{Ordinal, Config};
64//! use commonware_utils::{sequence::FixedBytes, NZUsize, NZU64};
65//!
66//! let executor = deterministic::Runner::default();
67//! executor.start(|context| async move {
68//!     // Create a store for 32-byte values
69//!     let cfg = Config {
70//!         partition: "ordinal_store".into(),
71//!         items_per_blob: NZU64!(10000),
72//!         write_buffer: NZUsize!(4096),
73//!         replay_buffer: NZUsize!(1024 * 1024),
74//!     };
75//!     let mut store = Ordinal::<_, FixedBytes<32>>::init(context, cfg).await.unwrap();
76//!
77//!     // Put values at specific indices
78//!     let value1 = FixedBytes::new([1u8; 32]);
79//!     let value2 = FixedBytes::new([2u8; 32]);
80//!     store.put(0, value1).await.unwrap();
81//!     store.put(5, value2).await.unwrap();
82//!
83//!     // Sync to disk
84//!     store.sync().await.unwrap();
85//!
86//!     // Check for gaps
87//!     let (current_end, next_start) = store.next_gap(0);
88//!     assert_eq!(current_end, Some(0));
89//!     assert_eq!(next_start, Some(5));
90//!
91//!     // Close the store
92//!     store.close().await.unwrap();
93//! });
94//! ```
95
96mod storage;
97
98use std::num::{NonZeroU64, NonZeroUsize};
99pub use storage::Ordinal;
100use thiserror::Error;
101
102/// Errors that can occur when interacting with the [Ordinal].
103#[derive(Debug, Error)]
104pub enum Error {
105    #[error("runtime error: {0}")]
106    Runtime(#[from] commonware_runtime::Error),
107    #[error("codec error: {0}")]
108    Codec(#[from] commonware_codec::Error),
109    #[error("invalid blob name: {0}")]
110    InvalidBlobName(String),
111    #[error("invalid record: {0}")]
112    InvalidRecord(u64),
113    #[error("missing record at {0}")]
114    MissingRecord(u64),
115}
116
/// Configuration for [Ordinal] storage.
///
/// Every numeric setting is a `NonZero*` type, so a zero value cannot be expressed.
/// `Debug` is derived so a configuration can be logged or embedded in other
/// `Debug`-deriving types.
#[derive(Clone, Debug)]
pub struct Config {
    /// The [commonware_runtime::Storage] partition to use for storing the index.
    pub partition: String,

    /// The maximum number of items to store in each index blob.
    pub items_per_blob: NonZeroU64,

    /// The size of the write buffer to use when writing to the index.
    pub write_buffer: NonZeroUsize,

    /// The size of the read buffer to use on restart.
    pub replay_buffer: NonZeroUsize,
}
132
133#[cfg(test)]
134mod tests {
135    use super::*;
136    use bytes::{Buf, BufMut};
137    use commonware_codec::{FixedSize, Read, ReadExt, Write};
138    use commonware_macros::test_traced;
139    use commonware_runtime::{deterministic, Blob, Metrics, Runner, Storage};
140    use commonware_utils::{sequence::FixedBytes, BitVec, NZUsize, NZU64};
141    use rand::RngCore;
142    use std::collections::BTreeMap;
143
144    const DEFAULT_ITEMS_PER_BLOB: u64 = 1000;
145    const DEFAULT_WRITE_BUFFER: usize = 4096;
146    const DEFAULT_REPLAY_BUFFER: usize = 1024 * 1024;
147
148    #[test_traced]
149    fn test_put_get() {
150        // Initialize the deterministic context
151        let executor = deterministic::Runner::default();
152        executor.start(|context| async move {
153            // Initialize the store
154            let cfg = Config {
155                partition: "test_ordinal".into(),
156                items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
157                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
158                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
159            };
160            let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
161                .await
162                .expect("Failed to initialize store");
163
164            let value = FixedBytes::new([42u8; 32]);
165
166            // Check index doesn't exist
167            assert!(!store.has(0));
168
169            // Put the value at index 0
170            store
171                .put(0, value.clone())
172                .await
173                .expect("Failed to put data");
174
175            // Check index exists
176            assert!(store.has(0));
177
178            // Get the value back (before sync)
179            let retrieved = store
180                .get(0)
181                .await
182                .expect("Failed to get data")
183                .expect("Data not found");
184            assert_eq!(retrieved, value);
185
186            // Force a sync
187            store.sync().await.expect("Failed to sync data");
188
189            // Check metrics
190            let buffer = context.encode();
191            assert!(buffer.contains("gets_total 1"), "{}", buffer);
192            assert!(buffer.contains("puts_total 1"), "{}", buffer);
193            assert!(buffer.contains("has_total 2"), "{}", buffer);
194            assert!(buffer.contains("syncs_total 1"), "{}", buffer);
195            assert!(buffer.contains("pruned_total 0"), "{}", buffer);
196
197            // Get the value back (after sync)
198            let retrieved = store
199                .get(0)
200                .await
201                .expect("Failed to get data")
202                .expect("Data not found");
203            assert_eq!(retrieved, value);
204        });
205    }
206
207    #[test_traced]
208    fn test_multiple_indices() {
209        // Initialize the deterministic context
210        let executor = deterministic::Runner::default();
211        executor.start(|context| async move {
212            // Initialize the store
213            let cfg = Config {
214                partition: "test_ordinal".into(),
215                items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
216                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
217                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
218            };
219            let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
220                .await
221                .expect("Failed to initialize store");
222
223            // Insert multiple values at different indices
224            let indices = vec![
225                (0u64, FixedBytes::new([0u8; 32])),
226                (5u64, FixedBytes::new([5u8; 32])),
227                (10u64, FixedBytes::new([10u8; 32])),
228                (100u64, FixedBytes::new([100u8; 32])),
229                (1000u64, FixedBytes::new([200u8; 32])), // Different blob
230            ];
231
232            for (index, value) in &indices {
233                store
234                    .put(*index, value.clone())
235                    .await
236                    .expect("Failed to put data");
237            }
238
239            // Sync to disk
240            store.sync().await.expect("Failed to sync");
241
242            // Retrieve all values and verify
243            for (index, value) in &indices {
244                let retrieved = store
245                    .get(*index)
246                    .await
247                    .expect("Failed to get data")
248                    .expect("Data not found");
249                assert_eq!(&retrieved, value);
250            }
251        });
252    }
253
254    #[test_traced]
255    fn test_sparse_indices() {
256        // Initialize the deterministic context
257        let executor = deterministic::Runner::default();
258        executor.start(|context| async move {
259            // Initialize the store
260            let cfg = Config {
261                partition: "test_ordinal".into(),
262                items_per_blob: NZU64!(100), // Smaller blobs for testing
263                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
264                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
265            };
266            let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
267                .await
268                .expect("Failed to initialize store");
269
270            // Insert sparse values
271            let indices = vec![
272                (0u64, FixedBytes::new([0u8; 32])),
273                (99u64, FixedBytes::new([99u8; 32])), // End of first blob
274                (100u64, FixedBytes::new([100u8; 32])), // Start of second blob
275                (500u64, FixedBytes::new([200u8; 32])), // Start of sixth blob
276            ];
277
278            for (index, value) in &indices {
279                store
280                    .put(*index, value.clone())
281                    .await
282                    .expect("Failed to put data");
283            }
284
285            // Check that intermediate indices don't exist
286            assert!(!store.has(1));
287            assert!(!store.has(50));
288            assert!(!store.has(101));
289            assert!(!store.has(499));
290
291            // Sync and verify
292            store.sync().await.expect("Failed to sync");
293
294            for (index, value) in &indices {
295                let retrieved = store
296                    .get(*index)
297                    .await
298                    .expect("Failed to get data")
299                    .expect("Data not found");
300                assert_eq!(&retrieved, value);
301            }
302        });
303    }
304
305    #[test_traced]
306    fn test_next_gap() {
307        // Initialize the deterministic context
308        let executor = deterministic::Runner::default();
309        executor.start(|context| async move {
310            // Initialize the store
311            let cfg = Config {
312                partition: "test_ordinal".into(),
313                items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
314                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
315                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
316            };
317            let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
318                .await
319                .expect("Failed to initialize store");
320
321            // Insert values with gaps
322            store.put(1, FixedBytes::new([1u8; 32])).await.unwrap();
323            store.put(10, FixedBytes::new([10u8; 32])).await.unwrap();
324            store.put(11, FixedBytes::new([11u8; 32])).await.unwrap();
325            store.put(14, FixedBytes::new([14u8; 32])).await.unwrap();
326
327            // Check gaps
328            let (current_end, start_next) = store.next_gap(0);
329            assert!(current_end.is_none());
330            assert_eq!(start_next, Some(1));
331
332            let (current_end, start_next) = store.next_gap(1);
333            assert_eq!(current_end, Some(1));
334            assert_eq!(start_next, Some(10));
335
336            let (current_end, start_next) = store.next_gap(10);
337            assert_eq!(current_end, Some(11));
338            assert_eq!(start_next, Some(14));
339
340            let (current_end, start_next) = store.next_gap(11);
341            assert_eq!(current_end, Some(11));
342            assert_eq!(start_next, Some(14));
343
344            let (current_end, start_next) = store.next_gap(12);
345            assert!(current_end.is_none());
346            assert_eq!(start_next, Some(14));
347
348            let (current_end, start_next) = store.next_gap(14);
349            assert_eq!(current_end, Some(14));
350            assert!(start_next.is_none());
351        });
352    }
353
354    #[test_traced]
355    fn test_missing_items() {
356        // Initialize the deterministic context
357        let executor = deterministic::Runner::default();
358        executor.start(|context| async move {
359            // Initialize the store
360            let cfg = Config {
361                partition: "test_ordinal".into(),
362                items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
363                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
364                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
365            };
366            let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
367                .await
368                .expect("Failed to initialize store");
369
370            // Test 1: Empty store - should return no items
371            assert_eq!(store.missing_items(0, 5), Vec::<u64>::new());
372            assert_eq!(store.missing_items(100, 10), Vec::<u64>::new());
373
374            // Test 2: Insert values with gaps
375            store.put(1, FixedBytes::new([1u8; 32])).await.unwrap();
376            store.put(2, FixedBytes::new([2u8; 32])).await.unwrap();
377            store.put(5, FixedBytes::new([5u8; 32])).await.unwrap();
378            store.put(6, FixedBytes::new([6u8; 32])).await.unwrap();
379            store.put(10, FixedBytes::new([10u8; 32])).await.unwrap();
380
381            // Test 3: Find missing items from the beginning
382            assert_eq!(store.missing_items(0, 5), vec![0, 3, 4, 7, 8]);
383            assert_eq!(store.missing_items(0, 6), vec![0, 3, 4, 7, 8, 9]);
384            assert_eq!(store.missing_items(0, 7), vec![0, 3, 4, 7, 8, 9]);
385
386            // Test 4: Find missing items from within a gap
387            assert_eq!(store.missing_items(3, 3), vec![3, 4, 7]);
388            assert_eq!(store.missing_items(4, 2), vec![4, 7]);
389
390            // Test 5: Find missing items from within a range
391            assert_eq!(store.missing_items(1, 3), vec![3, 4, 7]);
392            assert_eq!(store.missing_items(2, 4), vec![3, 4, 7, 8]);
393            assert_eq!(store.missing_items(5, 2), vec![7, 8]);
394
395            // Test 6: Find missing items after the last range (no more gaps)
396            assert_eq!(store.missing_items(11, 5), Vec::<u64>::new());
397            assert_eq!(store.missing_items(100, 10), Vec::<u64>::new());
398
399            // Test 7: Large gap scenario
400            store.put(1000, FixedBytes::new([100u8; 32])).await.unwrap();
401
402            // Gap between 10 and 1000
403            let items = store.missing_items(11, 10);
404            assert_eq!(items, vec![11, 12, 13, 14, 15, 16, 17, 18, 19, 20]);
405
406            // Request more items than available in gap
407            let items = store.missing_items(990, 15);
408            assert_eq!(
409                items,
410                vec![990, 991, 992, 993, 994, 995, 996, 997, 998, 999]
411            );
412
413            // Test 8: After syncing (data should remain consistent)
414            store.sync().await.unwrap();
415            assert_eq!(store.missing_items(0, 5), vec![0, 3, 4, 7, 8]);
416            assert_eq!(store.missing_items(3, 3), vec![3, 4, 7]);
417
418            // Test 9: Cross-blob boundary scenario
419            store.put(9999, FixedBytes::new([99u8; 32])).await.unwrap();
420            store
421                .put(10001, FixedBytes::new([101u8; 32]))
422                .await
423                .unwrap();
424
425            // Find missing items across blob boundary (10000 is the boundary)
426            let items = store.missing_items(9998, 5);
427            assert_eq!(items, vec![9998, 10000]);
428
429            store.close().await.expect("Failed to close store");
430        });
431    }
432
433    #[test_traced]
434    fn test_restart() {
435        // Initialize the deterministic context
436        let executor = deterministic::Runner::default();
437        executor.start(|context| async move {
438            let cfg = Config {
439                partition: "test_ordinal".into(),
440                items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
441                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
442                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
443            };
444
445            // Insert data and close
446            {
447                let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
448                    .await
449                    .expect("Failed to initialize store");
450
451                let values = vec![
452                    (0u64, FixedBytes::new([0u8; 32])),
453                    (100u64, FixedBytes::new([100u8; 32])),
454                    (1000u64, FixedBytes::new([200u8; 32])),
455                ];
456
457                for (index, value) in &values {
458                    store
459                        .put(*index, value.clone())
460                        .await
461                        .expect("Failed to put data");
462                }
463
464                store.close().await.expect("Failed to close store");
465            }
466
467            // Reopen and verify data persisted
468            {
469                let store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
470                    .await
471                    .expect("Failed to initialize store");
472
473                let values = vec![
474                    (0u64, FixedBytes::new([0u8; 32])),
475                    (100u64, FixedBytes::new([100u8; 32])),
476                    (1000u64, FixedBytes::new([200u8; 32])),
477                ];
478
479                for (index, value) in &values {
480                    let retrieved = store
481                        .get(*index)
482                        .await
483                        .expect("Failed to get data")
484                        .expect("Data not found");
485                    assert_eq!(&retrieved, value);
486                }
487
488                // Check gaps are preserved
489                let (current_end, start_next) = store.next_gap(0);
490                assert_eq!(current_end, Some(0));
491                assert_eq!(start_next, Some(100));
492            }
493        });
494    }
495
496    #[test_traced]
497    fn test_invalid_record() {
498        // Initialize the deterministic context
499        let executor = deterministic::Runner::default();
500        executor.start(|context| async move {
501            let cfg = Config {
502                partition: "test_ordinal".into(),
503                items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
504                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
505                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
506            };
507
508            // Create store with data
509            {
510                let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
511                    .await
512                    .expect("Failed to initialize store");
513
514                store
515                    .put(0, FixedBytes::new([42u8; 32]))
516                    .await
517                    .expect("Failed to put data");
518                store.close().await.expect("Failed to close store");
519            }
520
521            // Corrupt the data
522            {
523                let (blob, _) = context
524                    .open("test_ordinal", &0u64.to_be_bytes())
525                    .await
526                    .unwrap();
527                // Corrupt the CRC by changing a byte
528                blob.write_at(vec![0xFF], 32).await.unwrap();
529                blob.sync().await.unwrap();
530            }
531
532            // Reopen and try to read corrupted data
533            {
534                let store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
535                    .await
536                    .expect("Failed to initialize store");
537
538                // Reading corrupt record will return empty
539                let result = store.get(0).await.unwrap();
540                assert!(result.is_none());
541
542                // The index should not be in the intervals after restart with corrupted data
543                assert!(!store.has(0));
544            }
545        });
546    }
547
548    #[test_traced]
549    fn test_get_nonexistent() {
550        // Initialize the deterministic context
551        let executor = deterministic::Runner::default();
552        executor.start(|context| async move {
553            // Initialize the store
554            let cfg = Config {
555                partition: "test_ordinal".into(),
556                items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
557                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
558                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
559            };
560            let store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
561                .await
562                .expect("Failed to initialize store");
563
564            // Attempt to get an index that doesn't exist
565            let retrieved = store.get(999).await.expect("Failed to get data");
566            assert!(retrieved.is_none());
567
568            // Check has returns false
569            assert!(!store.has(999));
570        });
571    }
572
573    #[test_traced]
574    fn test_destroy() {
575        // Initialize the deterministic context
576        let executor = deterministic::Runner::default();
577        executor.start(|context| async move {
578            let cfg = Config {
579                partition: "test_ordinal".into(),
580                items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
581                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
582                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
583            };
584
585            // Create store with data
586            {
587                let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
588                    .await
589                    .expect("Failed to initialize store");
590
591                store
592                    .put(0, FixedBytes::new([0u8; 32]))
593                    .await
594                    .expect("Failed to put data");
595                store
596                    .put(1000, FixedBytes::new([100u8; 32]))
597                    .await
598                    .expect("Failed to put data");
599
600                // Destroy the store
601                store.destroy().await.expect("Failed to destroy store");
602            }
603
604            // Try to create a new store - it should be empty
605            {
606                let store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
607                    .await
608                    .expect("Failed to initialize store");
609
610                // Should not find any data
611                assert!(store.get(0).await.unwrap().is_none());
612                assert!(store.get(1000).await.unwrap().is_none());
613                assert!(!store.has(0));
614                assert!(!store.has(1000));
615            }
616        });
617    }
618
619    #[test_traced]
620    fn test_partial_record_write() {
621        // Initialize the deterministic context
622        let executor = deterministic::Runner::default();
623        executor.start(|context| async move {
624            let cfg = Config {
625                partition: "test_ordinal".into(),
626                items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
627                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
628                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
629            };
630
631            // Create store with data
632            {
633                let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
634                    .await
635                    .expect("Failed to initialize store");
636
637                store
638                    .put(0, FixedBytes::new([42u8; 32]))
639                    .await
640                    .expect("Failed to put data");
641                store
642                    .put(1, FixedBytes::new([43u8; 32]))
643                    .await
644                    .expect("Failed to put data");
645                store.close().await.expect("Failed to close store");
646            }
647
648            // Corrupt by writing partial record (only value, no CRC)
649            {
650                let (blob, _) = context
651                    .open("test_ordinal", &0u64.to_be_bytes())
652                    .await
653                    .unwrap();
654                // Overwrite second record with partial data (32 bytes instead of 36)
655                blob.write_at(vec![0xFF; 32], 36).await.unwrap();
656                blob.sync().await.unwrap();
657            }
658
659            // Reopen and verify it handles partial write gracefully
660            {
661                let store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
662                    .await
663                    .expect("Failed to initialize store");
664
665                // First record should be fine
666                assert_eq!(
667                    store.get(0).await.unwrap().unwrap(),
668                    FixedBytes::new([42u8; 32])
669                );
670
671                // Second record should be removed due to partial write
672                assert!(!store.has(1));
673                assert!(store.get(1).await.unwrap().is_none());
674
675                // Store should still be functional
676                let mut store_mut = store;
677                store_mut.put(1, FixedBytes::new([44u8; 32])).await.unwrap();
678                assert_eq!(
679                    store_mut.get(1).await.unwrap().unwrap(),
680                    FixedBytes::new([44u8; 32])
681                );
682            }
683        });
684    }
685
686    #[test_traced]
687    fn test_corrupted_value() {
688        // Initialize the deterministic context
689        let executor = deterministic::Runner::default();
690        executor.start(|context| async move {
691            let cfg = Config {
692                partition: "test_ordinal".into(),
693                items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
694                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
695                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
696            };
697
698            // Create store with data
699            {
700                let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
701                    .await
702                    .expect("Failed to initialize store");
703
704                store
705                    .put(0, FixedBytes::new([42u8; 32]))
706                    .await
707                    .expect("Failed to put data");
708                store
709                    .put(1, FixedBytes::new([43u8; 32]))
710                    .await
711                    .expect("Failed to put data");
712                store.close().await.expect("Failed to close store");
713            }
714
715            // Corrupt the value portion of a record
716            {
717                let (blob, _) = context
718                    .open("test_ordinal", &0u64.to_be_bytes())
719                    .await
720                    .unwrap();
721                // Corrupt some bytes in the value of the first record
722                blob.write_at(vec![0xFF, 0xFF, 0xFF, 0xFF], 10)
723                    .await
724                    .unwrap();
725                blob.sync().await.unwrap();
726            }
727
728            // Reopen and verify it detects corruption
729            {
730                let store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
731                    .await
732                    .expect("Failed to initialize store");
733
734                // First record should be detected as corrupted (CRC mismatch)
735                assert!(!store.has(0));
736
737                // Second record should still be valid
738                assert!(store.has(1));
739                assert_eq!(
740                    store.get(1).await.unwrap().unwrap(),
741                    FixedBytes::new([43u8; 32])
742                );
743            }
744        });
745    }
746
747    #[test_traced]
748    fn test_crc_corruptions() {
749        // Initialize the deterministic context
750        let executor = deterministic::Runner::default();
751        executor.start(|context| async move {
752            let cfg = Config {
753                partition: "test_ordinal".into(),
754                items_per_blob: NZU64!(10), // Small blob size for testing
755                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
756                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
757            };
758
759            // Create store with data across multiple blobs
760            {
761                let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
762                    .await
763                    .expect("Failed to initialize store");
764
765                // Add values across 2 blobs
766                store.put(0, FixedBytes::new([0u8; 32])).await.unwrap();
767                store.put(5, FixedBytes::new([5u8; 32])).await.unwrap();
768                store.put(10, FixedBytes::new([10u8; 32])).await.unwrap();
769                store.put(15, FixedBytes::new([15u8; 32])).await.unwrap();
770                store.close().await.expect("Failed to close store");
771            }
772
773            // Corrupt CRCs in different blobs
774            {
775                // Corrupt CRC in first blob
776                let (blob, _) = context
777                    .open("test_ordinal", &0u64.to_be_bytes())
778                    .await
779                    .unwrap();
780                blob.write_at(vec![0xFF], 32).await.unwrap(); // Corrupt CRC of index 0
781                blob.sync().await.unwrap();
782
783                // Corrupt value in second blob (which will invalidate CRC)
784                let (blob, _) = context
785                    .open("test_ordinal", &1u64.to_be_bytes())
786                    .await
787                    .unwrap();
788                blob.write_at(vec![0xFF; 4], 5).await.unwrap(); // Corrupt value of index 10
789                blob.sync().await.unwrap();
790            }
791
792            // Reopen and verify handling of CRC corruptions
793            {
794                let store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
795                    .await
796                    .expect("Failed to initialize store");
797
798                // Corrupted records should not be present
799                assert!(!store.has(0)); // CRC corrupted
800                assert!(!store.has(10)); // Value corrupted (CRC mismatch)
801
802                // Valid records should still be accessible
803                assert!(store.has(5));
804                assert!(store.has(15));
805                assert_eq!(
806                    store.get(5).await.unwrap().unwrap(),
807                    FixedBytes::new([5u8; 32])
808                );
809                assert_eq!(
810                    store.get(15).await.unwrap().unwrap(),
811                    FixedBytes::new([15u8; 32])
812                );
813            }
814        });
815    }
816
817    #[test_traced]
818    fn test_extra_bytes_in_blob() {
819        // Initialize the deterministic context
820        let executor = deterministic::Runner::default();
821        executor.start(|context| async move {
822            let cfg = Config {
823                partition: "test_ordinal".into(),
824                items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
825                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
826                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
827            };
828
829            // Create store with data
830            {
831                let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
832                    .await
833                    .expect("Failed to initialize store");
834
835                store
836                    .put(0, FixedBytes::new([42u8; 32]))
837                    .await
838                    .expect("Failed to put data");
839                store
840                    .put(1, FixedBytes::new([43u8; 32]))
841                    .await
842                    .expect("Failed to put data");
843                store.close().await.expect("Failed to close store");
844            }
845
846            // Add extra bytes at the end of blob
847            {
848                let (blob, size) = context
849                    .open("test_ordinal", &0u64.to_be_bytes())
850                    .await
851                    .unwrap();
852                // Add garbage data that forms a complete but invalid record
853                // This avoids partial record issues
854                let mut garbage = vec![0xFF; 32]; // Invalid value
855                let invalid_crc = 0xDEADBEEFu32;
856                garbage.extend_from_slice(&invalid_crc.to_be_bytes());
857                assert_eq!(garbage.len(), 36); // Full record size
858                blob.write_at(garbage, size).await.unwrap();
859                blob.sync().await.unwrap();
860            }
861
862            // Reopen and verify it handles extra bytes
863            {
864                let store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
865                    .await
866                    .expect("Failed to initialize store");
867
868                // Original records should still be valid
869                assert!(store.has(0));
870                assert!(store.has(1));
871                assert_eq!(
872                    store.get(0).await.unwrap().unwrap(),
873                    FixedBytes::new([42u8; 32])
874                );
875                assert_eq!(
876                    store.get(1).await.unwrap().unwrap(),
877                    FixedBytes::new([43u8; 32])
878                );
879
880                // Store should still be functional
881                let mut store_mut = store;
882                store_mut.put(2, FixedBytes::new([44u8; 32])).await.unwrap();
883                assert_eq!(
884                    store_mut.get(2).await.unwrap().unwrap(),
885                    FixedBytes::new([44u8; 32])
886                );
887            }
888        });
889    }
890
891    #[test_traced]
892    fn test_zero_filled_records() {
893        // Initialize the deterministic context
894        let executor = deterministic::Runner::default();
895        executor.start(|context| async move {
896            let cfg = Config {
897                partition: "test_ordinal".into(),
898                items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
899                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
900                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
901            };
902
903            // Create blob with zero-filled space
904            {
905                let (blob, _) = context
906                    .open("test_ordinal", &0u64.to_be_bytes())
907                    .await
908                    .unwrap();
909
910                // Write zeros for several record positions
911                let zeros = vec![0u8; 36 * 5]; // 5 records worth of zeros
912                blob.write_at(zeros, 0).await.unwrap();
913
914                // Write a valid record after the zeros
915                let mut valid_record = vec![44u8; 32];
916                let crc = crc32fast::hash(&valid_record);
917                valid_record.extend_from_slice(&crc.to_be_bytes());
918                blob.write_at(valid_record, 36 * 5).await.unwrap();
919
920                blob.sync().await.unwrap();
921            }
922
923            // Initialize store and verify it handles zero-filled records
924            {
925                let store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
926                    .await
927                    .expect("Failed to initialize store");
928
929                // Zero-filled positions should not be considered valid
930                for i in 0..5 {
931                    assert!(!store.has(i));
932                }
933
934                // The valid record should be found
935                assert!(store.has(5));
936                assert_eq!(
937                    store.get(5).await.unwrap().unwrap(),
938                    FixedBytes::new([44u8; 32])
939                );
940            }
941        });
942    }
943
944    fn test_operations_and_restart(num_values: usize) -> String {
945        // Initialize the deterministic context
946        let executor = deterministic::Runner::default();
947        executor.start(|mut context| async move {
948            let cfg = Config {
949                partition: "test_ordinal".into(),
950                items_per_blob: NZU64!(100), // Smaller blobs to test multiple blob handling
951                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
952                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
953            };
954
955            // Initialize the store
956            let mut store = Ordinal::<_, FixedBytes<128>>::init(context.clone(), cfg.clone())
957                .await
958                .expect("Failed to initialize store");
959
960            // Generate and insert random values at various indices
961            let mut values = Vec::new();
962            let mut rng_index = 0u64;
963
964            for _ in 0..num_values {
965                // Generate a pseudo-random index (sparse to test gaps)
966                let mut index_bytes = [0u8; 8];
967                context.fill_bytes(&mut index_bytes);
968                let index_offset = u64::from_be_bytes(index_bytes) % 1000;
969                let index = rng_index + index_offset;
970                rng_index = index + 1;
971
972                // Generate random value
973                let mut value = [0u8; 128];
974                context.fill_bytes(&mut value);
975                let value = FixedBytes::<128>::new(value);
976
977                store
978                    .put(index, value.clone())
979                    .await
980                    .expect("Failed to put data");
981                values.push((index, value));
982            }
983
984            // Sync data
985            store.sync().await.expect("Failed to sync");
986
987            // Verify all values can be retrieved
988            for (index, value) in &values {
989                let retrieved = store
990                    .get(*index)
991                    .await
992                    .expect("Failed to get data")
993                    .expect("Data not found");
994                assert_eq!(&retrieved, value);
995            }
996
997            // Test next_gap on various indices
998            for i in 0..10 {
999                let _ = store.next_gap(i * 100);
1000            }
1001
1002            // Close the store
1003            store.close().await.expect("Failed to close store");
1004
1005            // Reopen the store
1006            let mut store = Ordinal::<_, FixedBytes<128>>::init(context.clone(), cfg)
1007                .await
1008                .expect("Failed to initialize store");
1009
1010            // Verify all values are still there after restart
1011            for (index, value) in &values {
1012                let retrieved = store
1013                    .get(*index)
1014                    .await
1015                    .expect("Failed to get data")
1016                    .expect("Data not found");
1017                assert_eq!(&retrieved, value);
1018            }
1019
1020            // Add more values after restart
1021            for _ in 0..10 {
1022                let mut index_bytes = [0u8; 8];
1023                context.fill_bytes(&mut index_bytes);
1024                let index = u64::from_be_bytes(index_bytes) % 10000;
1025
1026                let mut value = [0u8; 128];
1027                context.fill_bytes(&mut value);
1028                let value = FixedBytes::<128>::new(value);
1029
1030                store.put(index, value).await.expect("Failed to put data");
1031            }
1032
1033            // Final sync
1034            store.sync().await.expect("Failed to sync");
1035
1036            // Return the auditor state for comparison
1037            context.auditor().state()
1038        })
1039    }
1040
1041    #[test_traced]
1042    #[ignore]
1043    fn test_determinism() {
1044        let state1 = test_operations_and_restart(100);
1045        let state2 = test_operations_and_restart(100);
1046        assert_eq!(state1, state2);
1047    }
1048
1049    #[test_traced]
1050    fn test_prune_basic() {
1051        // Initialize the deterministic context
1052        let executor = deterministic::Runner::default();
1053        executor.start(|context| async move {
1054            let cfg = Config {
1055                partition: "test_ordinal".into(),
1056                items_per_blob: NZU64!(100), // Small blobs to test multiple blob handling
1057                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1058                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1059            };
1060
1061            let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1062                .await
1063                .expect("Failed to initialize store");
1064
1065            // Insert data across multiple blobs
1066            let values = vec![
1067                (0u64, FixedBytes::new([0u8; 32])),     // Blob 0
1068                (50u64, FixedBytes::new([50u8; 32])),   // Blob 0
1069                (100u64, FixedBytes::new([100u8; 32])), // Blob 1
1070                (150u64, FixedBytes::new([150u8; 32])), // Blob 1
1071                (200u64, FixedBytes::new([200u8; 32])), // Blob 2
1072                (300u64, FixedBytes::new([44u8; 32])),  // Blob 3
1073            ];
1074
1075            for (index, value) in &values {
1076                store
1077                    .put(*index, value.clone())
1078                    .await
1079                    .expect("Failed to put data");
1080            }
1081            store.sync().await.unwrap();
1082
1083            // Verify all values exist
1084            for (index, value) in &values {
1085                assert_eq!(store.get(*index).await.unwrap().unwrap(), *value);
1086            }
1087
1088            // Prune up to index 150 (should remove blob 0 only)
1089            store.prune(150).await.unwrap();
1090            let buffer = context.encode();
1091            assert!(buffer.contains("pruned_total 1"));
1092
1093            // Verify pruned data is gone
1094            assert!(!store.has(0));
1095            assert!(!store.has(50));
1096            assert!(store.get(0).await.unwrap().is_none());
1097            assert!(store.get(50).await.unwrap().is_none());
1098
1099            // Verify remaining data is still there
1100            assert!(store.has(100));
1101            assert!(store.has(150));
1102            assert!(store.has(200));
1103            assert!(store.has(300));
1104            assert_eq!(store.get(100).await.unwrap().unwrap(), values[2].1);
1105            assert_eq!(store.get(150).await.unwrap().unwrap(), values[3].1);
1106            assert_eq!(store.get(200).await.unwrap().unwrap(), values[4].1);
1107            assert_eq!(store.get(300).await.unwrap().unwrap(), values[5].1);
1108
1109            // Prune more aggressively - up to index 250 (should remove blob 1)
1110            store.prune(250).await.unwrap();
1111            let buffer = context.encode();
1112            assert!(buffer.contains("pruned_total 2"));
1113
1114            // Verify more data is pruned
1115            assert!(!store.has(100));
1116            assert!(!store.has(150));
1117            assert!(store.get(100).await.unwrap().is_none());
1118            assert!(store.get(150).await.unwrap().is_none());
1119
1120            // Verify remaining data
1121            assert!(store.has(200));
1122            assert!(store.has(300));
1123            assert_eq!(store.get(200).await.unwrap().unwrap(), values[4].1);
1124            assert_eq!(store.get(300).await.unwrap().unwrap(), values[5].1);
1125        });
1126    }
1127
1128    #[test_traced]
1129    fn test_prune_with_gaps() {
1130        // Initialize the deterministic context
1131        let executor = deterministic::Runner::default();
1132        executor.start(|context| async move {
1133            let cfg = Config {
1134                partition: "test_ordinal".into(),
1135                items_per_blob: NZU64!(100),
1136                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1137                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1138            };
1139
1140            let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1141                .await
1142                .expect("Failed to initialize store");
1143
1144            // Insert sparse data with gaps
1145            store.put(5, FixedBytes::new([5u8; 32])).await.unwrap();
1146            store.put(105, FixedBytes::new([105u8; 32])).await.unwrap();
1147            store.put(305, FixedBytes::new([49u8; 32])).await.unwrap();
1148            store.sync().await.unwrap();
1149
1150            // Check gaps before pruning
1151            let (current_end, next_start) = store.next_gap(0);
1152            assert!(current_end.is_none());
1153            assert_eq!(next_start, Some(5));
1154
1155            let (current_end, next_start) = store.next_gap(5);
1156            assert_eq!(current_end, Some(5));
1157            assert_eq!(next_start, Some(105));
1158
1159            // Prune up to index 150 (should remove blob 0)
1160            store.prune(150).await.unwrap();
1161
1162            // Verify pruned data is gone
1163            assert!(!store.has(5));
1164            assert!(store.get(5).await.unwrap().is_none());
1165
1166            // Verify remaining data and gaps
1167            assert!(store.has(105));
1168            assert!(store.has(305));
1169
1170            let (current_end, next_start) = store.next_gap(0);
1171            assert!(current_end.is_none());
1172            assert_eq!(next_start, Some(105));
1173
1174            let (current_end, next_start) = store.next_gap(105);
1175            assert_eq!(current_end, Some(105));
1176            assert_eq!(next_start, Some(305));
1177        });
1178    }
1179
1180    #[test_traced]
1181    fn test_prune_no_op() {
1182        // Initialize the deterministic context
1183        let executor = deterministic::Runner::default();
1184        executor.start(|context| async move {
1185            let cfg = Config {
1186                partition: "test_ordinal".into(),
1187                items_per_blob: NZU64!(100),
1188                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1189                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1190            };
1191
1192            let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1193                .await
1194                .expect("Failed to initialize store");
1195
1196            // Insert data
1197            store.put(100, FixedBytes::new([100u8; 32])).await.unwrap();
1198            store.put(200, FixedBytes::new([200u8; 32])).await.unwrap();
1199            store.sync().await.unwrap();
1200
1201            // Try to prune before any data - should be no-op
1202            store.prune(50).await.unwrap();
1203
1204            // Verify no data was actually pruned
1205            assert!(store.has(100));
1206            assert!(store.has(200));
1207            let buffer = context.encode();
1208            assert!(buffer.contains("pruned_total 0"));
1209
1210            // Try to prune exactly at blob boundary - should be no-op
1211            store.prune(100).await.unwrap();
1212
1213            // Verify still no data pruned
1214            assert!(store.has(100));
1215            assert!(store.has(200));
1216            let buffer = context.encode();
1217            assert!(buffer.contains("pruned_total 0"));
1218        });
1219    }
1220
1221    #[test_traced]
1222    fn test_prune_empty_store() {
1223        // Initialize the deterministic context
1224        let executor = deterministic::Runner::default();
1225        executor.start(|context| async move {
1226            let cfg = Config {
1227                partition: "test_ordinal".into(),
1228                items_per_blob: NZU64!(100),
1229                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1230                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1231            };
1232
1233            let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1234                .await
1235                .expect("Failed to initialize store");
1236
1237            // Try to prune empty store
1238            store.prune(1000).await.unwrap();
1239
1240            // Store should still be functional
1241            store.put(0, FixedBytes::new([0u8; 32])).await.unwrap();
1242            assert!(store.has(0));
1243        });
1244    }
1245
1246    #[test_traced]
1247    fn test_prune_after_restart() {
1248        // Initialize the deterministic context
1249        let executor = deterministic::Runner::default();
1250        executor.start(|context| async move {
1251            let cfg = Config {
1252                partition: "test_ordinal".into(),
1253                items_per_blob: NZU64!(100),
1254                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1255                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1256            };
1257
1258            // Create store and add data
1259            {
1260                let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1261                    .await
1262                    .expect("Failed to initialize store");
1263
1264                store.put(0, FixedBytes::new([0u8; 32])).await.unwrap();
1265                store.put(100, FixedBytes::new([100u8; 32])).await.unwrap();
1266                store.put(200, FixedBytes::new([200u8; 32])).await.unwrap();
1267                store.close().await.unwrap();
1268            }
1269
1270            // Reopen and prune
1271            {
1272                let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1273                    .await
1274                    .expect("Failed to initialize store");
1275
1276                // Verify data is there
1277                assert!(store.has(0));
1278                assert!(store.has(100));
1279                assert!(store.has(200));
1280
1281                // Prune up to index 150
1282                store.prune(150).await.unwrap();
1283
1284                // Verify pruning worked
1285                assert!(!store.has(0));
1286                assert!(store.has(100));
1287                assert!(store.has(200));
1288
1289                store.close().await.unwrap();
1290            }
1291
1292            // Reopen again and verify pruning persisted
1293            {
1294                let store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1295                    .await
1296                    .expect("Failed to initialize store");
1297
1298                assert!(!store.has(0));
1299                assert!(store.has(100));
1300                assert!(store.has(200));
1301
1302                // Check gaps
1303                let (current_end, next_start) = store.next_gap(0);
1304                assert!(current_end.is_none());
1305                assert_eq!(next_start, Some(100));
1306            }
1307        });
1308    }
1309
1310    #[test_traced]
1311    fn test_prune_multiple_operations() {
1312        // Initialize the deterministic context
1313        let executor = deterministic::Runner::default();
1314        executor.start(|context| async move {
1315            let cfg = Config {
1316                partition: "test_ordinal".into(),
1317                items_per_blob: NZU64!(50), // Smaller blobs for more granular testing
1318                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1319                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1320            };
1321
1322            let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1323                .await
1324                .expect("Failed to initialize store");
1325
1326            // Insert data across many blobs
1327            let mut values = Vec::new();
1328            for i in 0..10 {
1329                let index = i * 50 + 25; // Middle of each blob
1330                let value = FixedBytes::new([i as u8; 32]);
1331                store.put(index, value.clone()).await.unwrap();
1332                values.push((index, value));
1333            }
1334            store.sync().await.unwrap();
1335
1336            // Prune incrementally
1337            for i in 1..5 {
1338                let prune_index = i * 50 + 10;
1339                store.prune(prune_index).await.unwrap();
1340
1341                // Verify appropriate data is pruned
1342                for (index, _) in &values {
1343                    if *index < prune_index {
1344                        assert!(!store.has(*index), "Index {index} should be pruned");
1345                    } else {
1346                        assert!(store.has(*index), "Index {index} should not be pruned");
1347                    }
1348                }
1349            }
1350
1351            // Check final state
1352            let buffer = context.encode();
1353            assert!(buffer.contains("pruned_total 4"));
1354
1355            // Verify remaining data
1356            for i in 4..10 {
1357                let index = i * 50 + 25;
1358                assert!(store.has(index));
1359                assert_eq!(
1360                    store.get(index).await.unwrap().unwrap(),
1361                    values[i as usize].1
1362                );
1363            }
1364        });
1365    }
1366
1367    #[test_traced]
1368    fn test_prune_blob_boundaries() {
1369        // Initialize the deterministic context
1370        let executor = deterministic::Runner::default();
1371        executor.start(|context| async move {
1372            let cfg = Config {
1373                partition: "test_ordinal".into(),
1374                items_per_blob: NZU64!(100),
1375                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1376                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1377            };
1378
1379            let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1380                .await
1381                .expect("Failed to initialize store");
1382
1383            // Insert data at blob boundaries
1384            store.put(0, FixedBytes::new([0u8; 32])).await.unwrap(); // Start of blob 0
1385            store.put(99, FixedBytes::new([99u8; 32])).await.unwrap(); // End of blob 0
1386            store.put(100, FixedBytes::new([100u8; 32])).await.unwrap(); // Start of blob 1
1387            store.put(199, FixedBytes::new([199u8; 32])).await.unwrap(); // End of blob 1
1388            store.put(200, FixedBytes::new([200u8; 32])).await.unwrap(); // Start of blob 2
1389            store.sync().await.unwrap();
1390
1391            // Test various pruning points around boundaries
1392
1393            // Prune exactly at blob boundary (100) - should prune blob 0
1394            store.prune(100).await.unwrap();
1395            assert!(!store.has(0));
1396            assert!(!store.has(99));
1397            assert!(store.has(100));
1398            assert!(store.has(199));
1399            assert!(store.has(200));
1400
1401            // Prune just before next boundary (199) - should not prune blob 1
1402            store.prune(199).await.unwrap();
1403            assert!(store.has(100));
1404            assert!(store.has(199));
1405            assert!(store.has(200));
1406
1407            // Prune exactly at next boundary (200) - should prune blob 1
1408            store.prune(200).await.unwrap();
1409            assert!(!store.has(100));
1410            assert!(!store.has(199));
1411            assert!(store.has(200));
1412
1413            let buffer = context.encode();
1414            assert!(buffer.contains("pruned_total 2"));
1415        });
1416    }
1417
1418    #[test_traced]
1419    fn test_prune_non_contiguous_sections() {
1420        // Initialize the deterministic context
1421        let executor = deterministic::Runner::default();
1422        executor.start(|context| async move {
1423            let cfg = Config {
1424                partition: "test_ordinal".into(),
1425                items_per_blob: NZU64!(100),
1426                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1427                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1428            };
1429
1430            let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1431                .await
1432                .expect("Failed to initialize store");
1433
1434            // Insert data in non-contiguous sections (0, 2, 5, 7)
1435            store.put(0, FixedBytes::new([0u8; 32])).await.unwrap(); // Section 0
1436            store.put(250, FixedBytes::new([50u8; 32])).await.unwrap(); // Section 2 (250/100 = 2)
1437            store.put(500, FixedBytes::new([44u8; 32])).await.unwrap(); // Section 5 (500/100 = 5)
1438            store.put(750, FixedBytes::new([45u8; 32])).await.unwrap(); // Section 7 (750/100 = 7)
1439            store.sync().await.unwrap();
1440
1441            // Verify all data exists initially
1442            assert!(store.has(0));
1443            assert!(store.has(250));
1444            assert!(store.has(500));
1445            assert!(store.has(750));
1446
1447            // Prune up to section 3 (index 300) - should remove sections 0 and 2
1448            store.prune(300).await.unwrap();
1449
1450            // Verify correct data was pruned
1451            assert!(!store.has(0)); // Section 0 pruned
1452            assert!(!store.has(250)); // Section 2 pruned
1453            assert!(store.has(500)); // Section 5 remains
1454            assert!(store.has(750)); // Section 7 remains
1455
1456            let buffer = context.encode();
1457            assert!(buffer.contains("pruned_total 2"));
1458
1459            // Prune up to section 6 (index 600) - should remove section 5
1460            store.prune(600).await.unwrap();
1461
1462            // Verify section 5 was pruned
1463            assert!(!store.has(500)); // Section 5 pruned
1464            assert!(store.has(750)); // Section 7 remains
1465
1466            let buffer = context.encode();
1467            assert!(buffer.contains("pruned_total 3"));
1468
1469            // Prune everything - should remove section 7
1470            store.prune(1000).await.unwrap();
1471
1472            // Verify all data is gone
1473            assert!(!store.has(750)); // Section 7 pruned
1474
1475            let buffer = context.encode();
1476            assert!(buffer.contains("pruned_total 4"));
1477        });
1478    }
1479
1480    #[test_traced]
1481    fn test_prune_removes_correct_pending() {
1482        // Initialize the deterministic context
1483        let executor = deterministic::Runner::default();
1484        executor.start(|context| async move {
1485            let cfg = Config {
1486                partition: "test_ordinal".into(),
1487                items_per_blob: NZU64!(100),
1488                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1489                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1490            };
1491            let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1492                .await
1493                .expect("Failed to initialize store");
1494
1495            // Insert and sync some data in blob 0
1496            store.put(5, FixedBytes::new([5u8; 32])).await.unwrap();
1497            store.sync().await.unwrap();
1498
1499            // Add pending entries to blob 0 and blob 1
1500            store.put(10, FixedBytes::new([10u8; 32])).await.unwrap(); // blob 0
1501            store.put(110, FixedBytes::new([110u8; 32])).await.unwrap(); // blob 1
1502
1503            // Verify all data is visible before pruning
1504            assert!(store.has(5));
1505            assert!(store.has(10));
1506            assert!(store.has(110));
1507
1508            // Prune up to index 100, which should remove blob 0 (indices 0-99).
1509            store.prune(150).await.unwrap();
1510
1511            // Verify that synced and pending entries in blob 0 are removed.
1512            assert!(!store.has(5));
1513            assert!(!store.has(10));
1514
1515            // Verify that the pending entry in blob 1 remains.
1516            assert!(store.has(110));
1517            assert_eq!(
1518                store.get(110).await.unwrap().unwrap(),
1519                FixedBytes::new([110u8; 32])
1520            );
1521
1522            // Sync the remaining pending entry and verify it's still there.
1523            store.sync().await.unwrap();
1524            assert!(store.has(110));
1525            assert_eq!(
1526                store.get(110).await.unwrap().unwrap(),
1527                FixedBytes::new([110u8; 32])
1528            );
1529        });
1530    }
1531
1532    #[test_traced]
1533    fn test_init_with_bits_none() {
1534        // Initialize the deterministic context
1535        let executor = deterministic::Runner::default();
1536        executor.start(|context| async move {
1537            let cfg = Config {
1538                partition: "test_ordinal".into(),
1539                items_per_blob: NZU64!(10), // Small blob size for testing
1540                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1541                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1542            };
1543
1544            // Create store with data across multiple sections
1545            {
1546                let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1547                    .await
1548                    .expect("Failed to initialize store");
1549
1550                // Section 0 (indices 0-9)
1551                store.put(0, FixedBytes::new([0u8; 32])).await.unwrap();
1552                store.put(5, FixedBytes::new([5u8; 32])).await.unwrap();
1553                store.put(9, FixedBytes::new([9u8; 32])).await.unwrap();
1554
1555                // Section 1 (indices 10-19)
1556                store.put(10, FixedBytes::new([10u8; 32])).await.unwrap();
1557                store.put(15, FixedBytes::new([15u8; 32])).await.unwrap();
1558
1559                // Section 2 (indices 20-29)
1560                store.put(25, FixedBytes::new([25u8; 32])).await.unwrap();
1561
1562                store.close().await.unwrap();
1563            }
1564
1565            // Reinitialize with bits = None (should behave like regular init)
1566            {
1567                let store = Ordinal::<_, FixedBytes<32>>::init_with_bits(
1568                    context.clone(),
1569                    cfg.clone(),
1570                    None,
1571                )
1572                .await
1573                .expect("Failed to initialize store with bits");
1574
1575                // All records should be available
1576                assert!(store.has(0));
1577                assert!(store.has(5));
1578                assert!(store.has(9));
1579                assert!(store.has(10));
1580                assert!(store.has(15));
1581                assert!(store.has(25));
1582
1583                // Non-existent records should not be available
1584                assert!(!store.has(1));
1585                assert!(!store.has(11));
1586                assert!(!store.has(20));
1587
1588                // Verify values
1589                assert_eq!(
1590                    store.get(0).await.unwrap().unwrap(),
1591                    FixedBytes::new([0u8; 32])
1592                );
1593                assert_eq!(
1594                    store.get(15).await.unwrap().unwrap(),
1595                    FixedBytes::new([15u8; 32])
1596                );
1597            }
1598        });
1599    }
1600
1601    #[test_traced]
1602    fn test_init_with_bits_empty_hashmap() {
1603        // Initialize the deterministic context
1604        let executor = deterministic::Runner::default();
1605        executor.start(|context| async move {
1606            let cfg = Config {
1607                partition: "test_ordinal".into(),
1608                items_per_blob: NZU64!(10),
1609                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1610                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1611            };
1612
1613            // Create store with data
1614            {
1615                let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1616                    .await
1617                    .expect("Failed to initialize store");
1618
1619                store.put(0, FixedBytes::new([0u8; 32])).await.unwrap();
1620                store.put(10, FixedBytes::new([10u8; 32])).await.unwrap();
1621                store.put(20, FixedBytes::new([20u8; 32])).await.unwrap();
1622
1623                store.close().await.unwrap();
1624            }
1625
1626            // Reinitialize with empty HashMap - should skip all sections
1627            {
1628                let bits: BTreeMap<u64, &Option<BitVec>> = BTreeMap::new();
1629                let store = Ordinal::<_, FixedBytes<32>>::init_with_bits(
1630                    context.clone(),
1631                    cfg.clone(),
1632                    Some(bits),
1633                )
1634                .await
1635                .expect("Failed to initialize store with bits");
1636
1637                // No records should be available since no sections were in the bits map
1638                assert!(!store.has(0));
1639                assert!(!store.has(10));
1640                assert!(!store.has(20));
1641            }
1642        });
1643    }
1644
1645    #[test_traced]
1646    fn test_init_with_bits_selective_sections() {
1647        // Initialize the deterministic context
1648        let executor = deterministic::Runner::default();
1649        executor.start(|context| async move {
1650            let cfg = Config {
1651                partition: "test_ordinal".into(),
1652                items_per_blob: NZU64!(10),
1653                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1654                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1655            };
1656
1657            // Create store with data in multiple sections
1658            {
1659                let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1660                    .await
1661                    .expect("Failed to initialize store");
1662
1663                // Section 0 (indices 0-9)
1664                for i in 0..10 {
1665                    store.put(i, FixedBytes::new([i as u8; 32])).await.unwrap();
1666                }
1667
1668                // Section 1 (indices 10-19)
1669                for i in 10..20 {
1670                    store.put(i, FixedBytes::new([i as u8; 32])).await.unwrap();
1671                }
1672
1673                // Section 2 (indices 20-29)
1674                for i in 20..30 {
1675                    store.put(i, FixedBytes::new([i as u8; 32])).await.unwrap();
1676                }
1677
1678                store.close().await.unwrap();
1679            }
1680
1681            // Reinitialize with bits for only section 1
1682            {
1683                let mut bits_map: BTreeMap<u64, &Option<BitVec>> = BTreeMap::new();
1684
1685                // Create a BitVec that marks indices 12, 15, and 18 as present
1686                let mut bitvec = BitVec::zeroes(10);
1687                bitvec.set(2); // Index 12 (offset 2 in section 1)
1688                bitvec.set(5); // Index 15 (offset 5 in section 1)
1689                bitvec.set(8); // Index 18 (offset 8 in section 1)
1690                let bitvec_option = Some(bitvec);
1691
1692                bits_map.insert(1, &bitvec_option);
1693
1694                let store = Ordinal::<_, FixedBytes<32>>::init_with_bits(
1695                    context.clone(),
1696                    cfg.clone(),
1697                    Some(bits_map),
1698                )
1699                .await
1700                .expect("Failed to initialize store with bits");
1701
1702                // Only specified indices from section 1 should be available
1703                assert!(store.has(12));
1704                assert!(store.has(15));
1705                assert!(store.has(18));
1706
1707                // Other indices from section 1 should not be available
1708                assert!(!store.has(10));
1709                assert!(!store.has(11));
1710                assert!(!store.has(13));
1711                assert!(!store.has(14));
1712                assert!(!store.has(16));
1713                assert!(!store.has(17));
1714                assert!(!store.has(19));
1715
1716                // All indices from sections 0 and 2 should not be available
1717                for i in 0..10 {
1718                    assert!(!store.has(i));
1719                }
1720                for i in 20..30 {
1721                    assert!(!store.has(i));
1722                }
1723
1724                // Verify the available values
1725                assert_eq!(
1726                    store.get(12).await.unwrap().unwrap(),
1727                    FixedBytes::new([12u8; 32])
1728                );
1729                assert_eq!(
1730                    store.get(15).await.unwrap().unwrap(),
1731                    FixedBytes::new([15u8; 32])
1732                );
1733                assert_eq!(
1734                    store.get(18).await.unwrap().unwrap(),
1735                    FixedBytes::new([18u8; 32])
1736                );
1737            }
1738        });
1739    }
1740
1741    #[test_traced]
1742    fn test_init_with_bits_none_option_all_records_exist() {
1743        // Initialize the deterministic context
1744        let executor = deterministic::Runner::default();
1745        executor.start(|context| async move {
1746            let cfg = Config {
1747                partition: "test_ordinal".into(),
1748                items_per_blob: NZU64!(5),
1749                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1750                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1751            };
1752
1753            // Create store with all records in a section
1754            {
1755                let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1756                    .await
1757                    .expect("Failed to initialize store");
1758
1759                // Fill section 1 completely (indices 5-9)
1760                for i in 5..10 {
1761                    store.put(i, FixedBytes::new([i as u8; 32])).await.unwrap();
1762                }
1763
1764                store.close().await.unwrap();
1765            }
1766
1767            // Reinitialize with None option for section 1 (expects all records)
1768            {
1769                let mut bits_map: BTreeMap<u64, &Option<BitVec>> = BTreeMap::new();
1770                let none_option: Option<BitVec> = None;
1771                bits_map.insert(1, &none_option);
1772
1773                let store = Ordinal::<_, FixedBytes<32>>::init_with_bits(
1774                    context.clone(),
1775                    cfg.clone(),
1776                    Some(bits_map),
1777                )
1778                .await
1779                .expect("Failed to initialize store with bits");
1780
1781                // All records in section 1 should be available
1782                for i in 5..10 {
1783                    assert!(store.has(i));
1784                    assert_eq!(
1785                        store.get(i).await.unwrap().unwrap(),
1786                        FixedBytes::new([i as u8; 32])
1787                    );
1788                }
1789            }
1790        });
1791    }
1792
1793    #[test_traced]
1794    #[should_panic(expected = "Failed to initialize store with bits: MissingRecord(6)")]
1795    fn test_init_with_bits_none_option_missing_record_panics() {
1796        // Initialize the deterministic context
1797        let executor = deterministic::Runner::default();
1798        executor.start(|context| async move {
1799            let cfg = Config {
1800                partition: "test_ordinal".into(),
1801                items_per_blob: NZU64!(5),
1802                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1803                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1804            };
1805
1806            // Create store with missing record in a section
1807            {
1808                let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1809                    .await
1810                    .expect("Failed to initialize store");
1811
1812                // Fill section 1 partially (skip index 6)
1813                store.put(5, FixedBytes::new([5u8; 32])).await.unwrap();
1814                // Skip index 6
1815                store.put(7, FixedBytes::new([7u8; 32])).await.unwrap();
1816                store.put(8, FixedBytes::new([8u8; 32])).await.unwrap();
1817                store.put(9, FixedBytes::new([9u8; 32])).await.unwrap();
1818
1819                store.close().await.unwrap();
1820            }
1821
1822            // Reinitialize with None option for section 1 (expects all records)
1823            // This should panic because index 6 is missing
1824            {
1825                let mut bits_map: BTreeMap<u64, &Option<BitVec>> = BTreeMap::new();
1826                let none_option: Option<BitVec> = None;
1827                bits_map.insert(1, &none_option);
1828
1829                let _store = Ordinal::<_, FixedBytes<32>>::init_with_bits(
1830                    context.clone(),
1831                    cfg.clone(),
1832                    Some(bits_map),
1833                )
1834                .await
1835                .expect("Failed to initialize store with bits");
1836            }
1837        });
1838    }
1839
1840    #[test_traced]
1841    fn test_init_with_bits_mixed_sections() {
1842        // Initialize the deterministic context
1843        let executor = deterministic::Runner::default();
1844        executor.start(|context| async move {
1845            let cfg = Config {
1846                partition: "test_ordinal".into(),
1847                items_per_blob: NZU64!(5),
1848                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1849                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1850            };
1851
1852            // Create store with data in multiple sections
1853            {
1854                let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1855                    .await
1856                    .expect("Failed to initialize store");
1857
1858                // Section 0: indices 0-4 (fill completely)
1859                for i in 0..5 {
1860                    store.put(i, FixedBytes::new([i as u8; 32])).await.unwrap();
1861                }
1862
1863                // Section 1: indices 5-9 (fill partially)
1864                store.put(5, FixedBytes::new([5u8; 32])).await.unwrap();
1865                store.put(7, FixedBytes::new([7u8; 32])).await.unwrap();
1866                store.put(9, FixedBytes::new([9u8; 32])).await.unwrap();
1867
1868                // Section 2: indices 10-14 (fill completely)
1869                for i in 10..15 {
1870                    store.put(i, FixedBytes::new([i as u8; 32])).await.unwrap();
1871                }
1872
1873                store.close().await.unwrap();
1874            }
1875
1876            // Reinitialize with mixed bits configuration
1877            {
1878                let mut bits_map: BTreeMap<u64, &Option<BitVec>> = BTreeMap::new();
1879
1880                // Section 0: None option (expects all records)
1881                let none_option: Option<BitVec> = None;
1882                bits_map.insert(0, &none_option);
1883
1884                // Section 1: BitVec with specific indices
1885                let mut bitvec1 = BitVec::zeroes(5);
1886                bitvec1.set(0); // Index 5
1887                bitvec1.set(2); // Index 7
1888                                // Note: not setting bit for index 9, so it should be ignored
1889                let bitvec1_option = Some(bitvec1);
1890                bits_map.insert(1, &bitvec1_option);
1891
1892                // Section 2: Not in map, should be skipped entirely
1893
1894                let store = Ordinal::<_, FixedBytes<32>>::init_with_bits(
1895                    context.clone(),
1896                    cfg.clone(),
1897                    Some(bits_map),
1898                )
1899                .await
1900                .expect("Failed to initialize store with bits");
1901
1902                // All records from section 0 should be available
1903                for i in 0..5 {
1904                    assert!(store.has(i));
1905                    assert_eq!(
1906                        store.get(i).await.unwrap().unwrap(),
1907                        FixedBytes::new([i as u8; 32])
1908                    );
1909                }
1910
1911                // Only specified records from section 1 should be available
1912                assert!(store.has(5));
1913                assert!(store.has(7));
1914                assert!(!store.has(6));
1915                assert!(!store.has(8));
1916                assert!(!store.has(9)); // Not set in bitvec
1917
1918                // No records from section 2 should be available
1919                for i in 10..15 {
1920                    assert!(!store.has(i));
1921                }
1922            }
1923        });
1924    }
1925
1926    #[test_traced]
1927    #[should_panic(expected = "Failed to initialize store with bits: MissingRecord(2)")]
1928    fn test_init_with_bits_corrupted_records() {
1929        // Initialize the deterministic context
1930        let executor = deterministic::Runner::default();
1931        executor.start(|context| async move {
1932            let cfg = Config {
1933                partition: "test_ordinal".into(),
1934                items_per_blob: NZU64!(5),
1935                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1936                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1937            };
1938
1939            // Create store with data and corrupt one record
1940            {
1941                let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1942                    .await
1943                    .expect("Failed to initialize store");
1944
1945                // Section 0: indices 0-4
1946                for i in 0..5 {
1947                    store.put(i, FixedBytes::new([i as u8; 32])).await.unwrap();
1948                }
1949
1950                store.close().await.unwrap();
1951            }
1952
1953            // Corrupt record at index 2
1954            {
1955                let (blob, _) = context
1956                    .open("test_ordinal", &0u64.to_be_bytes())
1957                    .await
1958                    .unwrap();
1959                // Corrupt the CRC of record at index 2
1960                let offset = 2 * 36 + 32; // 2 * record_size + value_size
1961                blob.write_at(vec![0xFF], offset).await.unwrap();
1962                blob.sync().await.unwrap();
1963            }
1964
1965            // Reinitialize with bits that include the corrupted record
1966            {
1967                let mut bits_map: BTreeMap<u64, &Option<BitVec>> = BTreeMap::new();
1968
1969                // Create a BitVec that includes the corrupted record
1970                let mut bitvec = BitVec::zeroes(5);
1971                bitvec.set(0); // Index 0
1972                bitvec.set(2); // Index 2 (corrupted) - this will cause a panic
1973                bitvec.set(4); // Index 4
1974                let bitvec_option = Some(bitvec);
1975                bits_map.insert(0, &bitvec_option);
1976
1977                let _store = Ordinal::<_, FixedBytes<32>>::init_with_bits(
1978                    context.clone(),
1979                    cfg.clone(),
1980                    Some(bits_map),
1981                )
1982                .await
1983                .expect("Failed to initialize store with bits");
1984            }
1985        });
1986    }
1987
1988    /// A dummy value that will fail parsing if the value is 0.
1989    #[derive(Debug, PartialEq, Eq)]
1990    pub struct DummyValue {
1991        pub value: u64,
1992    }
1993
1994    impl Write for DummyValue {
1995        fn write(&self, buf: &mut impl BufMut) {
1996            self.value.write(buf);
1997        }
1998    }
1999
2000    impl Read for DummyValue {
2001        type Cfg = ();
2002
2003        fn read_cfg(buf: &mut impl Buf, _: &Self::Cfg) -> Result<Self, commonware_codec::Error> {
2004            let value = u64::read(buf)?;
2005            if value == 0 {
2006                return Err(commonware_codec::Error::Invalid(
2007                    "DummyValue",
2008                    "value must be non-zero",
2009                ));
2010            }
2011            Ok(Self { value })
2012        }
2013    }
2014
2015    impl FixedSize for DummyValue {
2016        const SIZE: usize = u64::SIZE;
2017    }
2018
2019    #[test_traced]
2020    fn test_init_skip_unparseable_record() {
2021        // Initialize the deterministic context
2022        let executor = deterministic::Runner::default();
2023        executor.start(|context| async move {
2024            let cfg = Config {
2025                partition: "test_ordinal".into(),
2026                items_per_blob: NZU64!(1),
2027                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
2028                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
2029            };
2030
2031            // Create store with valid records
2032            {
2033                let mut store = Ordinal::<_, DummyValue>::init(context.clone(), cfg.clone())
2034                    .await
2035                    .expect("Failed to initialize store");
2036
2037                // Add records at indices 1, 2, 4
2038                store.put(1, DummyValue { value: 1 }).await.unwrap();
2039                store.put(2, DummyValue { value: 0 }).await.unwrap(); // will fail parsing
2040                store.put(4, DummyValue { value: 4 }).await.unwrap();
2041
2042                store.close().await.unwrap();
2043            }
2044
2045            // Reinitialize - should skip the unparseable record but continue processing
2046            {
2047                let store = Ordinal::<_, DummyValue>::init(context.clone(), cfg.clone())
2048                    .await
2049                    .expect("Failed to initialize store");
2050
2051                // Record 0 should be available
2052                assert!(store.has(1), "Record 1 should be available");
2053                assert_eq!(
2054                    store.get(1).await.unwrap().unwrap(),
2055                    DummyValue { value: 1 },
2056                    "Record 0 should have correct value"
2057                );
2058
2059                // Record 2 should NOT be available (unparseable)
2060                assert!(
2061                    !store.has(2),
2062                    "Record 2 should not be available (unparseable)"
2063                );
2064
2065                // This tests that we didn't exit early when encountering the unparseable record
2066                assert!(
2067                    store.has(4),
2068                    "Record 4 should be available - we should not exit early on unparseable record"
2069                );
2070                assert_eq!(
2071                    store.get(4).await.unwrap().unwrap(),
2072                    DummyValue { value: 4 },
2073                    "Record 4 should have correct value"
2074                );
2075            }
2076        });
2077    }
2078}