commonware_storage/ordinal/
mod.rs

1//! A persistent index that maps sparse indices to [commonware_utils::Array]s.
2//!
3//! [Ordinal] is a collection of [commonware_runtime::Blob]s containing ordered records of fixed size.
4//! Because records are fixed size, file position corresponds directly to index. Unlike
5//! [crate::journal::contiguous::fixed::Journal], [Ordinal] supports out-of-order insertion.
6//!
7//! # Design
8//!
9//! [Ordinal] is a collection of [commonware_runtime::Blob]s where:
10//! - Each record: `[V][crc32(V)]` where V is an [commonware_utils::Array]
11//! - Index N is at file offset: `N * RECORD_SIZE`
12//! - A [crate::rmap::RMap] tracks which indices have been written (and which are missing)
13//!
14//! # File Organization
15//!
16//! Records are grouped into blobs to avoid having too many files:
17//!
18//! ```text
19//! Blob 0: indices 0-999
20//! Blob 1: indices 1000-1999
21//! ...
22//! ```
23//!
24//! # Format
25//!
26//! [Ordinal] stores values in the following format:
27//!
28//! ```text
29//! +---+---+---+---+---+---+---+---+---+---+---+---+---+
30//! | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 |10 |11 |12 |
31//! +---+---+---+---+---+---+---+---+---+---+---+---+---+
32//! |          Value (Fixed Size)       |     CRC32     |
33//! +---+---+---+---+---+---+---+---+---+---+---+---+---+
34//! ```
35//!
36//! # Performance Characteristics
37//!
38//! - **Writes**: O(1) - direct offset calculation
39//! - **Reads**: O(1) - direct offset calculation
40//! - **Has**: O(1) - in-memory lookup (via [crate::rmap::RMap])
41//! - **Next Gap**: O(log n) - in-memory range query (via [crate::rmap::RMap])
42//! - **Restart**: O(n) where n is the number of existing records (to rebuild [crate::rmap::RMap])
43//!
44//! # Atomicity
45//!
46//! [Ordinal] eagerly writes all new data to [commonware_runtime::Blob]s. New data, however, is not
47//! synced until [Ordinal::sync] is called. As a result, data is not guaranteed to be atomically
48//! persisted (i.e. shutdown before [Ordinal::sync] may lead to some writes being lost).
49//!
50//! _If you want atomicity for sparse writes, pair [commonware_utils::bitmap::BitMap] and
51//! [crate::metadata::Metadata] with [Ordinal] (use bits to indicate which items have been atomically
52//! written)._
53//!
54//! # Recovery
55//!
56//! On restart, [Ordinal] validates all records using their CRC32 and rebuilds the in-memory
57//! [crate::rmap::RMap]. Invalid records (corrupted or empty) are excluded from the rebuilt index.
58//!
59//! # Example
60//!
61//! ```rust
62//! use commonware_runtime::{Spawner, Runner, deterministic};
63//! use commonware_storage::ordinal::{Ordinal, Config};
64//! use commonware_utils::{sequence::FixedBytes, NZUsize, NZU64};
65//!
66//! let executor = deterministic::Runner::default();
67//! executor.start(|context| async move {
68//!     // Create a store for 32-byte values
69//!     let cfg = Config {
70//!         partition: "ordinal_store".into(),
71//!         items_per_blob: NZU64!(10000),
72//!         write_buffer: NZUsize!(4096),
73//!         replay_buffer: NZUsize!(1024 * 1024),
74//!     };
75//!     let mut store = Ordinal::<_, FixedBytes<32>>::init(context, cfg).await.unwrap();
76//!
77//!     // Put values at specific indices
78//!     let value1 = FixedBytes::new([1u8; 32]);
79//!     let value2 = FixedBytes::new([2u8; 32]);
80//!     store.put(0, value1).await.unwrap();
81//!     store.put(5, value2).await.unwrap();
82//!
83//!     // Sync to disk
84//!     store.sync().await.unwrap();
85//!
86//!     // Check for gaps
87//!     let (current_end, next_start) = store.next_gap(0);
88//!     assert_eq!(current_end, Some(0));
89//!     assert_eq!(next_start, Some(5));
90//!
91//!     // Sync the store
92//!     store.sync().await.unwrap();
93//! });
94//! ```
95
96mod storage;
97
98use std::num::{NonZeroU64, NonZeroUsize};
99pub use storage::Ordinal;
100use thiserror::Error;
101
102/// Errors that can occur when interacting with the [Ordinal].
103#[derive(Debug, Error)]
104pub enum Error {
105    #[error("runtime error: {0}")]
106    Runtime(#[from] commonware_runtime::Error),
107    #[error("codec error: {0}")]
108    Codec(#[from] commonware_codec::Error),
109    #[error("invalid blob name: {0}")]
110    InvalidBlobName(String),
111    #[error("invalid record: {0}")]
112    InvalidRecord(u64),
113    #[error("missing record at {0}")]
114    MissingRecord(u64),
115}
116
117/// Configuration for [Ordinal] storage.
118#[derive(Clone)]
119pub struct Config {
120    /// The [commonware_runtime::Storage] partition to use for storing the index.
121    pub partition: String,
122
123    /// The maximum number of items to store in each index blob.
124    pub items_per_blob: NonZeroU64,
125
126    /// The size of the write buffer to use when writing to the index.
127    pub write_buffer: NonZeroUsize,
128
129    /// The size of the read buffer to use on restart.
130    pub replay_buffer: NonZeroUsize,
131}
132
133#[cfg(test)]
134mod tests {
135    use super::*;
136    use bytes::{Buf, BufMut};
137    use commonware_codec::{FixedSize, Read, ReadExt, Write};
138    use commonware_cryptography::Crc32;
139    use commonware_macros::{test_group, test_traced};
140    use commonware_runtime::{deterministic, Blob, Metrics, Runner, Storage};
141    use commonware_utils::{bitmap::BitMap, hex, sequence::FixedBytes, NZUsize, NZU64};
142    use rand::RngCore;
143    use std::collections::BTreeMap;
144
145    const DEFAULT_ITEMS_PER_BLOB: u64 = 1000;
146    const DEFAULT_WRITE_BUFFER: usize = 4096;
147    const DEFAULT_REPLAY_BUFFER: usize = 1024 * 1024;
148
149    #[test_traced]
150    fn test_put_get() {
151        // Initialize the deterministic context
152        let executor = deterministic::Runner::default();
153        executor.start(|context| async move {
154            // Initialize the store
155            let cfg = Config {
156                partition: "test_ordinal".into(),
157                items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
158                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
159                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
160            };
161            let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
162                .await
163                .expect("Failed to initialize store");
164
165            let value = FixedBytes::new([42u8; 32]);
166
167            // Check index doesn't exist
168            assert!(!store.has(0));
169
170            // Put the value at index 0
171            store
172                .put(0, value.clone())
173                .await
174                .expect("Failed to put data");
175
176            // Check index exists
177            assert!(store.has(0));
178
179            // Get the value back (before sync)
180            let retrieved = store
181                .get(0)
182                .await
183                .expect("Failed to get data")
184                .expect("Data not found");
185            assert_eq!(retrieved, value);
186
187            // Force a sync
188            store.sync().await.expect("Failed to sync data");
189
190            // Check metrics
191            let buffer = context.encode();
192            assert!(buffer.contains("gets_total 1"), "{}", buffer);
193            assert!(buffer.contains("puts_total 1"), "{}", buffer);
194            assert!(buffer.contains("has_total 2"), "{}", buffer);
195            assert!(buffer.contains("syncs_total 1"), "{}", buffer);
196            assert!(buffer.contains("pruned_total 0"), "{}", buffer);
197
198            // Get the value back (after sync)
199            let retrieved = store
200                .get(0)
201                .await
202                .expect("Failed to get data")
203                .expect("Data not found");
204            assert_eq!(retrieved, value);
205        });
206    }
207
208    #[test_traced]
209    fn test_multiple_indices() {
210        // Initialize the deterministic context
211        let executor = deterministic::Runner::default();
212        executor.start(|context| async move {
213            // Initialize the store
214            let cfg = Config {
215                partition: "test_ordinal".into(),
216                items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
217                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
218                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
219            };
220            let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
221                .await
222                .expect("Failed to initialize store");
223
224            // Insert multiple values at different indices
225            let indices = vec![
226                (0u64, FixedBytes::new([0u8; 32])),
227                (5u64, FixedBytes::new([5u8; 32])),
228                (10u64, FixedBytes::new([10u8; 32])),
229                (100u64, FixedBytes::new([100u8; 32])),
230                (1000u64, FixedBytes::new([200u8; 32])), // Different blob
231            ];
232
233            for (index, value) in &indices {
234                store
235                    .put(*index, value.clone())
236                    .await
237                    .expect("Failed to put data");
238            }
239
240            // Sync to disk
241            store.sync().await.expect("Failed to sync");
242
243            // Retrieve all values and verify
244            for (index, value) in &indices {
245                let retrieved = store
246                    .get(*index)
247                    .await
248                    .expect("Failed to get data")
249                    .expect("Data not found");
250                assert_eq!(&retrieved, value);
251            }
252        });
253    }
254
255    #[test_traced]
256    fn test_sparse_indices() {
257        // Initialize the deterministic context
258        let executor = deterministic::Runner::default();
259        executor.start(|context| async move {
260            // Initialize the store
261            let cfg = Config {
262                partition: "test_ordinal".into(),
263                items_per_blob: NZU64!(100), // Smaller blobs for testing
264                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
265                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
266            };
267            let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
268                .await
269                .expect("Failed to initialize store");
270
271            // Insert sparse values
272            let indices = vec![
273                (0u64, FixedBytes::new([0u8; 32])),
274                (99u64, FixedBytes::new([99u8; 32])), // End of first blob
275                (100u64, FixedBytes::new([100u8; 32])), // Start of second blob
276                (500u64, FixedBytes::new([200u8; 32])), // Start of sixth blob
277            ];
278
279            for (index, value) in &indices {
280                store
281                    .put(*index, value.clone())
282                    .await
283                    .expect("Failed to put data");
284            }
285
286            // Check that intermediate indices don't exist
287            assert!(!store.has(1));
288            assert!(!store.has(50));
289            assert!(!store.has(101));
290            assert!(!store.has(499));
291
292            // Sync and verify
293            store.sync().await.expect("Failed to sync");
294
295            for (index, value) in &indices {
296                let retrieved = store
297                    .get(*index)
298                    .await
299                    .expect("Failed to get data")
300                    .expect("Data not found");
301                assert_eq!(&retrieved, value);
302            }
303        });
304    }
305
306    #[test_traced]
307    fn test_next_gap() {
308        // Initialize the deterministic context
309        let executor = deterministic::Runner::default();
310        executor.start(|context| async move {
311            // Initialize the store
312            let cfg = Config {
313                partition: "test_ordinal".into(),
314                items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
315                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
316                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
317            };
318            let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
319                .await
320                .expect("Failed to initialize store");
321
322            // Insert values with gaps
323            store.put(1, FixedBytes::new([1u8; 32])).await.unwrap();
324            store.put(10, FixedBytes::new([10u8; 32])).await.unwrap();
325            store.put(11, FixedBytes::new([11u8; 32])).await.unwrap();
326            store.put(14, FixedBytes::new([14u8; 32])).await.unwrap();
327
328            // Check gaps
329            let (current_end, start_next) = store.next_gap(0);
330            assert!(current_end.is_none());
331            assert_eq!(start_next, Some(1));
332
333            let (current_end, start_next) = store.next_gap(1);
334            assert_eq!(current_end, Some(1));
335            assert_eq!(start_next, Some(10));
336
337            let (current_end, start_next) = store.next_gap(10);
338            assert_eq!(current_end, Some(11));
339            assert_eq!(start_next, Some(14));
340
341            let (current_end, start_next) = store.next_gap(11);
342            assert_eq!(current_end, Some(11));
343            assert_eq!(start_next, Some(14));
344
345            let (current_end, start_next) = store.next_gap(12);
346            assert!(current_end.is_none());
347            assert_eq!(start_next, Some(14));
348
349            let (current_end, start_next) = store.next_gap(14);
350            assert_eq!(current_end, Some(14));
351            assert!(start_next.is_none());
352        });
353    }
354
355    #[test_traced]
356    fn test_missing_items() {
357        // Initialize the deterministic context
358        let executor = deterministic::Runner::default();
359        executor.start(|context| async move {
360            // Initialize the store
361            let cfg = Config {
362                partition: "test_ordinal".into(),
363                items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
364                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
365                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
366            };
367            let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
368                .await
369                .expect("Failed to initialize store");
370
371            // Test 1: Empty store - should return no items
372            assert_eq!(store.missing_items(0, 5), Vec::<u64>::new());
373            assert_eq!(store.missing_items(100, 10), Vec::<u64>::new());
374
375            // Test 2: Insert values with gaps
376            store.put(1, FixedBytes::new([1u8; 32])).await.unwrap();
377            store.put(2, FixedBytes::new([2u8; 32])).await.unwrap();
378            store.put(5, FixedBytes::new([5u8; 32])).await.unwrap();
379            store.put(6, FixedBytes::new([6u8; 32])).await.unwrap();
380            store.put(10, FixedBytes::new([10u8; 32])).await.unwrap();
381
382            // Test 3: Find missing items from the beginning
383            assert_eq!(store.missing_items(0, 5), vec![0, 3, 4, 7, 8]);
384            assert_eq!(store.missing_items(0, 6), vec![0, 3, 4, 7, 8, 9]);
385            assert_eq!(store.missing_items(0, 7), vec![0, 3, 4, 7, 8, 9]);
386
387            // Test 4: Find missing items from within a gap
388            assert_eq!(store.missing_items(3, 3), vec![3, 4, 7]);
389            assert_eq!(store.missing_items(4, 2), vec![4, 7]);
390
391            // Test 5: Find missing items from within a range
392            assert_eq!(store.missing_items(1, 3), vec![3, 4, 7]);
393            assert_eq!(store.missing_items(2, 4), vec![3, 4, 7, 8]);
394            assert_eq!(store.missing_items(5, 2), vec![7, 8]);
395
396            // Test 6: Find missing items after the last range (no more gaps)
397            assert_eq!(store.missing_items(11, 5), Vec::<u64>::new());
398            assert_eq!(store.missing_items(100, 10), Vec::<u64>::new());
399
400            // Test 7: Large gap scenario
401            store.put(1000, FixedBytes::new([100u8; 32])).await.unwrap();
402
403            // Gap between 10 and 1000
404            let items = store.missing_items(11, 10);
405            assert_eq!(items, vec![11, 12, 13, 14, 15, 16, 17, 18, 19, 20]);
406
407            // Request more items than available in gap
408            let items = store.missing_items(990, 15);
409            assert_eq!(
410                items,
411                vec![990, 991, 992, 993, 994, 995, 996, 997, 998, 999]
412            );
413
414            // Test 8: After syncing (data should remain consistent)
415            store.sync().await.unwrap();
416            assert_eq!(store.missing_items(0, 5), vec![0, 3, 4, 7, 8]);
417            assert_eq!(store.missing_items(3, 3), vec![3, 4, 7]);
418
419            // Test 9: Cross-blob boundary scenario
420            store.put(9999, FixedBytes::new([99u8; 32])).await.unwrap();
421            store
422                .put(10001, FixedBytes::new([101u8; 32]))
423                .await
424                .unwrap();
425
426            // Find missing items across blob boundary (10000 is the boundary)
427            let items = store.missing_items(9998, 5);
428            assert_eq!(items, vec![9998, 10000]);
429        });
430    }
431
432    #[test_traced]
433    fn test_restart() {
434        // Initialize the deterministic context
435        let executor = deterministic::Runner::default();
436        executor.start(|context| async move {
437            let cfg = Config {
438                partition: "test_ordinal".into(),
439                items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
440                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
441                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
442            };
443
444            // Insert data and close
445            {
446                let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
447                    .await
448                    .expect("Failed to initialize store");
449
450                let values = vec![
451                    (0u64, FixedBytes::new([0u8; 32])),
452                    (100u64, FixedBytes::new([100u8; 32])),
453                    (1000u64, FixedBytes::new([200u8; 32])),
454                ];
455
456                for (index, value) in &values {
457                    store
458                        .put(*index, value.clone())
459                        .await
460                        .expect("Failed to put data");
461                }
462
463                store.sync().await.expect("Failed to sync store");
464            }
465
466            // Reopen and verify data persisted
467            {
468                let store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
469                    .await
470                    .expect("Failed to initialize store");
471
472                let values = vec![
473                    (0u64, FixedBytes::new([0u8; 32])),
474                    (100u64, FixedBytes::new([100u8; 32])),
475                    (1000u64, FixedBytes::new([200u8; 32])),
476                ];
477
478                for (index, value) in &values {
479                    let retrieved = store
480                        .get(*index)
481                        .await
482                        .expect("Failed to get data")
483                        .expect("Data not found");
484                    assert_eq!(&retrieved, value);
485                }
486
487                // Check gaps are preserved
488                let (current_end, start_next) = store.next_gap(0);
489                assert_eq!(current_end, Some(0));
490                assert_eq!(start_next, Some(100));
491            }
492        });
493    }
494
495    #[test_traced]
496    fn test_invalid_record() {
497        // Initialize the deterministic context
498        let executor = deterministic::Runner::default();
499        executor.start(|context| async move {
500            let cfg = Config {
501                partition: "test_ordinal".into(),
502                items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
503                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
504                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
505            };
506
507            // Create store with data
508            {
509                let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
510                    .await
511                    .expect("Failed to initialize store");
512
513                store
514                    .put(0, FixedBytes::new([42u8; 32]))
515                    .await
516                    .expect("Failed to put data");
517                store.sync().await.expect("Failed to sync store");
518            }
519
520            // Corrupt the data
521            {
522                let (blob, _) = context
523                    .open("test_ordinal", &0u64.to_be_bytes())
524                    .await
525                    .unwrap();
526                // Corrupt the CRC by changing a byte
527                blob.write_at(vec![0xFF], 32).await.unwrap();
528                blob.sync().await.unwrap();
529            }
530
531            // Reopen and try to read corrupted data
532            {
533                let store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
534                    .await
535                    .expect("Failed to initialize store");
536
537                // Reading corrupt record will return empty
538                let result = store.get(0).await.unwrap();
539                assert!(result.is_none());
540
541                // The index should not be in the intervals after restart with corrupted data
542                assert!(!store.has(0));
543            }
544        });
545    }
546
547    #[test_traced]
548    fn test_get_nonexistent() {
549        // Initialize the deterministic context
550        let executor = deterministic::Runner::default();
551        executor.start(|context| async move {
552            // Initialize the store
553            let cfg = Config {
554                partition: "test_ordinal".into(),
555                items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
556                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
557                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
558            };
559            let store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
560                .await
561                .expect("Failed to initialize store");
562
563            // Attempt to get an index that doesn't exist
564            let retrieved = store.get(999).await.expect("Failed to get data");
565            assert!(retrieved.is_none());
566
567            // Check has returns false
568            assert!(!store.has(999));
569        });
570    }
571
572    #[test_traced]
573    fn test_destroy() {
574        // Initialize the deterministic context
575        let executor = deterministic::Runner::default();
576        executor.start(|context| async move {
577            let cfg = Config {
578                partition: "test_ordinal".into(),
579                items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
580                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
581                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
582            };
583
584            // Create store with data
585            {
586                let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
587                    .await
588                    .expect("Failed to initialize store");
589
590                store
591                    .put(0, FixedBytes::new([0u8; 32]))
592                    .await
593                    .expect("Failed to put data");
594                store
595                    .put(1000, FixedBytes::new([100u8; 32]))
596                    .await
597                    .expect("Failed to put data");
598
599                // Destroy the store
600                store.destroy().await.expect("Failed to destroy store");
601            }
602
603            // Try to create a new store - it should be empty
604            {
605                let store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
606                    .await
607                    .expect("Failed to initialize store");
608
609                // Should not find any data
610                assert!(store.get(0).await.unwrap().is_none());
611                assert!(store.get(1000).await.unwrap().is_none());
612                assert!(!store.has(0));
613                assert!(!store.has(1000));
614            }
615        });
616    }
617
618    #[test_traced]
619    fn test_partial_record_write() {
620        // Initialize the deterministic context
621        let executor = deterministic::Runner::default();
622        executor.start(|context| async move {
623            let cfg = Config {
624                partition: "test_ordinal".into(),
625                items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
626                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
627                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
628            };
629
630            // Create store with data
631            {
632                let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
633                    .await
634                    .expect("Failed to initialize store");
635
636                store
637                    .put(0, FixedBytes::new([42u8; 32]))
638                    .await
639                    .expect("Failed to put data");
640                store
641                    .put(1, FixedBytes::new([43u8; 32]))
642                    .await
643                    .expect("Failed to put data");
644                store.sync().await.expect("Failed to sync store");
645            }
646
647            // Corrupt by writing partial record (only value, no CRC)
648            {
649                let (blob, _) = context
650                    .open("test_ordinal", &0u64.to_be_bytes())
651                    .await
652                    .unwrap();
653                // Overwrite second record with partial data (32 bytes instead of 36)
654                blob.write_at(vec![0xFF; 32], 36).await.unwrap();
655                blob.sync().await.unwrap();
656            }
657
658            // Reopen and verify it handles partial write gracefully
659            {
660                let store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
661                    .await
662                    .expect("Failed to initialize store");
663
664                // First record should be fine
665                assert_eq!(
666                    store.get(0).await.unwrap().unwrap(),
667                    FixedBytes::new([42u8; 32])
668                );
669
670                // Second record should be removed due to partial write
671                assert!(!store.has(1));
672                assert!(store.get(1).await.unwrap().is_none());
673
674                // Store should still be functional
675                let mut store_mut = store;
676                store_mut.put(1, FixedBytes::new([44u8; 32])).await.unwrap();
677                assert_eq!(
678                    store_mut.get(1).await.unwrap().unwrap(),
679                    FixedBytes::new([44u8; 32])
680                );
681            }
682        });
683    }
684
685    #[test_traced]
686    fn test_corrupted_value() {
687        // Initialize the deterministic context
688        let executor = deterministic::Runner::default();
689        executor.start(|context| async move {
690            let cfg = Config {
691                partition: "test_ordinal".into(),
692                items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
693                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
694                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
695            };
696
697            // Create store with data
698            {
699                let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
700                    .await
701                    .expect("Failed to initialize store");
702
703                store
704                    .put(0, FixedBytes::new([42u8; 32]))
705                    .await
706                    .expect("Failed to put data");
707                store
708                    .put(1, FixedBytes::new([43u8; 32]))
709                    .await
710                    .expect("Failed to put data");
711                store.sync().await.expect("Failed to sync store");
712            }
713
714            // Corrupt the value portion of a record
715            {
716                let (blob, _) = context
717                    .open("test_ordinal", &0u64.to_be_bytes())
718                    .await
719                    .unwrap();
720                // Corrupt some bytes in the value of the first record
721                blob.write_at(hex!("0xFFFFFFFF").to_vec(), 10)
722                    .await
723                    .unwrap();
724                blob.sync().await.unwrap();
725            }
726
727            // Reopen and verify it detects corruption
728            {
729                let store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
730                    .await
731                    .expect("Failed to initialize store");
732
733                // First record should be detected as corrupted (CRC mismatch)
734                assert!(!store.has(0));
735
736                // Second record should still be valid
737                assert!(store.has(1));
738                assert_eq!(
739                    store.get(1).await.unwrap().unwrap(),
740                    FixedBytes::new([43u8; 32])
741                );
742            }
743        });
744    }
745
746    #[test_traced]
747    fn test_crc_corruptions() {
748        // Initialize the deterministic context
749        let executor = deterministic::Runner::default();
750        executor.start(|context| async move {
751            let cfg = Config {
752                partition: "test_ordinal".into(),
753                items_per_blob: NZU64!(10), // Small blob size for testing
754                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
755                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
756            };
757
758            // Create store with data across multiple blobs
759            {
760                let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
761                    .await
762                    .expect("Failed to initialize store");
763
764                // Add values across 2 blobs
765                store.put(0, FixedBytes::new([0u8; 32])).await.unwrap();
766                store.put(5, FixedBytes::new([5u8; 32])).await.unwrap();
767                store.put(10, FixedBytes::new([10u8; 32])).await.unwrap();
768                store.put(15, FixedBytes::new([15u8; 32])).await.unwrap();
769                store.sync().await.expect("Failed to sync store");
770            }
771
772            // Corrupt CRCs in different blobs
773            {
774                // Corrupt CRC in first blob
775                let (blob, _) = context
776                    .open("test_ordinal", &0u64.to_be_bytes())
777                    .await
778                    .unwrap();
779                blob.write_at(vec![0xFF], 32).await.unwrap(); // Corrupt CRC of index 0
780                blob.sync().await.unwrap();
781
782                // Corrupt value in second blob (which will invalidate CRC)
783                let (blob, _) = context
784                    .open("test_ordinal", &1u64.to_be_bytes())
785                    .await
786                    .unwrap();
787                blob.write_at(vec![0xFF; 4], 5).await.unwrap(); // Corrupt value of index 10
788                blob.sync().await.unwrap();
789            }
790
791            // Reopen and verify handling of CRC corruptions
792            {
793                let store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
794                    .await
795                    .expect("Failed to initialize store");
796
797                // Corrupted records should not be present
798                assert!(!store.has(0)); // CRC corrupted
799                assert!(!store.has(10)); // Value corrupted (CRC mismatch)
800
801                // Valid records should still be accessible
802                assert!(store.has(5));
803                assert!(store.has(15));
804                assert_eq!(
805                    store.get(5).await.unwrap().unwrap(),
806                    FixedBytes::new([5u8; 32])
807                );
808                assert_eq!(
809                    store.get(15).await.unwrap().unwrap(),
810                    FixedBytes::new([15u8; 32])
811                );
812            }
813        });
814    }
815
816    #[test_traced]
817    fn test_extra_bytes_in_blob() {
818        // Initialize the deterministic context
819        let executor = deterministic::Runner::default();
820        executor.start(|context| async move {
821            let cfg = Config {
822                partition: "test_ordinal".into(),
823                items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
824                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
825                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
826            };
827
828            // Create store with data
829            {
830                let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
831                    .await
832                    .expect("Failed to initialize store");
833
834                store
835                    .put(0, FixedBytes::new([42u8; 32]))
836                    .await
837                    .expect("Failed to put data");
838                store
839                    .put(1, FixedBytes::new([43u8; 32]))
840                    .await
841                    .expect("Failed to put data");
842                store.sync().await.expect("Failed to sync store");
843            }
844
845            // Add extra bytes at the end of blob
846            {
847                let (blob, size) = context
848                    .open("test_ordinal", &0u64.to_be_bytes())
849                    .await
850                    .unwrap();
851                // Add garbage data that forms a complete but invalid record
852                // This avoids partial record issues
853                let mut garbage = vec![0xFF; 32]; // Invalid value
854                let invalid_crc = 0xDEADBEEFu32;
855                garbage.extend_from_slice(&invalid_crc.to_be_bytes());
856                assert_eq!(garbage.len(), 36); // Full record size
857                blob.write_at(garbage, size).await.unwrap();
858                blob.sync().await.unwrap();
859            }
860
861            // Reopen and verify it handles extra bytes
862            {
863                let store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
864                    .await
865                    .expect("Failed to initialize store");
866
867                // Original records should still be valid
868                assert!(store.has(0));
869                assert!(store.has(1));
870                assert_eq!(
871                    store.get(0).await.unwrap().unwrap(),
872                    FixedBytes::new([42u8; 32])
873                );
874                assert_eq!(
875                    store.get(1).await.unwrap().unwrap(),
876                    FixedBytes::new([43u8; 32])
877                );
878
879                // Store should still be functional
880                let mut store_mut = store;
881                store_mut.put(2, FixedBytes::new([44u8; 32])).await.unwrap();
882                assert_eq!(
883                    store_mut.get(2).await.unwrap().unwrap(),
884                    FixedBytes::new([44u8; 32])
885                );
886            }
887        });
888    }
889
890    #[test_traced]
891    fn test_zero_filled_records() {
892        // Initialize the deterministic context
893        let executor = deterministic::Runner::default();
894        executor.start(|context| async move {
895            let cfg = Config {
896                partition: "test_ordinal".into(),
897                items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
898                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
899                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
900            };
901
902            // Create blob with zero-filled space
903            {
904                let (blob, _) = context
905                    .open("test_ordinal", &0u64.to_be_bytes())
906                    .await
907                    .unwrap();
908
909                // Write zeros for several record positions
910                let zeros = vec![0u8; 36 * 5]; // 5 records worth of zeros
911                blob.write_at(zeros, 0).await.unwrap();
912
913                // Write a valid record after the zeros
914                let mut valid_record = vec![44u8; 32];
915                let crc = Crc32::checksum(&valid_record);
916                valid_record.extend_from_slice(&crc.to_be_bytes());
917                blob.write_at(valid_record, 36 * 5).await.unwrap();
918
919                blob.sync().await.unwrap();
920            }
921
922            // Initialize store and verify it handles zero-filled records
923            {
924                let store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
925                    .await
926                    .expect("Failed to initialize store");
927
928                // Zero-filled positions should not be considered valid
929                for i in 0..5 {
930                    assert!(!store.has(i));
931                }
932
933                // The valid record should be found
934                assert!(store.has(5));
935                assert_eq!(
936                    store.get(5).await.unwrap().unwrap(),
937                    FixedBytes::new([44u8; 32])
938                );
939            }
940        });
941    }
942
943    fn test_operations_and_restart(num_values: usize) -> String {
944        // Initialize the deterministic context
945        let executor = deterministic::Runner::default();
946        executor.start(|mut context| async move {
947            let cfg = Config {
948                partition: "test_ordinal".into(),
949                items_per_blob: NZU64!(100), // Smaller blobs to test multiple blob handling
950                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
951                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
952            };
953
954            // Initialize the store
955            let mut store = Ordinal::<_, FixedBytes<128>>::init(context.clone(), cfg.clone())
956                .await
957                .expect("Failed to initialize store");
958
959            // Generate and insert random values at various indices
960            let mut values = Vec::new();
961            let mut rng_index = 0u64;
962
963            for _ in 0..num_values {
964                // Generate a pseudo-random index (sparse to test gaps)
965                let mut index_bytes = [0u8; 8];
966                context.fill_bytes(&mut index_bytes);
967                let index_offset = u64::from_be_bytes(index_bytes) % 1000;
968                let index = rng_index + index_offset;
969                rng_index = index + 1;
970
971                // Generate random value
972                let mut value = [0u8; 128];
973                context.fill_bytes(&mut value);
974                let value = FixedBytes::<128>::new(value);
975
976                store
977                    .put(index, value.clone())
978                    .await
979                    .expect("Failed to put data");
980                values.push((index, value));
981            }
982
983            // Sync data
984            store.sync().await.expect("Failed to sync");
985
986            // Verify all values can be retrieved
987            for (index, value) in &values {
988                let retrieved = store
989                    .get(*index)
990                    .await
991                    .expect("Failed to get data")
992                    .expect("Data not found");
993                assert_eq!(&retrieved, value);
994            }
995
996            // Test next_gap on various indices
997            for i in 0..10 {
998                let _ = store.next_gap(i * 100);
999            }
1000
1001            // Sync and drop the store
1002            store.sync().await.expect("Failed to sync store");
1003            drop(store);
1004
1005            // Reopen the store
1006            let mut store = Ordinal::<_, FixedBytes<128>>::init(context.clone(), cfg)
1007                .await
1008                .expect("Failed to initialize store");
1009
1010            // Verify all values are still there after restart
1011            for (index, value) in &values {
1012                let retrieved = store
1013                    .get(*index)
1014                    .await
1015                    .expect("Failed to get data")
1016                    .expect("Data not found");
1017                assert_eq!(&retrieved, value);
1018            }
1019
1020            // Add more values after restart
1021            for _ in 0..10 {
1022                let mut index_bytes = [0u8; 8];
1023                context.fill_bytes(&mut index_bytes);
1024                let index = u64::from_be_bytes(index_bytes) % 10000;
1025
1026                let mut value = [0u8; 128];
1027                context.fill_bytes(&mut value);
1028                let value = FixedBytes::<128>::new(value);
1029
1030                store.put(index, value).await.expect("Failed to put data");
1031            }
1032
1033            // Final sync
1034            store.sync().await.expect("Failed to sync");
1035
1036            // Return the auditor state for comparison
1037            context.auditor().state()
1038        })
1039    }
1040
1041    #[test_group("slow")]
1042    #[test_traced]
1043    fn test_determinism() {
1044        let state1 = test_operations_and_restart(100);
1045        let state2 = test_operations_and_restart(100);
1046        assert_eq!(state1, state2);
1047    }
1048
1049    #[test_traced]
1050    fn test_prune_basic() {
1051        // Initialize the deterministic context
1052        let executor = deterministic::Runner::default();
1053        executor.start(|context| async move {
1054            let cfg = Config {
1055                partition: "test_ordinal".into(),
1056                items_per_blob: NZU64!(100), // Small blobs to test multiple blob handling
1057                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1058                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1059            };
1060
1061            let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1062                .await
1063                .expect("Failed to initialize store");
1064
1065            // Insert data across multiple blobs
1066            let values = vec![
1067                (0u64, FixedBytes::new([0u8; 32])),     // Blob 0
1068                (50u64, FixedBytes::new([50u8; 32])),   // Blob 0
1069                (100u64, FixedBytes::new([100u8; 32])), // Blob 1
1070                (150u64, FixedBytes::new([150u8; 32])), // Blob 1
1071                (200u64, FixedBytes::new([200u8; 32])), // Blob 2
1072                (300u64, FixedBytes::new([44u8; 32])),  // Blob 3
1073            ];
1074
1075            for (index, value) in &values {
1076                store
1077                    .put(*index, value.clone())
1078                    .await
1079                    .expect("Failed to put data");
1080            }
1081            store.sync().await.unwrap();
1082
1083            // Verify all values exist
1084            for (index, value) in &values {
1085                assert_eq!(store.get(*index).await.unwrap().unwrap(), *value);
1086            }
1087
1088            // Prune up to index 150 (should remove blob 0 only)
1089            store.prune(150).await.unwrap();
1090            let buffer = context.encode();
1091            assert!(buffer.contains("pruned_total 1"));
1092
1093            // Verify pruned data is gone
1094            assert!(!store.has(0));
1095            assert!(!store.has(50));
1096            assert!(store.get(0).await.unwrap().is_none());
1097            assert!(store.get(50).await.unwrap().is_none());
1098
1099            // Verify remaining data is still there
1100            assert!(store.has(100));
1101            assert!(store.has(150));
1102            assert!(store.has(200));
1103            assert!(store.has(300));
1104            assert_eq!(store.get(100).await.unwrap().unwrap(), values[2].1);
1105            assert_eq!(store.get(150).await.unwrap().unwrap(), values[3].1);
1106            assert_eq!(store.get(200).await.unwrap().unwrap(), values[4].1);
1107            assert_eq!(store.get(300).await.unwrap().unwrap(), values[5].1);
1108
1109            // Prune more aggressively - up to index 250 (should remove blob 1)
1110            store.prune(250).await.unwrap();
1111            let buffer = context.encode();
1112            assert!(buffer.contains("pruned_total 2"));
1113
1114            // Verify more data is pruned
1115            assert!(!store.has(100));
1116            assert!(!store.has(150));
1117            assert!(store.get(100).await.unwrap().is_none());
1118            assert!(store.get(150).await.unwrap().is_none());
1119
1120            // Verify remaining data
1121            assert!(store.has(200));
1122            assert!(store.has(300));
1123            assert_eq!(store.get(200).await.unwrap().unwrap(), values[4].1);
1124            assert_eq!(store.get(300).await.unwrap().unwrap(), values[5].1);
1125        });
1126    }
1127
1128    #[test_traced]
1129    fn test_prune_with_gaps() {
1130        // Initialize the deterministic context
1131        let executor = deterministic::Runner::default();
1132        executor.start(|context| async move {
1133            let cfg = Config {
1134                partition: "test_ordinal".into(),
1135                items_per_blob: NZU64!(100),
1136                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1137                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1138            };
1139
1140            let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1141                .await
1142                .expect("Failed to initialize store");
1143
1144            // Insert sparse data with gaps
1145            store.put(5, FixedBytes::new([5u8; 32])).await.unwrap();
1146            store.put(105, FixedBytes::new([105u8; 32])).await.unwrap();
1147            store.put(305, FixedBytes::new([49u8; 32])).await.unwrap();
1148            store.sync().await.unwrap();
1149
1150            // Check gaps before pruning
1151            let (current_end, next_start) = store.next_gap(0);
1152            assert!(current_end.is_none());
1153            assert_eq!(next_start, Some(5));
1154
1155            let (current_end, next_start) = store.next_gap(5);
1156            assert_eq!(current_end, Some(5));
1157            assert_eq!(next_start, Some(105));
1158
1159            // Prune up to index 150 (should remove blob 0)
1160            store.prune(150).await.unwrap();
1161
1162            // Verify pruned data is gone
1163            assert!(!store.has(5));
1164            assert!(store.get(5).await.unwrap().is_none());
1165
1166            // Verify remaining data and gaps
1167            assert!(store.has(105));
1168            assert!(store.has(305));
1169
1170            let (current_end, next_start) = store.next_gap(0);
1171            assert!(current_end.is_none());
1172            assert_eq!(next_start, Some(105));
1173
1174            let (current_end, next_start) = store.next_gap(105);
1175            assert_eq!(current_end, Some(105));
1176            assert_eq!(next_start, Some(305));
1177        });
1178    }
1179
1180    #[test_traced]
1181    fn test_prune_no_op() {
1182        // Initialize the deterministic context
1183        let executor = deterministic::Runner::default();
1184        executor.start(|context| async move {
1185            let cfg = Config {
1186                partition: "test_ordinal".into(),
1187                items_per_blob: NZU64!(100),
1188                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1189                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1190            };
1191
1192            let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1193                .await
1194                .expect("Failed to initialize store");
1195
1196            // Insert data
1197            store.put(100, FixedBytes::new([100u8; 32])).await.unwrap();
1198            store.put(200, FixedBytes::new([200u8; 32])).await.unwrap();
1199            store.sync().await.unwrap();
1200
1201            // Try to prune before any data - should be no-op
1202            store.prune(50).await.unwrap();
1203
1204            // Verify no data was actually pruned
1205            assert!(store.has(100));
1206            assert!(store.has(200));
1207            let buffer = context.encode();
1208            assert!(buffer.contains("pruned_total 0"));
1209
1210            // Try to prune exactly at blob boundary - should be no-op
1211            store.prune(100).await.unwrap();
1212
1213            // Verify still no data pruned
1214            assert!(store.has(100));
1215            assert!(store.has(200));
1216            let buffer = context.encode();
1217            assert!(buffer.contains("pruned_total 0"));
1218        });
1219    }
1220
1221    #[test_traced]
1222    fn test_prune_empty_store() {
1223        // Initialize the deterministic context
1224        let executor = deterministic::Runner::default();
1225        executor.start(|context| async move {
1226            let cfg = Config {
1227                partition: "test_ordinal".into(),
1228                items_per_blob: NZU64!(100),
1229                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1230                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1231            };
1232
1233            let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1234                .await
1235                .expect("Failed to initialize store");
1236
1237            // Try to prune empty store
1238            store.prune(1000).await.unwrap();
1239
1240            // Store should still be functional
1241            store.put(0, FixedBytes::new([0u8; 32])).await.unwrap();
1242            assert!(store.has(0));
1243        });
1244    }
1245
1246    #[test_traced]
1247    fn test_prune_after_restart() {
1248        // Initialize the deterministic context
1249        let executor = deterministic::Runner::default();
1250        executor.start(|context| async move {
1251            let cfg = Config {
1252                partition: "test_ordinal".into(),
1253                items_per_blob: NZU64!(100),
1254                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1255                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1256            };
1257
1258            // Create store and add data
1259            {
1260                let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1261                    .await
1262                    .expect("Failed to initialize store");
1263
1264                store.put(0, FixedBytes::new([0u8; 32])).await.unwrap();
1265                store.put(100, FixedBytes::new([100u8; 32])).await.unwrap();
1266                store.put(200, FixedBytes::new([200u8; 32])).await.unwrap();
1267                store.sync().await.unwrap();
1268            }
1269
1270            // Reopen and prune
1271            {
1272                let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1273                    .await
1274                    .expect("Failed to initialize store");
1275
1276                // Verify data is there
1277                assert!(store.has(0));
1278                assert!(store.has(100));
1279                assert!(store.has(200));
1280
1281                // Prune up to index 150
1282                store.prune(150).await.unwrap();
1283
1284                // Verify pruning worked
1285                assert!(!store.has(0));
1286                assert!(store.has(100));
1287                assert!(store.has(200));
1288
1289                store.sync().await.unwrap();
1290            }
1291
1292            // Reopen again and verify pruning persisted
1293            {
1294                let store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1295                    .await
1296                    .expect("Failed to initialize store");
1297
1298                assert!(!store.has(0));
1299                assert!(store.has(100));
1300                assert!(store.has(200));
1301
1302                // Check gaps
1303                let (current_end, next_start) = store.next_gap(0);
1304                assert!(current_end.is_none());
1305                assert_eq!(next_start, Some(100));
1306            }
1307        });
1308    }
1309
1310    #[test_traced]
1311    fn test_prune_multiple_operations() {
1312        // Initialize the deterministic context
1313        let executor = deterministic::Runner::default();
1314        executor.start(|context| async move {
1315            let cfg = Config {
1316                partition: "test_ordinal".into(),
1317                items_per_blob: NZU64!(50), // Smaller blobs for more granular testing
1318                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1319                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1320            };
1321
1322            let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1323                .await
1324                .expect("Failed to initialize store");
1325
1326            // Insert data across many blobs
1327            let mut values = Vec::new();
1328            for i in 0..10 {
1329                let index = i * 50 + 25; // Middle of each blob
1330                let value = FixedBytes::new([i as u8; 32]);
1331                store.put(index, value.clone()).await.unwrap();
1332                values.push((index, value));
1333            }
1334            store.sync().await.unwrap();
1335
1336            // Prune incrementally
1337            for i in 1..5 {
1338                let prune_index = i * 50 + 10;
1339                store.prune(prune_index).await.unwrap();
1340
1341                // Verify appropriate data is pruned
1342                for (index, _) in &values {
1343                    if *index < prune_index {
1344                        assert!(!store.has(*index), "Index {index} should be pruned");
1345                    } else {
1346                        assert!(store.has(*index), "Index {index} should not be pruned");
1347                    }
1348                }
1349            }
1350
1351            // Check final state
1352            let buffer = context.encode();
1353            assert!(buffer.contains("pruned_total 4"));
1354
1355            // Verify remaining data
1356            for i in 4..10 {
1357                let index = i * 50 + 25;
1358                assert!(store.has(index));
1359                assert_eq!(
1360                    store.get(index).await.unwrap().unwrap(),
1361                    values[i as usize].1
1362                );
1363            }
1364        });
1365    }
1366
1367    #[test_traced]
1368    fn test_prune_blob_boundaries() {
1369        // Initialize the deterministic context
1370        let executor = deterministic::Runner::default();
1371        executor.start(|context| async move {
1372            let cfg = Config {
1373                partition: "test_ordinal".into(),
1374                items_per_blob: NZU64!(100),
1375                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1376                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1377            };
1378
1379            let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1380                .await
1381                .expect("Failed to initialize store");
1382
1383            // Insert data at blob boundaries
1384            store.put(0, FixedBytes::new([0u8; 32])).await.unwrap(); // Start of blob 0
1385            store.put(99, FixedBytes::new([99u8; 32])).await.unwrap(); // End of blob 0
1386            store.put(100, FixedBytes::new([100u8; 32])).await.unwrap(); // Start of blob 1
1387            store.put(199, FixedBytes::new([199u8; 32])).await.unwrap(); // End of blob 1
1388            store.put(200, FixedBytes::new([200u8; 32])).await.unwrap(); // Start of blob 2
1389            store.sync().await.unwrap();
1390
1391            // Test various pruning points around boundaries
1392
1393            // Prune exactly at blob boundary (100) - should prune blob 0
1394            store.prune(100).await.unwrap();
1395            assert!(!store.has(0));
1396            assert!(!store.has(99));
1397            assert!(store.has(100));
1398            assert!(store.has(199));
1399            assert!(store.has(200));
1400
1401            // Prune just before next boundary (199) - should not prune blob 1
1402            store.prune(199).await.unwrap();
1403            assert!(store.has(100));
1404            assert!(store.has(199));
1405            assert!(store.has(200));
1406
1407            // Prune exactly at next boundary (200) - should prune blob 1
1408            store.prune(200).await.unwrap();
1409            assert!(!store.has(100));
1410            assert!(!store.has(199));
1411            assert!(store.has(200));
1412
1413            let buffer = context.encode();
1414            assert!(buffer.contains("pruned_total 2"));
1415        });
1416    }
1417
1418    #[test_traced]
1419    fn test_prune_non_contiguous_sections() {
1420        // Initialize the deterministic context
1421        let executor = deterministic::Runner::default();
1422        executor.start(|context| async move {
1423            let cfg = Config {
1424                partition: "test_ordinal".into(),
1425                items_per_blob: NZU64!(100),
1426                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1427                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1428            };
1429
1430            let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1431                .await
1432                .expect("Failed to initialize store");
1433
1434            // Insert data in non-contiguous sections (0, 2, 5, 7)
1435            store.put(0, FixedBytes::new([0u8; 32])).await.unwrap(); // Section 0
1436            store.put(250, FixedBytes::new([50u8; 32])).await.unwrap(); // Section 2 (250/100 = 2)
1437            store.put(500, FixedBytes::new([44u8; 32])).await.unwrap(); // Section 5 (500/100 = 5)
1438            store.put(750, FixedBytes::new([45u8; 32])).await.unwrap(); // Section 7 (750/100 = 7)
1439            store.sync().await.unwrap();
1440
1441            // Verify all data exists initially
1442            assert!(store.has(0));
1443            assert!(store.has(250));
1444            assert!(store.has(500));
1445            assert!(store.has(750));
1446
1447            // Prune up to section 3 (index 300) - should remove sections 0 and 2
1448            store.prune(300).await.unwrap();
1449
1450            // Verify correct data was pruned
1451            assert!(!store.has(0)); // Section 0 pruned
1452            assert!(!store.has(250)); // Section 2 pruned
1453            assert!(store.has(500)); // Section 5 remains
1454            assert!(store.has(750)); // Section 7 remains
1455
1456            let buffer = context.encode();
1457            assert!(buffer.contains("pruned_total 2"));
1458
1459            // Prune up to section 6 (index 600) - should remove section 5
1460            store.prune(600).await.unwrap();
1461
1462            // Verify section 5 was pruned
1463            assert!(!store.has(500)); // Section 5 pruned
1464            assert!(store.has(750)); // Section 7 remains
1465
1466            let buffer = context.encode();
1467            assert!(buffer.contains("pruned_total 3"));
1468
1469            // Prune everything - should remove section 7
1470            store.prune(1000).await.unwrap();
1471
1472            // Verify all data is gone
1473            assert!(!store.has(750)); // Section 7 pruned
1474
1475            let buffer = context.encode();
1476            assert!(buffer.contains("pruned_total 4"));
1477        });
1478    }
1479
1480    #[test_traced]
1481    fn test_prune_removes_correct_pending() {
1482        // Initialize the deterministic context
1483        let executor = deterministic::Runner::default();
1484        executor.start(|context| async move {
1485            let cfg = Config {
1486                partition: "test_ordinal".into(),
1487                items_per_blob: NZU64!(100),
1488                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1489                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1490            };
1491            let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1492                .await
1493                .expect("Failed to initialize store");
1494
1495            // Insert and sync some data in blob 0
1496            store.put(5, FixedBytes::new([5u8; 32])).await.unwrap();
1497            store.sync().await.unwrap();
1498
1499            // Add pending entries to blob 0 and blob 1
1500            store.put(10, FixedBytes::new([10u8; 32])).await.unwrap(); // blob 0
1501            store.put(110, FixedBytes::new([110u8; 32])).await.unwrap(); // blob 1
1502
1503            // Verify all data is visible before pruning
1504            assert!(store.has(5));
1505            assert!(store.has(10));
1506            assert!(store.has(110));
1507
1508            // Prune up to index 100, which should remove blob 0 (indices 0-99).
1509            store.prune(150).await.unwrap();
1510
1511            // Verify that synced and pending entries in blob 0 are removed.
1512            assert!(!store.has(5));
1513            assert!(!store.has(10));
1514
1515            // Verify that the pending entry in blob 1 remains.
1516            assert!(store.has(110));
1517            assert_eq!(
1518                store.get(110).await.unwrap().unwrap(),
1519                FixedBytes::new([110u8; 32])
1520            );
1521
1522            // Sync the remaining pending entry and verify it's still there.
1523            store.sync().await.unwrap();
1524            assert!(store.has(110));
1525            assert_eq!(
1526                store.get(110).await.unwrap().unwrap(),
1527                FixedBytes::new([110u8; 32])
1528            );
1529        });
1530    }
1531
1532    #[test_traced]
1533    fn test_init_with_bits_none() {
1534        // Initialize the deterministic context
1535        let executor = deterministic::Runner::default();
1536        executor.start(|context| async move {
1537            let cfg = Config {
1538                partition: "test_ordinal".into(),
1539                items_per_blob: NZU64!(10), // Small blob size for testing
1540                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1541                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1542            };
1543
1544            // Create store with data across multiple sections
1545            {
1546                let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1547                    .await
1548                    .expect("Failed to initialize store");
1549
1550                // Section 0 (indices 0-9)
1551                store.put(0, FixedBytes::new([0u8; 32])).await.unwrap();
1552                store.put(5, FixedBytes::new([5u8; 32])).await.unwrap();
1553                store.put(9, FixedBytes::new([9u8; 32])).await.unwrap();
1554
1555                // Section 1 (indices 10-19)
1556                store.put(10, FixedBytes::new([10u8; 32])).await.unwrap();
1557                store.put(15, FixedBytes::new([15u8; 32])).await.unwrap();
1558
1559                // Section 2 (indices 20-29)
1560                store.put(25, FixedBytes::new([25u8; 32])).await.unwrap();
1561
1562                store.sync().await.unwrap();
1563            }
1564
1565            // Reinitialize with bits = None (should behave like regular init)
1566            {
1567                let store = Ordinal::<_, FixedBytes<32>>::init_with_bits(
1568                    context.clone(),
1569                    cfg.clone(),
1570                    None,
1571                )
1572                .await
1573                .expect("Failed to initialize store with bits");
1574
1575                // All records should be available
1576                assert!(store.has(0));
1577                assert!(store.has(5));
1578                assert!(store.has(9));
1579                assert!(store.has(10));
1580                assert!(store.has(15));
1581                assert!(store.has(25));
1582
1583                // Non-existent records should not be available
1584                assert!(!store.has(1));
1585                assert!(!store.has(11));
1586                assert!(!store.has(20));
1587
1588                // Verify values
1589                assert_eq!(
1590                    store.get(0).await.unwrap().unwrap(),
1591                    FixedBytes::new([0u8; 32])
1592                );
1593                assert_eq!(
1594                    store.get(15).await.unwrap().unwrap(),
1595                    FixedBytes::new([15u8; 32])
1596                );
1597            }
1598        });
1599    }
1600
1601    #[test_traced]
1602    fn test_init_with_bits_empty_hashmap() {
1603        // Initialize the deterministic context
1604        let executor = deterministic::Runner::default();
1605        executor.start(|context| async move {
1606            let cfg = Config {
1607                partition: "test_ordinal".into(),
1608                items_per_blob: NZU64!(10),
1609                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1610                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1611            };
1612
1613            // Create store with data
1614            {
1615                let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1616                    .await
1617                    .expect("Failed to initialize store");
1618
1619                store.put(0, FixedBytes::new([0u8; 32])).await.unwrap();
1620                store.put(10, FixedBytes::new([10u8; 32])).await.unwrap();
1621                store.put(20, FixedBytes::new([20u8; 32])).await.unwrap();
1622
1623                store.sync().await.unwrap();
1624            }
1625
1626            // Reinitialize with empty HashMap - should skip all sections
1627            {
1628                let bits: BTreeMap<u64, &Option<BitMap>> = BTreeMap::new();
1629                let store = Ordinal::<_, FixedBytes<32>>::init_with_bits(
1630                    context.clone(),
1631                    cfg.clone(),
1632                    Some(bits),
1633                )
1634                .await
1635                .expect("Failed to initialize store with bits");
1636
1637                // No records should be available since no sections were in the bits map
1638                assert!(!store.has(0));
1639                assert!(!store.has(10));
1640                assert!(!store.has(20));
1641            }
1642        });
1643    }
1644
1645    #[test_traced]
1646    fn test_init_with_bits_selective_sections() {
1647        // Initialize the deterministic context
1648        let executor = deterministic::Runner::default();
1649        executor.start(|context| async move {
1650            let cfg = Config {
1651                partition: "test_ordinal".into(),
1652                items_per_blob: NZU64!(10),
1653                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1654                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1655            };
1656
1657            // Create store with data in multiple sections
1658            {
1659                let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1660                    .await
1661                    .expect("Failed to initialize store");
1662
1663                // Section 0 (indices 0-9)
1664                for i in 0..10 {
1665                    store.put(i, FixedBytes::new([i as u8; 32])).await.unwrap();
1666                }
1667
1668                // Section 1 (indices 10-19)
1669                for i in 10..20 {
1670                    store.put(i, FixedBytes::new([i as u8; 32])).await.unwrap();
1671                }
1672
1673                // Section 2 (indices 20-29)
1674                for i in 20..30 {
1675                    store.put(i, FixedBytes::new([i as u8; 32])).await.unwrap();
1676                }
1677
1678                store.sync().await.unwrap();
1679            }
1680
1681            // Reinitialize with bits for only section 1
1682            {
1683                let mut bits_map: BTreeMap<u64, &Option<BitMap>> = BTreeMap::new();
1684
1685                // Create a BitMap that marks indices 12, 15, and 18 as present
1686                let mut bitmap = BitMap::zeroes(10);
1687                bitmap.set(2, true); // Index 12 (offset 2 in section 1)
1688                bitmap.set(5, true); // Index 15 (offset 5 in section 1)
1689                bitmap.set(8, true); // Index 18 (offset 8 in section 1)
1690                let bitmap_option = Some(bitmap);
1691
1692                bits_map.insert(1, &bitmap_option);
1693
1694                let store = Ordinal::<_, FixedBytes<32>>::init_with_bits(
1695                    context.clone(),
1696                    cfg.clone(),
1697                    Some(bits_map),
1698                )
1699                .await
1700                .expect("Failed to initialize store with bits");
1701
1702                // Only specified indices from section 1 should be available
1703                assert!(store.has(12));
1704                assert!(store.has(15));
1705                assert!(store.has(18));
1706
1707                // Other indices from section 1 should not be available
1708                assert!(!store.has(10));
1709                assert!(!store.has(11));
1710                assert!(!store.has(13));
1711                assert!(!store.has(14));
1712                assert!(!store.has(16));
1713                assert!(!store.has(17));
1714                assert!(!store.has(19));
1715
1716                // All indices from sections 0 and 2 should not be available
1717                for i in 0..10 {
1718                    assert!(!store.has(i));
1719                }
1720                for i in 20..30 {
1721                    assert!(!store.has(i));
1722                }
1723
1724                // Verify the available values
1725                assert_eq!(
1726                    store.get(12).await.unwrap().unwrap(),
1727                    FixedBytes::new([12u8; 32])
1728                );
1729                assert_eq!(
1730                    store.get(15).await.unwrap().unwrap(),
1731                    FixedBytes::new([15u8; 32])
1732                );
1733                assert_eq!(
1734                    store.get(18).await.unwrap().unwrap(),
1735                    FixedBytes::new([18u8; 32])
1736                );
1737            }
1738        });
1739    }
1740
1741    #[test_traced]
1742    fn test_init_with_bits_none_option_all_records_exist() {
1743        // Initialize the deterministic context
1744        let executor = deterministic::Runner::default();
1745        executor.start(|context| async move {
1746            let cfg = Config {
1747                partition: "test_ordinal".into(),
1748                items_per_blob: NZU64!(5),
1749                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1750                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1751            };
1752
1753            // Create store with all records in a section
1754            {
1755                let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1756                    .await
1757                    .expect("Failed to initialize store");
1758
1759                // Fill section 1 completely (indices 5-9)
1760                for i in 5..10 {
1761                    store.put(i, FixedBytes::new([i as u8; 32])).await.unwrap();
1762                }
1763
1764                store.sync().await.unwrap();
1765            }
1766
1767            // Reinitialize with None option for section 1 (expects all records)
1768            {
1769                let mut bits_map: BTreeMap<u64, &Option<BitMap>> = BTreeMap::new();
1770                let none_option: Option<BitMap> = None;
1771                bits_map.insert(1, &none_option);
1772
1773                let store = Ordinal::<_, FixedBytes<32>>::init_with_bits(
1774                    context.clone(),
1775                    cfg.clone(),
1776                    Some(bits_map),
1777                )
1778                .await
1779                .expect("Failed to initialize store with bits");
1780
1781                // All records in section 1 should be available
1782                for i in 5..10 {
1783                    assert!(store.has(i));
1784                    assert_eq!(
1785                        store.get(i).await.unwrap().unwrap(),
1786                        FixedBytes::new([i as u8; 32])
1787                    );
1788                }
1789            }
1790        });
1791    }
1792
1793    #[test_traced]
1794    #[should_panic(expected = "Failed to initialize store with bits: MissingRecord(6)")]
1795    fn test_init_with_bits_none_option_missing_record_panics() {
1796        // Initialize the deterministic context
1797        let executor = deterministic::Runner::default();
1798        executor.start(|context| async move {
1799            let cfg = Config {
1800                partition: "test_ordinal".into(),
1801                items_per_blob: NZU64!(5),
1802                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1803                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1804            };
1805
1806            // Create store with missing record in a section
1807            {
1808                let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1809                    .await
1810                    .expect("Failed to initialize store");
1811
1812                // Fill section 1 partially (skip index 6)
1813                store.put(5, FixedBytes::new([5u8; 32])).await.unwrap();
1814                // Skip index 6
1815                store.put(7, FixedBytes::new([7u8; 32])).await.unwrap();
1816                store.put(8, FixedBytes::new([8u8; 32])).await.unwrap();
1817                store.put(9, FixedBytes::new([9u8; 32])).await.unwrap();
1818
1819                store.sync().await.unwrap();
1820            }
1821
1822            // Reinitialize with None option for section 1 (expects all records)
1823            // This should panic because index 6 is missing
1824            {
1825                let mut bits_map: BTreeMap<u64, &Option<BitMap>> = BTreeMap::new();
1826                let none_option: Option<BitMap> = None;
1827                bits_map.insert(1, &none_option);
1828
1829                let _store = Ordinal::<_, FixedBytes<32>>::init_with_bits(
1830                    context.clone(),
1831                    cfg.clone(),
1832                    Some(bits_map),
1833                )
1834                .await
1835                .expect("Failed to initialize store with bits");
1836            }
1837        });
1838    }
1839
1840    #[test_traced]
1841    fn test_init_with_bits_mixed_sections() {
1842        // Initialize the deterministic context
1843        let executor = deterministic::Runner::default();
1844        executor.start(|context| async move {
1845            let cfg = Config {
1846                partition: "test_ordinal".into(),
1847                items_per_blob: NZU64!(5),
1848                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1849                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1850            };
1851
1852            // Create store with data in multiple sections
1853            {
1854                let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1855                    .await
1856                    .expect("Failed to initialize store");
1857
1858                // Section 0: indices 0-4 (fill completely)
1859                for i in 0..5 {
1860                    store.put(i, FixedBytes::new([i as u8; 32])).await.unwrap();
1861                }
1862
1863                // Section 1: indices 5-9 (fill partially)
1864                store.put(5, FixedBytes::new([5u8; 32])).await.unwrap();
1865                store.put(7, FixedBytes::new([7u8; 32])).await.unwrap();
1866                store.put(9, FixedBytes::new([9u8; 32])).await.unwrap();
1867
1868                // Section 2: indices 10-14 (fill completely)
1869                for i in 10..15 {
1870                    store.put(i, FixedBytes::new([i as u8; 32])).await.unwrap();
1871                }
1872
1873                store.sync().await.unwrap();
1874            }
1875
1876            // Reinitialize with mixed bits configuration
1877            {
1878                let mut bits_map: BTreeMap<u64, &Option<BitMap>> = BTreeMap::new();
1879
1880                // Section 0: None option (expects all records)
1881                let none_option: Option<BitMap> = None;
1882                bits_map.insert(0, &none_option);
1883
1884                // Section 1: BitMap with specific indices
1885                let mut bitmap1 = BitMap::zeroes(5);
1886                bitmap1.set(0, true); // Index 5
1887                bitmap1.set(2, true); // Index 7
1888                                      // Note: not setting bit for index 9, so it should be ignored
1889                let bitmap1_option = Some(bitmap1);
1890                bits_map.insert(1, &bitmap1_option);
1891
1892                // Section 2: Not in map, should be skipped entirely
1893
1894                let store = Ordinal::<_, FixedBytes<32>>::init_with_bits(
1895                    context.clone(),
1896                    cfg.clone(),
1897                    Some(bits_map),
1898                )
1899                .await
1900                .expect("Failed to initialize store with bits");
1901
1902                // All records from section 0 should be available
1903                for i in 0..5 {
1904                    assert!(store.has(i));
1905                    assert_eq!(
1906                        store.get(i).await.unwrap().unwrap(),
1907                        FixedBytes::new([i as u8; 32])
1908                    );
1909                }
1910
1911                // Only specified records from section 1 should be available
1912                assert!(store.has(5));
1913                assert!(store.has(7));
1914                assert!(!store.has(6));
1915                assert!(!store.has(8));
1916                assert!(!store.has(9)); // Not set in bitmap
1917
1918                // No records from section 2 should be available
1919                for i in 10..15 {
1920                    assert!(!store.has(i));
1921                }
1922            }
1923        });
1924    }
1925
1926    #[test_traced]
1927    #[should_panic(expected = "Failed to initialize store with bits: MissingRecord(2)")]
1928    fn test_init_with_bits_corrupted_records() {
1929        // Initialize the deterministic context
1930        let executor = deterministic::Runner::default();
1931        executor.start(|context| async move {
1932            let cfg = Config {
1933                partition: "test_ordinal".into(),
1934                items_per_blob: NZU64!(5),
1935                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1936                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1937            };
1938
1939            // Create store with data and corrupt one record
1940            {
1941                let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1942                    .await
1943                    .expect("Failed to initialize store");
1944
1945                // Section 0: indices 0-4
1946                for i in 0..5 {
1947                    store.put(i, FixedBytes::new([i as u8; 32])).await.unwrap();
1948                }
1949
1950                store.sync().await.unwrap();
1951            }
1952
1953            // Corrupt record at index 2
1954            {
1955                let (blob, _) = context
1956                    .open("test_ordinal", &0u64.to_be_bytes())
1957                    .await
1958                    .unwrap();
1959                // Corrupt the CRC of record at index 2
1960                let offset = 2 * 36 + 32; // 2 * record_size + value_size
1961                blob.write_at(vec![0xFF], offset).await.unwrap();
1962                blob.sync().await.unwrap();
1963            }
1964
1965            // Reinitialize with bits that include the corrupted record
1966            {
1967                let mut bits_map: BTreeMap<u64, &Option<BitMap>> = BTreeMap::new();
1968
1969                // Create a BitMap that includes the corrupted record
1970                let mut bitmap = BitMap::zeroes(5);
1971                bitmap.set(0, true); // Index 0
1972                bitmap.set(2, true); // Index 2 (corrupted) - this will cause a panic
1973                bitmap.set(4, true); // Index 4
1974                let bitmap_option = Some(bitmap);
1975                bits_map.insert(0, &bitmap_option);
1976
1977                let _store = Ordinal::<_, FixedBytes<32>>::init_with_bits(
1978                    context.clone(),
1979                    cfg.clone(),
1980                    Some(bits_map),
1981                )
1982                .await
1983                .expect("Failed to initialize store with bits");
1984            }
1985        });
1986    }
1987
1988    /// A dummy value that will fail parsing if the value is 0.
1989    #[derive(Debug, PartialEq, Eq)]
1990    pub struct DummyValue {
1991        pub value: u64,
1992    }
1993
1994    impl Write for DummyValue {
1995        fn write(&self, buf: &mut impl BufMut) {
1996            self.value.write(buf);
1997        }
1998    }
1999
2000    impl Read for DummyValue {
2001        type Cfg = ();
2002
2003        fn read_cfg(buf: &mut impl Buf, _: &Self::Cfg) -> Result<Self, commonware_codec::Error> {
2004            let value = u64::read(buf)?;
2005            if value == 0 {
2006                return Err(commonware_codec::Error::Invalid(
2007                    "DummyValue",
2008                    "value must be non-zero",
2009                ));
2010            }
2011            Ok(Self { value })
2012        }
2013    }
2014
2015    impl FixedSize for DummyValue {
2016        const SIZE: usize = u64::SIZE;
2017    }
2018
2019    #[test_traced]
2020    fn test_init_skip_unparseable_record() {
2021        // Initialize the deterministic context
2022        let executor = deterministic::Runner::default();
2023        executor.start(|context| async move {
2024            let cfg = Config {
2025                partition: "test_ordinal".into(),
2026                items_per_blob: NZU64!(1),
2027                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
2028                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
2029            };
2030
2031            // Create store with valid records
2032            {
2033                let mut store = Ordinal::<_, DummyValue>::init(context.clone(), cfg.clone())
2034                    .await
2035                    .expect("Failed to initialize store");
2036
2037                // Add records at indices 1, 2, 4
2038                store.put(1, DummyValue { value: 1 }).await.unwrap();
2039                store.put(2, DummyValue { value: 0 }).await.unwrap(); // will fail parsing
2040                store.put(4, DummyValue { value: 4 }).await.unwrap();
2041
2042                store.sync().await.unwrap();
2043            }
2044
2045            // Reinitialize - should skip the unparseable record but continue processing
2046            {
2047                let store = Ordinal::<_, DummyValue>::init(context.clone(), cfg.clone())
2048                    .await
2049                    .expect("Failed to initialize store");
2050
2051                // Record 0 should be available
2052                assert!(store.has(1), "Record 1 should be available");
2053                assert_eq!(
2054                    store.get(1).await.unwrap().unwrap(),
2055                    DummyValue { value: 1 },
2056                    "Record 0 should have correct value"
2057                );
2058
2059                // Record 2 should NOT be available (unparseable)
2060                assert!(
2061                    !store.has(2),
2062                    "Record 2 should not be available (unparseable)"
2063                );
2064
2065                // This tests that we didn't exit early when encountering the unparseable record
2066                assert!(
2067                    store.has(4),
2068                    "Record 4 should be available - we should not exit early on unparseable record"
2069                );
2070                assert_eq!(
2071                    store.get(4).await.unwrap().unwrap(),
2072                    DummyValue { value: 4 },
2073                    "Record 4 should have correct value"
2074                );
2075            }
2076        });
2077    }
2078}