Skip to main content

commonware_storage/ordinal/
mod.rs

1//! A persistent index that maps sparse indices to [commonware_utils::Array]s.
2//!
3//! [Ordinal] is a collection of [commonware_runtime::Blob]s containing ordered records of fixed size.
4//! Because records are fixed size, file position corresponds directly to index. Unlike
5//! [crate::journal::contiguous::fixed::Journal], [Ordinal] supports out-of-order insertion.
6//!
7//! # Design
8//!
9//! [Ordinal] is a collection of [commonware_runtime::Blob]s where:
10//! - Each record: `[V][crc32(V)]` where V is an [commonware_utils::Array]
11//! - Index N is at file offset: `N * RECORD_SIZE`
12//! - A [crate::rmap::RMap] tracks which indices have been written (and which are missing)
13//!
14//! # File Organization
15//!
16//! Records are grouped into blobs to avoid having too many files:
17//!
18//! ```text
19//! Blob 0: indices 0-999
20//! Blob 1: indices 1000-1999
21//! ...
22//! ```
23//!
24//! # Format
25//!
26//! [Ordinal] stores values in the following format:
27//!
28//! ```text
29//! +---+---+---+---+---+---+---+---+---+---+---+---+---+
30//! | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 |10 |11 |12 |
31//! +---+---+---+---+---+---+---+---+---+---+---+---+---+
32//! |          Value (Fixed Size)       |     CRC32     |
33//! +---+---+---+---+---+---+---+---+---+---+---+---+---+
34//! ```
35//!
36//! # Performance Characteristics
37//!
38//! - **Writes**: O(1) - direct offset calculation
39//! - **Reads**: O(1) - direct offset calculation
40//! - **Has**: O(1) - in-memory lookup (via [crate::rmap::RMap])
41//! - **Next Gap**: O(log n) - in-memory range query (via [crate::rmap::RMap])
42//! - **Restart**: O(n) where n is the number of existing records (to rebuild [crate::rmap::RMap])
43//!
44//! # Atomicity
45//!
46//! [Ordinal] eagerly writes all new data to [commonware_runtime::Blob]s. New data, however, is not
47//! synced until [Ordinal::sync] is called. As a result, data is not guaranteed to be atomically
48//! persisted (i.e. shutdown before [Ordinal::sync] may lead to some writes being lost).
49//!
50//! _If you want atomicity for sparse writes, pair [commonware_utils::bitmap::BitMap] and
51//! [crate::metadata::Metadata] with [Ordinal] (use bits to indicate which items have been atomically
52//! written)._
53//!
54//! # Recovery
55//!
56//! On restart, [Ordinal] validates all records using their CRC32 and rebuilds the in-memory
57//! [crate::rmap::RMap]. Invalid records (corrupted or empty) are excluded from the rebuilt index.
58//!
59//! # Example
60//!
61//! ```rust
62//! use commonware_runtime::{Spawner, Runner, deterministic};
63//! use commonware_storage::ordinal::{Ordinal, Config};
64//! use commonware_utils::{sequence::FixedBytes, NZUsize, NZU64};
65//!
66//! let executor = deterministic::Runner::default();
67//! executor.start(|context| async move {
68//!     // Create a store for 32-byte values
69//!     let cfg = Config {
70//!         partition: "ordinal_store".into(),
71//!         items_per_blob: NZU64!(10000),
72//!         write_buffer: NZUsize!(4096),
73//!         replay_buffer: NZUsize!(1024 * 1024),
74//!     };
75//!     let mut store = Ordinal::<_, FixedBytes<32>>::init(context, cfg).await.unwrap();
76//!
77//!     // Put values at specific indices
78//!     let value1 = FixedBytes::new([1u8; 32]);
79//!     let value2 = FixedBytes::new([2u8; 32]);
80//!     store.put(0, value1).await.unwrap();
81//!     store.put(5, value2).await.unwrap();
82//!
83//!     // Sync to disk
84//!     store.sync().await.unwrap();
85//!
86//!     // Check for gaps
87//!     let (current_end, next_start) = store.next_gap(0);
88//!     assert_eq!(current_end, Some(0));
89//!     assert_eq!(next_start, Some(5));
90//!
91//!     // Sync the store
92//!     store.sync().await.unwrap();
93//! });
94//! ```
95
96mod storage;
97
98use std::num::{NonZeroU64, NonZeroUsize};
99pub use storage::Ordinal;
100use thiserror::Error;
101
102/// Errors that can occur when interacting with the [Ordinal].
///
/// The `#[from]` variants let `?` convert the underlying runtime and codec errors
/// into this type automatically.
103#[derive(Debug, Error)]
104pub enum Error {
    /// An error surfaced by [commonware_runtime] (wrapped unchanged).
105    #[error("runtime error: {0}")]
106    Runtime(#[from] commonware_runtime::Error),
    /// An error surfaced by [commonware_codec] (wrapped unchanged).
107    #[error("codec error: {0}")]
108    Codec(#[from] commonware_codec::Error),
    /// A blob name could not be interpreted.
    ///
    /// NOTE(review): the payload is presumably the offending name rendered as a string;
    /// confirm against the construction site in `storage`.
109    #[error("invalid blob name: {0}")]
110    InvalidBlobName(String),
    /// A record was found to be invalid.
    ///
    /// NOTE(review): the payload is presumably the record's index; confirm against the
    /// construction site in `storage`.
111    #[error("invalid record: {0}")]
112    InvalidRecord(u64),
    /// A record that was expected to exist is missing.
    ///
    /// NOTE(review): the payload is presumably the record's index; confirm against the
    /// construction site in `storage`.
113    #[error("missing record at {0}")]
114    MissingRecord(u64),
115}
116
/// Configuration for [Ordinal] storage.
///
/// All sizing parameters are non-zero by construction, so a zero-sized blob or buffer
/// cannot be expressed.
#[derive(Clone, Debug)]
pub struct Config {
    /// The [commonware_runtime::Storage] partition to use for storing the index.
    pub partition: String,

    /// The maximum number of items to store in each index blob.
    ///
    /// Records are grouped into blobs by index (e.g. with a value of 1000, indices
    /// 0-999 land in the first blob and 1000-1999 in the second).
    pub items_per_blob: NonZeroU64,

    /// The size of the write buffer to use when writing to the index.
    pub write_buffer: NonZeroUsize,

    /// The size of the read buffer to use on restart.
    pub replay_buffer: NonZeroUsize,
}
132
133#[cfg(test)]
134mod tests {
135    use super::*;
136    use commonware_codec::{FixedSize, Read, ReadExt, Write};
137    use commonware_cryptography::Crc32;
138    use commonware_macros::{test_group, test_traced};
139    use commonware_runtime::{deterministic, Blob, Buf, BufMut, Metrics, Runner, Storage};
140    use commonware_utils::{bitmap::BitMap, hex, sequence::FixedBytes, NZUsize, NZU64};
141    use rand::RngCore;
142    use std::collections::BTreeMap;
143
    // Shared defaults used to build the `Config` in most tests below.

    // Passed as `items_per_blob` (with this value, index 1000 falls in the second blob).
144    const DEFAULT_ITEMS_PER_BLOB: u64 = 1000;
    // Passed as `write_buffer`.
145    const DEFAULT_WRITE_BUFFER: usize = 4096;
    // Passed as `replay_buffer`.
146    const DEFAULT_REPLAY_BUFFER: usize = 1024 * 1024;
147
148    #[test_traced]
149    fn test_put_get() {
150        // Initialize the deterministic context
151        let executor = deterministic::Runner::default();
152        executor.start(|context| async move {
153            // Initialize the store
154            let cfg = Config {
155                partition: "test_ordinal".into(),
156                items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
157                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
158                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
159            };
160            let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
161                .await
162                .expect("Failed to initialize store");
163
164            let value = FixedBytes::new([42u8; 32]);
165
166            // Check index doesn't exist
167            assert!(!store.has(0));
168
169            // Put the value at index 0
170            store
171                .put(0, value.clone())
172                .await
173                .expect("Failed to put data");
174
175            // Check index exists
176            assert!(store.has(0));
177
178            // Get the value back (before sync)
179            let retrieved = store
180                .get(0)
181                .await
182                .expect("Failed to get data")
183                .expect("Data not found");
184            assert_eq!(retrieved, value);
185
186            // Force a sync
187            store.sync().await.expect("Failed to sync data");
188
189            // Check metrics
190            let buffer = context.encode();
191            assert!(buffer.contains("gets_total 1"), "{}", buffer);
192            assert!(buffer.contains("puts_total 1"), "{}", buffer);
193            assert!(buffer.contains("has_total 2"), "{}", buffer);
194            assert!(buffer.contains("syncs_total 1"), "{}", buffer);
195            assert!(buffer.contains("pruned_total 0"), "{}", buffer);
196
197            // Get the value back (after sync)
198            let retrieved = store
199                .get(0)
200                .await
201                .expect("Failed to get data")
202                .expect("Data not found");
203            assert_eq!(retrieved, value);
204        });
205    }
206
207    #[test_traced]
208    fn test_multiple_indices() {
209        // Initialize the deterministic context
210        let executor = deterministic::Runner::default();
211        executor.start(|context| async move {
212            // Initialize the store
213            let cfg = Config {
214                partition: "test_ordinal".into(),
215                items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
216                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
217                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
218            };
219            let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
220                .await
221                .expect("Failed to initialize store");
222
223            // Insert multiple values at different indices
224            let indices = vec![
225                (0u64, FixedBytes::new([0u8; 32])),
226                (5u64, FixedBytes::new([5u8; 32])),
227                (10u64, FixedBytes::new([10u8; 32])),
228                (100u64, FixedBytes::new([100u8; 32])),
229                (1000u64, FixedBytes::new([200u8; 32])), // Different blob
230            ];
231
232            for (index, value) in &indices {
233                store
234                    .put(*index, value.clone())
235                    .await
236                    .expect("Failed to put data");
237            }
238
239            // Sync to disk
240            store.sync().await.expect("Failed to sync");
241
242            // Retrieve all values and verify
243            for (index, value) in &indices {
244                let retrieved = store
245                    .get(*index)
246                    .await
247                    .expect("Failed to get data")
248                    .expect("Data not found");
249                assert_eq!(&retrieved, value);
250            }
251        });
252    }
253
254    #[test_traced]
255    fn test_sparse_indices() {
256        // Initialize the deterministic context
257        let executor = deterministic::Runner::default();
258        executor.start(|context| async move {
259            // Initialize the store
260            let cfg = Config {
261                partition: "test_ordinal".into(),
262                items_per_blob: NZU64!(100), // Smaller blobs for testing
263                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
264                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
265            };
266            let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
267                .await
268                .expect("Failed to initialize store");
269
270            // Insert sparse values
271            let indices = vec![
272                (0u64, FixedBytes::new([0u8; 32])),
273                (99u64, FixedBytes::new([99u8; 32])), // End of first blob
274                (100u64, FixedBytes::new([100u8; 32])), // Start of second blob
275                (500u64, FixedBytes::new([200u8; 32])), // Start of sixth blob
276            ];
277
278            for (index, value) in &indices {
279                store
280                    .put(*index, value.clone())
281                    .await
282                    .expect("Failed to put data");
283            }
284
285            // Check that intermediate indices don't exist
286            assert!(!store.has(1));
287            assert!(!store.has(50));
288            assert!(!store.has(101));
289            assert!(!store.has(499));
290
291            // Sync and verify
292            store.sync().await.expect("Failed to sync");
293
294            for (index, value) in &indices {
295                let retrieved = store
296                    .get(*index)
297                    .await
298                    .expect("Failed to get data")
299                    .expect("Data not found");
300                assert_eq!(&retrieved, value);
301            }
302        });
303    }
304
305    #[test_traced]
306    fn test_next_gap() {
307        // Initialize the deterministic context
308        let executor = deterministic::Runner::default();
309        executor.start(|context| async move {
310            // Initialize the store
311            let cfg = Config {
312                partition: "test_ordinal".into(),
313                items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
314                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
315                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
316            };
317            let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
318                .await
319                .expect("Failed to initialize store");
320
321            // Insert values with gaps
322            store.put(1, FixedBytes::new([1u8; 32])).await.unwrap();
323            store.put(10, FixedBytes::new([10u8; 32])).await.unwrap();
324            store.put(11, FixedBytes::new([11u8; 32])).await.unwrap();
325            store.put(14, FixedBytes::new([14u8; 32])).await.unwrap();
326
327            // Check gaps
328            let (current_end, start_next) = store.next_gap(0);
329            assert!(current_end.is_none());
330            assert_eq!(start_next, Some(1));
331
332            let (current_end, start_next) = store.next_gap(1);
333            assert_eq!(current_end, Some(1));
334            assert_eq!(start_next, Some(10));
335
336            let (current_end, start_next) = store.next_gap(10);
337            assert_eq!(current_end, Some(11));
338            assert_eq!(start_next, Some(14));
339
340            let (current_end, start_next) = store.next_gap(11);
341            assert_eq!(current_end, Some(11));
342            assert_eq!(start_next, Some(14));
343
344            let (current_end, start_next) = store.next_gap(12);
345            assert!(current_end.is_none());
346            assert_eq!(start_next, Some(14));
347
348            let (current_end, start_next) = store.next_gap(14);
349            assert_eq!(current_end, Some(14));
350            assert!(start_next.is_none());
351        });
352    }
353
354    #[test_traced]
355    fn test_missing_items() {
356        // Initialize the deterministic context
357        let executor = deterministic::Runner::default();
358        executor.start(|context| async move {
359            // Initialize the store
360            let cfg = Config {
361                partition: "test_ordinal".into(),
362                items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
363                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
364                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
365            };
366            let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
367                .await
368                .expect("Failed to initialize store");
369
370            // Test 1: Empty store - should return no items
371            assert_eq!(store.missing_items(0, 5), Vec::<u64>::new());
372            assert_eq!(store.missing_items(100, 10), Vec::<u64>::new());
373
374            // Test 2: Insert values with gaps
375            store.put(1, FixedBytes::new([1u8; 32])).await.unwrap();
376            store.put(2, FixedBytes::new([2u8; 32])).await.unwrap();
377            store.put(5, FixedBytes::new([5u8; 32])).await.unwrap();
378            store.put(6, FixedBytes::new([6u8; 32])).await.unwrap();
379            store.put(10, FixedBytes::new([10u8; 32])).await.unwrap();
380
381            // Test 3: Find missing items from the beginning
382            assert_eq!(store.missing_items(0, 5), vec![0, 3, 4, 7, 8]);
383            assert_eq!(store.missing_items(0, 6), vec![0, 3, 4, 7, 8, 9]);
384            assert_eq!(store.missing_items(0, 7), vec![0, 3, 4, 7, 8, 9]);
385
386            // Test 4: Find missing items from within a gap
387            assert_eq!(store.missing_items(3, 3), vec![3, 4, 7]);
388            assert_eq!(store.missing_items(4, 2), vec![4, 7]);
389
390            // Test 5: Find missing items from within a range
391            assert_eq!(store.missing_items(1, 3), vec![3, 4, 7]);
392            assert_eq!(store.missing_items(2, 4), vec![3, 4, 7, 8]);
393            assert_eq!(store.missing_items(5, 2), vec![7, 8]);
394
395            // Test 6: Find missing items after the last range (no more gaps)
396            assert_eq!(store.missing_items(11, 5), Vec::<u64>::new());
397            assert_eq!(store.missing_items(100, 10), Vec::<u64>::new());
398
399            // Test 7: Large gap scenario
400            store.put(1000, FixedBytes::new([100u8; 32])).await.unwrap();
401
402            // Gap between 10 and 1000
403            let items = store.missing_items(11, 10);
404            assert_eq!(items, vec![11, 12, 13, 14, 15, 16, 17, 18, 19, 20]);
405
406            // Request more items than available in gap
407            let items = store.missing_items(990, 15);
408            assert_eq!(
409                items,
410                vec![990, 991, 992, 993, 994, 995, 996, 997, 998, 999]
411            );
412
413            // Test 8: After syncing (data should remain consistent)
414            store.sync().await.unwrap();
415            assert_eq!(store.missing_items(0, 5), vec![0, 3, 4, 7, 8]);
416            assert_eq!(store.missing_items(3, 3), vec![3, 4, 7]);
417
418            // Test 9: Cross-blob boundary scenario
419            store.put(9999, FixedBytes::new([99u8; 32])).await.unwrap();
420            store
421                .put(10001, FixedBytes::new([101u8; 32]))
422                .await
423                .unwrap();
424
425            // Find missing items across blob boundary (10000 is the boundary)
426            let items = store.missing_items(9998, 5);
427            assert_eq!(items, vec![9998, 10000]);
428        });
429    }
430
431    #[test_traced]
432    fn test_restart() {
433        // Initialize the deterministic context
434        let executor = deterministic::Runner::default();
435        executor.start(|context| async move {
436            let cfg = Config {
437                partition: "test_ordinal".into(),
438                items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
439                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
440                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
441            };
442
443            // Insert data and close
444            {
445                let mut store =
446                    Ordinal::<_, FixedBytes<32>>::init(context.with_label("first"), cfg.clone())
447                        .await
448                        .expect("Failed to initialize store");
449
450                let values = vec![
451                    (0u64, FixedBytes::new([0u8; 32])),
452                    (100u64, FixedBytes::new([100u8; 32])),
453                    (1000u64, FixedBytes::new([200u8; 32])),
454                ];
455
456                for (index, value) in &values {
457                    store
458                        .put(*index, value.clone())
459                        .await
460                        .expect("Failed to put data");
461                }
462
463                store.sync().await.expect("Failed to sync store");
464            }
465
466            // Reopen and verify data persisted
467            {
468                let store =
469                    Ordinal::<_, FixedBytes<32>>::init(context.with_label("second"), cfg.clone())
470                        .await
471                        .expect("Failed to initialize store");
472
473                let values = vec![
474                    (0u64, FixedBytes::new([0u8; 32])),
475                    (100u64, FixedBytes::new([100u8; 32])),
476                    (1000u64, FixedBytes::new([200u8; 32])),
477                ];
478
479                for (index, value) in &values {
480                    let retrieved = store
481                        .get(*index)
482                        .await
483                        .expect("Failed to get data")
484                        .expect("Data not found");
485                    assert_eq!(&retrieved, value);
486                }
487
488                // Check gaps are preserved
489                let (current_end, start_next) = store.next_gap(0);
490                assert_eq!(current_end, Some(0));
491                assert_eq!(start_next, Some(100));
492            }
493        });
494    }
495
496    #[test_traced]
497    fn test_invalid_record() {
498        // Initialize the deterministic context
499        let executor = deterministic::Runner::default();
500        executor.start(|context| async move {
501            let cfg = Config {
502                partition: "test_ordinal".into(),
503                items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
504                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
505                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
506            };
507
508            // Create store with data
509            {
510                let mut store =
511                    Ordinal::<_, FixedBytes<32>>::init(context.with_label("first"), cfg.clone())
512                        .await
513                        .expect("Failed to initialize store");
514
515                store
516                    .put(0, FixedBytes::new([42u8; 32]))
517                    .await
518                    .expect("Failed to put data");
519                store.sync().await.expect("Failed to sync store");
520            }
521
522            // Corrupt the data
523            {
524                let (blob, _) = context
525                    .open("test_ordinal", &0u64.to_be_bytes())
526                    .await
527                    .unwrap();
528                // Corrupt the CRC by changing a byte
529                blob.write_at(32, vec![0xFF]).await.unwrap();
530                blob.sync().await.unwrap();
531            }
532
533            // Reopen and try to read corrupted data
534            {
535                let store =
536                    Ordinal::<_, FixedBytes<32>>::init(context.with_label("second"), cfg.clone())
537                        .await
538                        .expect("Failed to initialize store");
539
540                // Reading corrupt record will return empty
541                let result = store.get(0).await.unwrap();
542                assert!(result.is_none());
543
544                // The index should not be in the intervals after restart with corrupted data
545                assert!(!store.has(0));
546            }
547        });
548    }
549
550    #[test_traced]
551    fn test_get_nonexistent() {
552        // Initialize the deterministic context
553        let executor = deterministic::Runner::default();
554        executor.start(|context| async move {
555            // Initialize the store
556            let cfg = Config {
557                partition: "test_ordinal".into(),
558                items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
559                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
560                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
561            };
562            let store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
563                .await
564                .expect("Failed to initialize store");
565
566            // Attempt to get an index that doesn't exist
567            let retrieved = store.get(999).await.expect("Failed to get data");
568            assert!(retrieved.is_none());
569
570            // Check has returns false
571            assert!(!store.has(999));
572        });
573    }
574
575    #[test_traced]
576    fn test_destroy() {
577        // Initialize the deterministic context
578        let executor = deterministic::Runner::default();
579        executor.start(|context| async move {
580            let cfg = Config {
581                partition: "test_ordinal".into(),
582                items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
583                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
584                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
585            };
586
587            // Create store with data
588            {
589                let mut store =
590                    Ordinal::<_, FixedBytes<32>>::init(context.with_label("first"), cfg.clone())
591                        .await
592                        .expect("Failed to initialize store");
593
594                store
595                    .put(0, FixedBytes::new([0u8; 32]))
596                    .await
597                    .expect("Failed to put data");
598                store
599                    .put(1000, FixedBytes::new([100u8; 32]))
600                    .await
601                    .expect("Failed to put data");
602
603                // Destroy the store
604                store.destroy().await.expect("Failed to destroy store");
605            }
606
607            // Try to create a new store - it should be empty
608            {
609                let store =
610                    Ordinal::<_, FixedBytes<32>>::init(context.with_label("second"), cfg.clone())
611                        .await
612                        .expect("Failed to initialize store");
613
614                // Should not find any data
615                assert!(store.get(0).await.unwrap().is_none());
616                assert!(store.get(1000).await.unwrap().is_none());
617                assert!(!store.has(0));
618                assert!(!store.has(1000));
619            }
620        });
621    }
622
623    #[test_traced]
624    fn test_partial_record_write() {
625        // Initialize the deterministic context
626        let executor = deterministic::Runner::default();
627        executor.start(|context| async move {
628            let cfg = Config {
629                partition: "test_ordinal".into(),
630                items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
631                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
632                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
633            };
634
635            // Create store with data
636            {
637                let mut store =
638                    Ordinal::<_, FixedBytes<32>>::init(context.with_label("first"), cfg.clone())
639                        .await
640                        .expect("Failed to initialize store");
641
642                store
643                    .put(0, FixedBytes::new([42u8; 32]))
644                    .await
645                    .expect("Failed to put data");
646                store
647                    .put(1, FixedBytes::new([43u8; 32]))
648                    .await
649                    .expect("Failed to put data");
650                store.sync().await.expect("Failed to sync store");
651            }
652
653            // Corrupt by writing partial record (only value, no CRC)
654            {
655                let (blob, _) = context
656                    .open("test_ordinal", &0u64.to_be_bytes())
657                    .await
658                    .unwrap();
659                // Overwrite second record with partial data (32 bytes instead of 36)
660                blob.write_at(36, vec![0xFF; 32]).await.unwrap();
661                blob.sync().await.unwrap();
662            }
663
664            // Reopen and verify it handles partial write gracefully
665            {
666                let store =
667                    Ordinal::<_, FixedBytes<32>>::init(context.with_label("second"), cfg.clone())
668                        .await
669                        .expect("Failed to initialize store");
670
671                // First record should be fine
672                assert_eq!(
673                    store.get(0).await.unwrap().unwrap(),
674                    FixedBytes::new([42u8; 32])
675                );
676
677                // Second record should be removed due to partial write
678                assert!(!store.has(1));
679                assert!(store.get(1).await.unwrap().is_none());
680
681                // Store should still be functional
682                let mut store_mut = store;
683                store_mut.put(1, FixedBytes::new([44u8; 32])).await.unwrap();
684                assert_eq!(
685                    store_mut.get(1).await.unwrap().unwrap(),
686                    FixedBytes::new([44u8; 32])
687                );
688            }
689        });
690    }
691
692    #[test_traced]
693    fn test_corrupted_value() {
694        // Initialize the deterministic context
695        let executor = deterministic::Runner::default();
696        executor.start(|context| async move {
697            let cfg = Config {
698                partition: "test_ordinal".into(),
699                items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
700                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
701                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
702            };
703
704            // Create store with data
705            {
706                let mut store =
707                    Ordinal::<_, FixedBytes<32>>::init(context.with_label("first"), cfg.clone())
708                        .await
709                        .expect("Failed to initialize store");
710
711                store
712                    .put(0, FixedBytes::new([42u8; 32]))
713                    .await
714                    .expect("Failed to put data");
715                store
716                    .put(1, FixedBytes::new([43u8; 32]))
717                    .await
718                    .expect("Failed to put data");
719                store.sync().await.expect("Failed to sync store");
720            }
721
722            // Corrupt the value portion of a record
723            {
724                let (blob, _) = context
725                    .open("test_ordinal", &0u64.to_be_bytes())
726                    .await
727                    .unwrap();
728                // Corrupt some bytes in the value of the first record
729                blob.write_at(10, hex!("0xFFFFFFFF").to_vec())
730                    .await
731                    .unwrap();
732                blob.sync().await.unwrap();
733            }
734
735            // Reopen and verify it detects corruption
736            {
737                let store =
738                    Ordinal::<_, FixedBytes<32>>::init(context.with_label("second"), cfg.clone())
739                        .await
740                        .expect("Failed to initialize store");
741
742                // First record should be detected as corrupted (CRC mismatch)
743                assert!(!store.has(0));
744
745                // Second record should still be valid
746                assert!(store.has(1));
747                assert_eq!(
748                    store.get(1).await.unwrap().unwrap(),
749                    FixedBytes::new([43u8; 32])
750                );
751            }
752        });
753    }
754
755    #[test_traced]
756    fn test_crc_corruptions() {
757        // Initialize the deterministic context
758        let executor = deterministic::Runner::default();
759        executor.start(|context| async move {
760            let cfg = Config {
761                partition: "test_ordinal".into(),
762                items_per_blob: NZU64!(10), // Small blob size for testing
763                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
764                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
765            };
766
767            // Create store with data across multiple blobs
768            {
769                let mut store =
770                    Ordinal::<_, FixedBytes<32>>::init(context.with_label("first"), cfg.clone())
771                        .await
772                        .expect("Failed to initialize store");
773
774                // Add values across 2 blobs
775                store.put(0, FixedBytes::new([0u8; 32])).await.unwrap();
776                store.put(5, FixedBytes::new([5u8; 32])).await.unwrap();
777                store.put(10, FixedBytes::new([10u8; 32])).await.unwrap();
778                store.put(15, FixedBytes::new([15u8; 32])).await.unwrap();
779                store.sync().await.expect("Failed to sync store");
780            }
781
782            // Corrupt CRCs in different blobs
783            {
784                // Corrupt CRC in first blob
785                let (blob, _) = context
786                    .open("test_ordinal", &0u64.to_be_bytes())
787                    .await
788                    .unwrap();
789                blob.write_at(32, vec![0xFF]).await.unwrap(); // Corrupt CRC of index 0
790                blob.sync().await.unwrap();
791
792                // Corrupt value in second blob (which will invalidate CRC)
793                let (blob, _) = context
794                    .open("test_ordinal", &1u64.to_be_bytes())
795                    .await
796                    .unwrap();
797                blob.write_at(5, vec![0xFF; 4]).await.unwrap(); // Corrupt value of index 10
798                blob.sync().await.unwrap();
799            }
800
801            // Reopen and verify handling of CRC corruptions
802            {
803                let store =
804                    Ordinal::<_, FixedBytes<32>>::init(context.with_label("second"), cfg.clone())
805                        .await
806                        .expect("Failed to initialize store");
807
808                // Corrupted records should not be present
809                assert!(!store.has(0)); // CRC corrupted
810                assert!(!store.has(10)); // Value corrupted (CRC mismatch)
811
812                // Valid records should still be accessible
813                assert!(store.has(5));
814                assert!(store.has(15));
815                assert_eq!(
816                    store.get(5).await.unwrap().unwrap(),
817                    FixedBytes::new([5u8; 32])
818                );
819                assert_eq!(
820                    store.get(15).await.unwrap().unwrap(),
821                    FixedBytes::new([15u8; 32])
822                );
823            }
824        });
825    }
826
827    #[test_traced]
828    fn test_extra_bytes_in_blob() {
829        // Initialize the deterministic context
830        let executor = deterministic::Runner::default();
831        executor.start(|context| async move {
832            let cfg = Config {
833                partition: "test_ordinal".into(),
834                items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
835                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
836                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
837            };
838
839            // Create store with data
840            {
841                let mut store =
842                    Ordinal::<_, FixedBytes<32>>::init(context.with_label("first"), cfg.clone())
843                        .await
844                        .expect("Failed to initialize store");
845
846                store
847                    .put(0, FixedBytes::new([42u8; 32]))
848                    .await
849                    .expect("Failed to put data");
850                store
851                    .put(1, FixedBytes::new([43u8; 32]))
852                    .await
853                    .expect("Failed to put data");
854                store.sync().await.expect("Failed to sync store");
855            }
856
857            // Add extra bytes at the end of blob
858            {
859                let (blob, size) = context
860                    .open("test_ordinal", &0u64.to_be_bytes())
861                    .await
862                    .unwrap();
863                // Add garbage data that forms a complete but invalid record
864                // This avoids partial record issues
865                let mut garbage = vec![0xFF; 32]; // Invalid value
866                let invalid_crc = 0xDEADBEEFu32;
867                garbage.extend_from_slice(&invalid_crc.to_be_bytes());
868                assert_eq!(garbage.len(), 36); // Full record size
869                blob.write_at(size, garbage).await.unwrap();
870                blob.sync().await.unwrap();
871            }
872
873            // Reopen and verify it handles extra bytes
874            {
875                let store =
876                    Ordinal::<_, FixedBytes<32>>::init(context.with_label("second"), cfg.clone())
877                        .await
878                        .expect("Failed to initialize store");
879
880                // Original records should still be valid
881                assert!(store.has(0));
882                assert!(store.has(1));
883                assert_eq!(
884                    store.get(0).await.unwrap().unwrap(),
885                    FixedBytes::new([42u8; 32])
886                );
887                assert_eq!(
888                    store.get(1).await.unwrap().unwrap(),
889                    FixedBytes::new([43u8; 32])
890                );
891
892                // Store should still be functional
893                let mut store_mut = store;
894                store_mut.put(2, FixedBytes::new([44u8; 32])).await.unwrap();
895                assert_eq!(
896                    store_mut.get(2).await.unwrap().unwrap(),
897                    FixedBytes::new([44u8; 32])
898                );
899            }
900        });
901    }
902
903    #[test_traced]
904    fn test_zero_filled_records() {
905        // Initialize the deterministic context
906        let executor = deterministic::Runner::default();
907        executor.start(|context| async move {
908            let cfg = Config {
909                partition: "test_ordinal".into(),
910                items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
911                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
912                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
913            };
914
915            // Create blob with zero-filled space
916            {
917                let (blob, _) = context
918                    .open("test_ordinal", &0u64.to_be_bytes())
919                    .await
920                    .unwrap();
921
922                // Write zeros for several record positions
923                let zeros = vec![0u8; 36 * 5]; // 5 records worth of zeros
924                blob.write_at(0, zeros).await.unwrap();
925
926                // Write a valid record after the zeros
927                let mut valid_record = vec![44u8; 32];
928                let crc = Crc32::checksum(&valid_record);
929                valid_record.extend_from_slice(&crc.to_be_bytes());
930                blob.write_at(36 * 5, valid_record).await.unwrap();
931
932                blob.sync().await.unwrap();
933            }
934
935            // Initialize store and verify it handles zero-filled records
936            {
937                let store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
938                    .await
939                    .expect("Failed to initialize store");
940
941                // Zero-filled positions should not be considered valid
942                for i in 0..5 {
943                    assert!(!store.has(i));
944                }
945
946                // The valid record should be found
947                assert!(store.has(5));
948                assert_eq!(
949                    store.get(5).await.unwrap().unwrap(),
950                    FixedBytes::new([44u8; 32])
951                );
952            }
953        });
954    }
955
956    fn test_operations_and_restart(num_values: usize) -> String {
957        // Initialize the deterministic context
958        let executor = deterministic::Runner::default();
959        executor.start(|mut context| async move {
960            let cfg = Config {
961                partition: "test_ordinal".into(),
962                items_per_blob: NZU64!(100), // Smaller blobs to test multiple blob handling
963                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
964                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
965            };
966
967            // Initialize the store
968            let mut store =
969                Ordinal::<_, FixedBytes<128>>::init(context.with_label("first"), cfg.clone())
970                    .await
971                    .expect("Failed to initialize store");
972
973            // Generate and insert random values at various indices
974            let mut values = Vec::new();
975            let mut rng_index = 0u64;
976
977            for _ in 0..num_values {
978                // Generate a pseudo-random index (sparse to test gaps)
979                let mut index_bytes = [0u8; 8];
980                context.fill_bytes(&mut index_bytes);
981                let index_offset = u64::from_be_bytes(index_bytes) % 1000;
982                let index = rng_index + index_offset;
983                rng_index = index + 1;
984
985                // Generate random value
986                let mut value = [0u8; 128];
987                context.fill_bytes(&mut value);
988                let value = FixedBytes::<128>::new(value);
989
990                store
991                    .put(index, value.clone())
992                    .await
993                    .expect("Failed to put data");
994                values.push((index, value));
995            }
996
997            // Sync data
998            store.sync().await.expect("Failed to sync");
999
1000            // Verify all values can be retrieved
1001            for (index, value) in &values {
1002                let retrieved = store
1003                    .get(*index)
1004                    .await
1005                    .expect("Failed to get data")
1006                    .expect("Data not found");
1007                assert_eq!(&retrieved, value);
1008            }
1009
1010            // Test next_gap on various indices
1011            for i in 0..10 {
1012                let _ = store.next_gap(i * 100);
1013            }
1014
1015            // Sync and drop the store
1016            store.sync().await.expect("Failed to sync store");
1017            drop(store);
1018
1019            // Reopen the store
1020            let mut store = Ordinal::<_, FixedBytes<128>>::init(context.with_label("second"), cfg)
1021                .await
1022                .expect("Failed to initialize store");
1023
1024            // Verify all values are still there after restart
1025            for (index, value) in &values {
1026                let retrieved = store
1027                    .get(*index)
1028                    .await
1029                    .expect("Failed to get data")
1030                    .expect("Data not found");
1031                assert_eq!(&retrieved, value);
1032            }
1033
1034            // Add more values after restart
1035            for _ in 0..10 {
1036                let mut index_bytes = [0u8; 8];
1037                context.fill_bytes(&mut index_bytes);
1038                let index = u64::from_be_bytes(index_bytes) % 10000;
1039
1040                let mut value = [0u8; 128];
1041                context.fill_bytes(&mut value);
1042                let value = FixedBytes::<128>::new(value);
1043
1044                store.put(index, value).await.expect("Failed to put data");
1045            }
1046
1047            // Final sync
1048            store.sync().await.expect("Failed to sync");
1049
1050            // Return the auditor state for comparison
1051            context.auditor().state()
1052        })
1053    }
1054
1055    #[test_group("slow")]
1056    #[test_traced]
1057    fn test_determinism() {
1058        let state1 = test_operations_and_restart(100);
1059        let state2 = test_operations_and_restart(100);
1060        assert_eq!(state1, state2);
1061    }
1062
1063    #[test_traced]
1064    fn test_prune_basic() {
1065        // Initialize the deterministic context
1066        let executor = deterministic::Runner::default();
1067        executor.start(|context| async move {
1068            let cfg = Config {
1069                partition: "test_ordinal".into(),
1070                items_per_blob: NZU64!(100), // Small blobs to test multiple blob handling
1071                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1072                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1073            };
1074
1075            let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1076                .await
1077                .expect("Failed to initialize store");
1078
1079            // Insert data across multiple blobs
1080            let values = vec![
1081                (0u64, FixedBytes::new([0u8; 32])),     // Blob 0
1082                (50u64, FixedBytes::new([50u8; 32])),   // Blob 0
1083                (100u64, FixedBytes::new([100u8; 32])), // Blob 1
1084                (150u64, FixedBytes::new([150u8; 32])), // Blob 1
1085                (200u64, FixedBytes::new([200u8; 32])), // Blob 2
1086                (300u64, FixedBytes::new([44u8; 32])),  // Blob 3
1087            ];
1088
1089            for (index, value) in &values {
1090                store
1091                    .put(*index, value.clone())
1092                    .await
1093                    .expect("Failed to put data");
1094            }
1095            store.sync().await.unwrap();
1096
1097            // Verify all values exist
1098            for (index, value) in &values {
1099                assert_eq!(store.get(*index).await.unwrap().unwrap(), *value);
1100            }
1101
1102            // Prune up to index 150 (should remove blob 0 only)
1103            store.prune(150).await.unwrap();
1104            let buffer = context.encode();
1105            assert!(buffer.contains("pruned_total 1"));
1106
1107            // Verify pruned data is gone
1108            assert!(!store.has(0));
1109            assert!(!store.has(50));
1110            assert!(store.get(0).await.unwrap().is_none());
1111            assert!(store.get(50).await.unwrap().is_none());
1112
1113            // Verify remaining data is still there
1114            assert!(store.has(100));
1115            assert!(store.has(150));
1116            assert!(store.has(200));
1117            assert!(store.has(300));
1118            assert_eq!(store.get(100).await.unwrap().unwrap(), values[2].1);
1119            assert_eq!(store.get(150).await.unwrap().unwrap(), values[3].1);
1120            assert_eq!(store.get(200).await.unwrap().unwrap(), values[4].1);
1121            assert_eq!(store.get(300).await.unwrap().unwrap(), values[5].1);
1122
1123            // Prune more aggressively - up to index 250 (should remove blob 1)
1124            store.prune(250).await.unwrap();
1125            let buffer = context.encode();
1126            assert!(buffer.contains("pruned_total 2"));
1127
1128            // Verify more data is pruned
1129            assert!(!store.has(100));
1130            assert!(!store.has(150));
1131            assert!(store.get(100).await.unwrap().is_none());
1132            assert!(store.get(150).await.unwrap().is_none());
1133
1134            // Verify remaining data
1135            assert!(store.has(200));
1136            assert!(store.has(300));
1137            assert_eq!(store.get(200).await.unwrap().unwrap(), values[4].1);
1138            assert_eq!(store.get(300).await.unwrap().unwrap(), values[5].1);
1139        });
1140    }
1141
1142    #[test_traced]
1143    fn test_prune_with_gaps() {
1144        // Initialize the deterministic context
1145        let executor = deterministic::Runner::default();
1146        executor.start(|context| async move {
1147            let cfg = Config {
1148                partition: "test_ordinal".into(),
1149                items_per_blob: NZU64!(100),
1150                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1151                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1152            };
1153
1154            let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1155                .await
1156                .expect("Failed to initialize store");
1157
1158            // Insert sparse data with gaps
1159            store.put(5, FixedBytes::new([5u8; 32])).await.unwrap();
1160            store.put(105, FixedBytes::new([105u8; 32])).await.unwrap();
1161            store.put(305, FixedBytes::new([49u8; 32])).await.unwrap();
1162            store.sync().await.unwrap();
1163
1164            // Check gaps before pruning
1165            let (current_end, next_start) = store.next_gap(0);
1166            assert!(current_end.is_none());
1167            assert_eq!(next_start, Some(5));
1168
1169            let (current_end, next_start) = store.next_gap(5);
1170            assert_eq!(current_end, Some(5));
1171            assert_eq!(next_start, Some(105));
1172
1173            // Prune up to index 150 (should remove blob 0)
1174            store.prune(150).await.unwrap();
1175
1176            // Verify pruned data is gone
1177            assert!(!store.has(5));
1178            assert!(store.get(5).await.unwrap().is_none());
1179
1180            // Verify remaining data and gaps
1181            assert!(store.has(105));
1182            assert!(store.has(305));
1183
1184            let (current_end, next_start) = store.next_gap(0);
1185            assert!(current_end.is_none());
1186            assert_eq!(next_start, Some(105));
1187
1188            let (current_end, next_start) = store.next_gap(105);
1189            assert_eq!(current_end, Some(105));
1190            assert_eq!(next_start, Some(305));
1191        });
1192    }
1193
1194    #[test_traced]
1195    fn test_prune_no_op() {
1196        // Initialize the deterministic context
1197        let executor = deterministic::Runner::default();
1198        executor.start(|context| async move {
1199            let cfg = Config {
1200                partition: "test_ordinal".into(),
1201                items_per_blob: NZU64!(100),
1202                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1203                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1204            };
1205
1206            let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1207                .await
1208                .expect("Failed to initialize store");
1209
1210            // Insert data
1211            store.put(100, FixedBytes::new([100u8; 32])).await.unwrap();
1212            store.put(200, FixedBytes::new([200u8; 32])).await.unwrap();
1213            store.sync().await.unwrap();
1214
1215            // Try to prune before any data - should be no-op
1216            store.prune(50).await.unwrap();
1217
1218            // Verify no data was actually pruned
1219            assert!(store.has(100));
1220            assert!(store.has(200));
1221            let buffer = context.encode();
1222            assert!(buffer.contains("pruned_total 0"));
1223
1224            // Try to prune exactly at blob boundary - should be no-op
1225            store.prune(100).await.unwrap();
1226
1227            // Verify still no data pruned
1228            assert!(store.has(100));
1229            assert!(store.has(200));
1230            let buffer = context.encode();
1231            assert!(buffer.contains("pruned_total 0"));
1232        });
1233    }
1234
1235    #[test_traced]
1236    fn test_prune_empty_store() {
1237        // Initialize the deterministic context
1238        let executor = deterministic::Runner::default();
1239        executor.start(|context| async move {
1240            let cfg = Config {
1241                partition: "test_ordinal".into(),
1242                items_per_blob: NZU64!(100),
1243                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1244                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1245            };
1246
1247            let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1248                .await
1249                .expect("Failed to initialize store");
1250
1251            // Try to prune empty store
1252            store.prune(1000).await.unwrap();
1253
1254            // Store should still be functional
1255            store.put(0, FixedBytes::new([0u8; 32])).await.unwrap();
1256            assert!(store.has(0));
1257        });
1258    }
1259
1260    #[test_traced]
1261    fn test_prune_after_restart() {
1262        // Initialize the deterministic context
1263        let executor = deterministic::Runner::default();
1264        executor.start(|context| async move {
1265            let cfg = Config {
1266                partition: "test_ordinal".into(),
1267                items_per_blob: NZU64!(100),
1268                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1269                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1270            };
1271
1272            // Create store and add data
1273            {
1274                let mut store =
1275                    Ordinal::<_, FixedBytes<32>>::init(context.with_label("first"), cfg.clone())
1276                        .await
1277                        .expect("Failed to initialize store");
1278
1279                store.put(0, FixedBytes::new([0u8; 32])).await.unwrap();
1280                store.put(100, FixedBytes::new([100u8; 32])).await.unwrap();
1281                store.put(200, FixedBytes::new([200u8; 32])).await.unwrap();
1282                store.sync().await.unwrap();
1283            }
1284
1285            // Reopen and prune
1286            {
1287                let mut store =
1288                    Ordinal::<_, FixedBytes<32>>::init(context.with_label("second"), cfg.clone())
1289                        .await
1290                        .expect("Failed to initialize store");
1291
1292                // Verify data is there
1293                assert!(store.has(0));
1294                assert!(store.has(100));
1295                assert!(store.has(200));
1296
1297                // Prune up to index 150
1298                store.prune(150).await.unwrap();
1299
1300                // Verify pruning worked
1301                assert!(!store.has(0));
1302                assert!(store.has(100));
1303                assert!(store.has(200));
1304
1305                store.sync().await.unwrap();
1306            }
1307
1308            // Reopen again and verify pruning persisted
1309            {
1310                let store =
1311                    Ordinal::<_, FixedBytes<32>>::init(context.with_label("third"), cfg.clone())
1312                        .await
1313                        .expect("Failed to initialize store");
1314
1315                assert!(!store.has(0));
1316                assert!(store.has(100));
1317                assert!(store.has(200));
1318
1319                // Check gaps
1320                let (current_end, next_start) = store.next_gap(0);
1321                assert!(current_end.is_none());
1322                assert_eq!(next_start, Some(100));
1323            }
1324        });
1325    }
1326
1327    #[test_traced]
1328    fn test_prune_multiple_operations() {
1329        // Initialize the deterministic context
1330        let executor = deterministic::Runner::default();
1331        executor.start(|context| async move {
1332            let cfg = Config {
1333                partition: "test_ordinal".into(),
1334                items_per_blob: NZU64!(50), // Smaller blobs for more granular testing
1335                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1336                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1337            };
1338
1339            let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1340                .await
1341                .expect("Failed to initialize store");
1342
1343            // Insert data across many blobs
1344            let mut values = Vec::new();
1345            for i in 0..10 {
1346                let index = i * 50 + 25; // Middle of each blob
1347                let value = FixedBytes::new([i as u8; 32]);
1348                store.put(index, value.clone()).await.unwrap();
1349                values.push((index, value));
1350            }
1351            store.sync().await.unwrap();
1352
1353            // Prune incrementally
1354            for i in 1..5 {
1355                let prune_index = i * 50 + 10;
1356                store.prune(prune_index).await.unwrap();
1357
1358                // Verify appropriate data is pruned
1359                for (index, _) in &values {
1360                    if *index < prune_index {
1361                        assert!(!store.has(*index), "Index {index} should be pruned");
1362                    } else {
1363                        assert!(store.has(*index), "Index {index} should not be pruned");
1364                    }
1365                }
1366            }
1367
1368            // Check final state
1369            let buffer = context.encode();
1370            assert!(buffer.contains("pruned_total 4"));
1371
1372            // Verify remaining data
1373            for i in 4..10 {
1374                let index = i * 50 + 25;
1375                assert!(store.has(index));
1376                assert_eq!(
1377                    store.get(index).await.unwrap().unwrap(),
1378                    values[i as usize].1
1379                );
1380            }
1381        });
1382    }
1383
1384    #[test_traced]
1385    fn test_prune_blob_boundaries() {
1386        // Initialize the deterministic context
1387        let executor = deterministic::Runner::default();
1388        executor.start(|context| async move {
1389            let cfg = Config {
1390                partition: "test_ordinal".into(),
1391                items_per_blob: NZU64!(100),
1392                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1393                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1394            };
1395
1396            let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1397                .await
1398                .expect("Failed to initialize store");
1399
1400            // Insert data at blob boundaries
1401            store.put(0, FixedBytes::new([0u8; 32])).await.unwrap(); // Start of blob 0
1402            store.put(99, FixedBytes::new([99u8; 32])).await.unwrap(); // End of blob 0
1403            store.put(100, FixedBytes::new([100u8; 32])).await.unwrap(); // Start of blob 1
1404            store.put(199, FixedBytes::new([199u8; 32])).await.unwrap(); // End of blob 1
1405            store.put(200, FixedBytes::new([200u8; 32])).await.unwrap(); // Start of blob 2
1406            store.sync().await.unwrap();
1407
1408            // Test various pruning points around boundaries
1409
1410            // Prune exactly at blob boundary (100) - should prune blob 0
1411            store.prune(100).await.unwrap();
1412            assert!(!store.has(0));
1413            assert!(!store.has(99));
1414            assert!(store.has(100));
1415            assert!(store.has(199));
1416            assert!(store.has(200));
1417
1418            // Prune just before next boundary (199) - should not prune blob 1
1419            store.prune(199).await.unwrap();
1420            assert!(store.has(100));
1421            assert!(store.has(199));
1422            assert!(store.has(200));
1423
1424            // Prune exactly at next boundary (200) - should prune blob 1
1425            store.prune(200).await.unwrap();
1426            assert!(!store.has(100));
1427            assert!(!store.has(199));
1428            assert!(store.has(200));
1429
1430            let buffer = context.encode();
1431            assert!(buffer.contains("pruned_total 2"));
1432        });
1433    }
1434
1435    #[test_traced]
1436    fn test_prune_non_contiguous_sections() {
1437        // Initialize the deterministic context
1438        let executor = deterministic::Runner::default();
1439        executor.start(|context| async move {
1440            let cfg = Config {
1441                partition: "test_ordinal".into(),
1442                items_per_blob: NZU64!(100),
1443                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1444                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1445            };
1446
1447            let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1448                .await
1449                .expect("Failed to initialize store");
1450
1451            // Insert data in non-contiguous sections (0, 2, 5, 7)
1452            store.put(0, FixedBytes::new([0u8; 32])).await.unwrap(); // Section 0
1453            store.put(250, FixedBytes::new([50u8; 32])).await.unwrap(); // Section 2 (250/100 = 2)
1454            store.put(500, FixedBytes::new([44u8; 32])).await.unwrap(); // Section 5 (500/100 = 5)
1455            store.put(750, FixedBytes::new([45u8; 32])).await.unwrap(); // Section 7 (750/100 = 7)
1456            store.sync().await.unwrap();
1457
1458            // Verify all data exists initially
1459            assert!(store.has(0));
1460            assert!(store.has(250));
1461            assert!(store.has(500));
1462            assert!(store.has(750));
1463
1464            // Prune up to section 3 (index 300) - should remove sections 0 and 2
1465            store.prune(300).await.unwrap();
1466
1467            // Verify correct data was pruned
1468            assert!(!store.has(0)); // Section 0 pruned
1469            assert!(!store.has(250)); // Section 2 pruned
1470            assert!(store.has(500)); // Section 5 remains
1471            assert!(store.has(750)); // Section 7 remains
1472
1473            let buffer = context.encode();
1474            assert!(buffer.contains("pruned_total 2"));
1475
1476            // Prune up to section 6 (index 600) - should remove section 5
1477            store.prune(600).await.unwrap();
1478
1479            // Verify section 5 was pruned
1480            assert!(!store.has(500)); // Section 5 pruned
1481            assert!(store.has(750)); // Section 7 remains
1482
1483            let buffer = context.encode();
1484            assert!(buffer.contains("pruned_total 3"));
1485
1486            // Prune everything - should remove section 7
1487            store.prune(1000).await.unwrap();
1488
1489            // Verify all data is gone
1490            assert!(!store.has(750)); // Section 7 pruned
1491
1492            let buffer = context.encode();
1493            assert!(buffer.contains("pruned_total 4"));
1494        });
1495    }
1496
1497    #[test_traced]
1498    fn test_prune_removes_correct_pending() {
1499        // Initialize the deterministic context
1500        let executor = deterministic::Runner::default();
1501        executor.start(|context| async move {
1502            let cfg = Config {
1503                partition: "test_ordinal".into(),
1504                items_per_blob: NZU64!(100),
1505                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1506                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1507            };
1508            let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1509                .await
1510                .expect("Failed to initialize store");
1511
1512            // Insert and sync some data in blob 0
1513            store.put(5, FixedBytes::new([5u8; 32])).await.unwrap();
1514            store.sync().await.unwrap();
1515
1516            // Add pending entries to blob 0 and blob 1
1517            store.put(10, FixedBytes::new([10u8; 32])).await.unwrap(); // blob 0
1518            store.put(110, FixedBytes::new([110u8; 32])).await.unwrap(); // blob 1
1519
1520            // Verify all data is visible before pruning
1521            assert!(store.has(5));
1522            assert!(store.has(10));
1523            assert!(store.has(110));
1524
1525            // Prune up to index 100, which should remove blob 0 (indices 0-99).
1526            store.prune(150).await.unwrap();
1527
1528            // Verify that synced and pending entries in blob 0 are removed.
1529            assert!(!store.has(5));
1530            assert!(!store.has(10));
1531
1532            // Verify that the pending entry in blob 1 remains.
1533            assert!(store.has(110));
1534            assert_eq!(
1535                store.get(110).await.unwrap().unwrap(),
1536                FixedBytes::new([110u8; 32])
1537            );
1538
1539            // Sync the remaining pending entry and verify it's still there.
1540            store.sync().await.unwrap();
1541            assert!(store.has(110));
1542            assert_eq!(
1543                store.get(110).await.unwrap().unwrap(),
1544                FixedBytes::new([110u8; 32])
1545            );
1546        });
1547    }
1548
1549    #[test_traced]
1550    fn test_init_with_bits_none() {
1551        // Initialize the deterministic context
1552        let executor = deterministic::Runner::default();
1553        executor.start(|context| async move {
1554            let cfg = Config {
1555                partition: "test_ordinal".into(),
1556                items_per_blob: NZU64!(10), // Small blob size for testing
1557                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1558                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1559            };
1560
1561            // Create store with data across multiple sections
1562            {
1563                let mut store =
1564                    Ordinal::<_, FixedBytes<32>>::init(context.with_label("first"), cfg.clone())
1565                        .await
1566                        .expect("Failed to initialize store");
1567
1568                // Section 0 (indices 0-9)
1569                store.put(0, FixedBytes::new([0u8; 32])).await.unwrap();
1570                store.put(5, FixedBytes::new([5u8; 32])).await.unwrap();
1571                store.put(9, FixedBytes::new([9u8; 32])).await.unwrap();
1572
1573                // Section 1 (indices 10-19)
1574                store.put(10, FixedBytes::new([10u8; 32])).await.unwrap();
1575                store.put(15, FixedBytes::new([15u8; 32])).await.unwrap();
1576
1577                // Section 2 (indices 20-29)
1578                store.put(25, FixedBytes::new([25u8; 32])).await.unwrap();
1579
1580                store.sync().await.unwrap();
1581            }
1582
1583            // Reinitialize with bits = None (should behave like regular init)
1584            {
1585                let store = Ordinal::<_, FixedBytes<32>>::init_with_bits(
1586                    context.with_label("second"),
1587                    cfg.clone(),
1588                    None,
1589                )
1590                .await
1591                .expect("Failed to initialize store with bits");
1592
1593                // All records should be available
1594                assert!(store.has(0));
1595                assert!(store.has(5));
1596                assert!(store.has(9));
1597                assert!(store.has(10));
1598                assert!(store.has(15));
1599                assert!(store.has(25));
1600
1601                // Non-existent records should not be available
1602                assert!(!store.has(1));
1603                assert!(!store.has(11));
1604                assert!(!store.has(20));
1605
1606                // Verify values
1607                assert_eq!(
1608                    store.get(0).await.unwrap().unwrap(),
1609                    FixedBytes::new([0u8; 32])
1610                );
1611                assert_eq!(
1612                    store.get(15).await.unwrap().unwrap(),
1613                    FixedBytes::new([15u8; 32])
1614                );
1615            }
1616        });
1617    }
1618
1619    #[test_traced]
1620    fn test_init_with_bits_empty_hashmap() {
1621        // Initialize the deterministic context
1622        let executor = deterministic::Runner::default();
1623        executor.start(|context| async move {
1624            let cfg = Config {
1625                partition: "test_ordinal".into(),
1626                items_per_blob: NZU64!(10),
1627                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1628                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1629            };
1630
1631            // Create store with data
1632            {
1633                let mut store =
1634                    Ordinal::<_, FixedBytes<32>>::init(context.with_label("first"), cfg.clone())
1635                        .await
1636                        .expect("Failed to initialize store");
1637
1638                store.put(0, FixedBytes::new([0u8; 32])).await.unwrap();
1639                store.put(10, FixedBytes::new([10u8; 32])).await.unwrap();
1640                store.put(20, FixedBytes::new([20u8; 32])).await.unwrap();
1641
1642                store.sync().await.unwrap();
1643            }
1644
1645            // Reinitialize with empty HashMap - should skip all sections
1646            {
1647                let bits: BTreeMap<u64, &Option<BitMap>> = BTreeMap::new();
1648                let store = Ordinal::<_, FixedBytes<32>>::init_with_bits(
1649                    context.with_label("second"),
1650                    cfg.clone(),
1651                    Some(bits),
1652                )
1653                .await
1654                .expect("Failed to initialize store with bits");
1655
1656                // No records should be available since no sections were in the bits map
1657                assert!(!store.has(0));
1658                assert!(!store.has(10));
1659                assert!(!store.has(20));
1660            }
1661        });
1662    }
1663
1664    #[test_traced]
1665    fn test_init_with_bits_selective_sections() {
1666        // Initialize the deterministic context
1667        let executor = deterministic::Runner::default();
1668        executor.start(|context| async move {
1669            let cfg = Config {
1670                partition: "test_ordinal".into(),
1671                items_per_blob: NZU64!(10),
1672                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1673                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1674            };
1675
1676            // Create store with data in multiple sections
1677            {
1678                let mut store =
1679                    Ordinal::<_, FixedBytes<32>>::init(context.with_label("first"), cfg.clone())
1680                        .await
1681                        .expect("Failed to initialize store");
1682
1683                // Section 0 (indices 0-9)
1684                for i in 0..10 {
1685                    store.put(i, FixedBytes::new([i as u8; 32])).await.unwrap();
1686                }
1687
1688                // Section 1 (indices 10-19)
1689                for i in 10..20 {
1690                    store.put(i, FixedBytes::new([i as u8; 32])).await.unwrap();
1691                }
1692
1693                // Section 2 (indices 20-29)
1694                for i in 20..30 {
1695                    store.put(i, FixedBytes::new([i as u8; 32])).await.unwrap();
1696                }
1697
1698                store.sync().await.unwrap();
1699            }
1700
1701            // Reinitialize with bits for only section 1
1702            {
1703                let mut bits_map: BTreeMap<u64, &Option<BitMap>> = BTreeMap::new();
1704
1705                // Create a BitMap that marks indices 12, 15, and 18 as present
1706                let mut bitmap = BitMap::zeroes(10);
1707                bitmap.set(2, true); // Index 12 (offset 2 in section 1)
1708                bitmap.set(5, true); // Index 15 (offset 5 in section 1)
1709                bitmap.set(8, true); // Index 18 (offset 8 in section 1)
1710                let bitmap_option = Some(bitmap);
1711
1712                bits_map.insert(1, &bitmap_option);
1713
1714                let store = Ordinal::<_, FixedBytes<32>>::init_with_bits(
1715                    context.with_label("second"),
1716                    cfg.clone(),
1717                    Some(bits_map),
1718                )
1719                .await
1720                .expect("Failed to initialize store with bits");
1721
1722                // Only specified indices from section 1 should be available
1723                assert!(store.has(12));
1724                assert!(store.has(15));
1725                assert!(store.has(18));
1726
1727                // Other indices from section 1 should not be available
1728                assert!(!store.has(10));
1729                assert!(!store.has(11));
1730                assert!(!store.has(13));
1731                assert!(!store.has(14));
1732                assert!(!store.has(16));
1733                assert!(!store.has(17));
1734                assert!(!store.has(19));
1735
1736                // All indices from sections 0 and 2 should not be available
1737                for i in 0..10 {
1738                    assert!(!store.has(i));
1739                }
1740                for i in 20..30 {
1741                    assert!(!store.has(i));
1742                }
1743
1744                // Verify the available values
1745                assert_eq!(
1746                    store.get(12).await.unwrap().unwrap(),
1747                    FixedBytes::new([12u8; 32])
1748                );
1749                assert_eq!(
1750                    store.get(15).await.unwrap().unwrap(),
1751                    FixedBytes::new([15u8; 32])
1752                );
1753                assert_eq!(
1754                    store.get(18).await.unwrap().unwrap(),
1755                    FixedBytes::new([18u8; 32])
1756                );
1757            }
1758        });
1759    }
1760
1761    #[test_traced]
1762    fn test_init_with_bits_none_option_all_records_exist() {
1763        // Initialize the deterministic context
1764        let executor = deterministic::Runner::default();
1765        executor.start(|context| async move {
1766            let cfg = Config {
1767                partition: "test_ordinal".into(),
1768                items_per_blob: NZU64!(5),
1769                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1770                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1771            };
1772
1773            // Create store with all records in a section
1774            {
1775                let mut store =
1776                    Ordinal::<_, FixedBytes<32>>::init(context.with_label("first"), cfg.clone())
1777                        .await
1778                        .expect("Failed to initialize store");
1779
1780                // Fill section 1 completely (indices 5-9)
1781                for i in 5..10 {
1782                    store.put(i, FixedBytes::new([i as u8; 32])).await.unwrap();
1783                }
1784
1785                store.sync().await.unwrap();
1786            }
1787
1788            // Reinitialize with None option for section 1 (expects all records)
1789            {
1790                let mut bits_map: BTreeMap<u64, &Option<BitMap>> = BTreeMap::new();
1791                let none_option: Option<BitMap> = None;
1792                bits_map.insert(1, &none_option);
1793
1794                let store = Ordinal::<_, FixedBytes<32>>::init_with_bits(
1795                    context.with_label("second"),
1796                    cfg.clone(),
1797                    Some(bits_map),
1798                )
1799                .await
1800                .expect("Failed to initialize store with bits");
1801
1802                // All records in section 1 should be available
1803                for i in 5..10 {
1804                    assert!(store.has(i));
1805                    assert_eq!(
1806                        store.get(i).await.unwrap().unwrap(),
1807                        FixedBytes::new([i as u8; 32])
1808                    );
1809                }
1810            }
1811        });
1812    }
1813
1814    #[test_traced]
1815    #[should_panic(expected = "Failed to initialize store with bits: MissingRecord(6)")]
1816    fn test_init_with_bits_none_option_missing_record_panics() {
1817        // Initialize the deterministic context
1818        let executor = deterministic::Runner::default();
1819        executor.start(|context| async move {
1820            let cfg = Config {
1821                partition: "test_ordinal".into(),
1822                items_per_blob: NZU64!(5),
1823                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1824                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1825            };
1826
1827            // Create store with missing record in a section
1828            {
1829                let mut store =
1830                    Ordinal::<_, FixedBytes<32>>::init(context.with_label("first"), cfg.clone())
1831                        .await
1832                        .expect("Failed to initialize store");
1833
1834                // Fill section 1 partially (skip index 6)
1835                store.put(5, FixedBytes::new([5u8; 32])).await.unwrap();
1836                // Skip index 6
1837                store.put(7, FixedBytes::new([7u8; 32])).await.unwrap();
1838                store.put(8, FixedBytes::new([8u8; 32])).await.unwrap();
1839                store.put(9, FixedBytes::new([9u8; 32])).await.unwrap();
1840
1841                store.sync().await.unwrap();
1842            }
1843
1844            // Reinitialize with None option for section 1 (expects all records)
1845            // This should panic because index 6 is missing
1846            {
1847                let mut bits_map: BTreeMap<u64, &Option<BitMap>> = BTreeMap::new();
1848                let none_option: Option<BitMap> = None;
1849                bits_map.insert(1, &none_option);
1850
1851                let _store = Ordinal::<_, FixedBytes<32>>::init_with_bits(
1852                    context.with_label("second"),
1853                    cfg.clone(),
1854                    Some(bits_map),
1855                )
1856                .await
1857                .expect("Failed to initialize store with bits");
1858            }
1859        });
1860    }
1861
1862    #[test_traced]
1863    fn test_init_with_bits_mixed_sections() {
1864        // Initialize the deterministic context
1865        let executor = deterministic::Runner::default();
1866        executor.start(|context| async move {
1867            let cfg = Config {
1868                partition: "test_ordinal".into(),
1869                items_per_blob: NZU64!(5),
1870                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1871                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1872            };
1873
1874            // Create store with data in multiple sections
1875            {
1876                let mut store =
1877                    Ordinal::<_, FixedBytes<32>>::init(context.with_label("first"), cfg.clone())
1878                        .await
1879                        .expect("Failed to initialize store");
1880
1881                // Section 0: indices 0-4 (fill completely)
1882                for i in 0..5 {
1883                    store.put(i, FixedBytes::new([i as u8; 32])).await.unwrap();
1884                }
1885
1886                // Section 1: indices 5-9 (fill partially)
1887                store.put(5, FixedBytes::new([5u8; 32])).await.unwrap();
1888                store.put(7, FixedBytes::new([7u8; 32])).await.unwrap();
1889                store.put(9, FixedBytes::new([9u8; 32])).await.unwrap();
1890
1891                // Section 2: indices 10-14 (fill completely)
1892                for i in 10..15 {
1893                    store.put(i, FixedBytes::new([i as u8; 32])).await.unwrap();
1894                }
1895
1896                store.sync().await.unwrap();
1897            }
1898
1899            // Reinitialize with mixed bits configuration
1900            {
1901                let mut bits_map: BTreeMap<u64, &Option<BitMap>> = BTreeMap::new();
1902
1903                // Section 0: None option (expects all records)
1904                let none_option: Option<BitMap> = None;
1905                bits_map.insert(0, &none_option);
1906
1907                // Section 1: BitMap with specific indices
1908                let mut bitmap1 = BitMap::zeroes(5);
1909                bitmap1.set(0, true); // Index 5
1910                bitmap1.set(2, true); // Index 7
1911                                      // Note: not setting bit for index 9, so it should be ignored
1912                let bitmap1_option = Some(bitmap1);
1913                bits_map.insert(1, &bitmap1_option);
1914
1915                // Section 2: Not in map, should be skipped entirely
1916
1917                let store = Ordinal::<_, FixedBytes<32>>::init_with_bits(
1918                    context.with_label("second"),
1919                    cfg.clone(),
1920                    Some(bits_map),
1921                )
1922                .await
1923                .expect("Failed to initialize store with bits");
1924
1925                // All records from section 0 should be available
1926                for i in 0..5 {
1927                    assert!(store.has(i));
1928                    assert_eq!(
1929                        store.get(i).await.unwrap().unwrap(),
1930                        FixedBytes::new([i as u8; 32])
1931                    );
1932                }
1933
1934                // Only specified records from section 1 should be available
1935                assert!(store.has(5));
1936                assert!(store.has(7));
1937                assert!(!store.has(6));
1938                assert!(!store.has(8));
1939                assert!(!store.has(9)); // Not set in bitmap
1940
1941                // No records from section 2 should be available
1942                for i in 10..15 {
1943                    assert!(!store.has(i));
1944                }
1945            }
1946        });
1947    }
1948
1949    #[test_traced]
1950    #[should_panic(expected = "Failed to initialize store with bits: MissingRecord(2)")]
1951    fn test_init_with_bits_corrupted_records() {
1952        // Initialize the deterministic context
1953        let executor = deterministic::Runner::default();
1954        executor.start(|context| async move {
1955            let cfg = Config {
1956                partition: "test_ordinal".into(),
1957                items_per_blob: NZU64!(5),
1958                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1959                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1960            };
1961
1962            // Create store with data and corrupt one record
1963            {
1964                let mut store =
1965                    Ordinal::<_, FixedBytes<32>>::init(context.with_label("first"), cfg.clone())
1966                        .await
1967                        .expect("Failed to initialize store");
1968
1969                // Section 0: indices 0-4
1970                for i in 0..5 {
1971                    store.put(i, FixedBytes::new([i as u8; 32])).await.unwrap();
1972                }
1973
1974                store.sync().await.unwrap();
1975            }
1976
1977            // Corrupt record at index 2
1978            {
1979                let (blob, _) = context
1980                    .open("test_ordinal", &0u64.to_be_bytes())
1981                    .await
1982                    .unwrap();
1983                // Corrupt the CRC of record at index 2
1984                let offset = 2 * 36 + 32; // 2 * record_size + value_size
1985                blob.write_at(offset, vec![0xFF]).await.unwrap();
1986                blob.sync().await.unwrap();
1987            }
1988
1989            // Reinitialize with bits that include the corrupted record
1990            {
1991                let mut bits_map: BTreeMap<u64, &Option<BitMap>> = BTreeMap::new();
1992
1993                // Create a BitMap that includes the corrupted record
1994                let mut bitmap = BitMap::zeroes(5);
1995                bitmap.set(0, true); // Index 0
1996                bitmap.set(2, true); // Index 2 (corrupted) - this will cause a panic
1997                bitmap.set(4, true); // Index 4
1998                let bitmap_option = Some(bitmap);
1999                bits_map.insert(0, &bitmap_option);
2000
2001                let _store = Ordinal::<_, FixedBytes<32>>::init_with_bits(
2002                    context.with_label("second"),
2003                    cfg.clone(),
2004                    Some(bits_map),
2005                )
2006                .await
2007                .expect("Failed to initialize store with bits");
2008            }
2009        });
2010    }
2011
2012    /// A dummy value that will fail parsing if the value is 0.
2013    #[derive(Debug, PartialEq, Eq)]
2014    pub struct DummyValue {
2015        pub value: u64,
2016    }
2017
2018    impl Write for DummyValue {
2019        fn write(&self, buf: &mut impl BufMut) {
2020            self.value.write(buf);
2021        }
2022    }
2023
2024    impl Read for DummyValue {
2025        type Cfg = ();
2026
2027        fn read_cfg(buf: &mut impl Buf, _: &Self::Cfg) -> Result<Self, commonware_codec::Error> {
2028            let value = u64::read(buf)?;
2029            if value == 0 {
2030                return Err(commonware_codec::Error::Invalid(
2031                    "DummyValue",
2032                    "value must be non-zero",
2033                ));
2034            }
2035            Ok(Self { value })
2036        }
2037    }
2038
2039    impl FixedSize for DummyValue {
2040        const SIZE: usize = u64::SIZE;
2041    }
2042
2043    #[test_traced]
2044    fn test_init_skip_unparseable_record() {
2045        // Initialize the deterministic context
2046        let executor = deterministic::Runner::default();
2047        executor.start(|context| async move {
2048            let cfg = Config {
2049                partition: "test_ordinal".into(),
2050                items_per_blob: NZU64!(1),
2051                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
2052                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
2053            };
2054
2055            // Create store with valid records
2056            {
2057                let mut store =
2058                    Ordinal::<_, DummyValue>::init(context.with_label("first"), cfg.clone())
2059                        .await
2060                        .expect("Failed to initialize store");
2061
2062                // Add records at indices 1, 2, 4
2063                store.put(1, DummyValue { value: 1 }).await.unwrap();
2064                store.put(2, DummyValue { value: 0 }).await.unwrap(); // will fail parsing
2065                store.put(4, DummyValue { value: 4 }).await.unwrap();
2066
2067                store.sync().await.unwrap();
2068            }
2069
2070            // Reinitialize - should skip the unparseable record but continue processing
2071            {
2072                let store =
2073                    Ordinal::<_, DummyValue>::init(context.with_label("second"), cfg.clone())
2074                        .await
2075                        .expect("Failed to initialize store");
2076
2077                // Record 0 should be available
2078                assert!(store.has(1), "Record 1 should be available");
2079                assert_eq!(
2080                    store.get(1).await.unwrap().unwrap(),
2081                    DummyValue { value: 1 },
2082                    "Record 0 should have correct value"
2083                );
2084
2085                // Record 2 should NOT be available (unparseable)
2086                assert!(
2087                    !store.has(2),
2088                    "Record 2 should not be available (unparseable)"
2089                );
2090
2091                // This tests that we didn't exit early when encountering the unparseable record
2092                assert!(
2093                    store.has(4),
2094                    "Record 4 should be available - we should not exit early on unparseable record"
2095                );
2096                assert_eq!(
2097                    store.get(4).await.unwrap().unwrap(),
2098                    DummyValue { value: 4 },
2099                    "Record 4 should have correct value"
2100                );
2101            }
2102        });
2103    }
2104}