commonware_storage/ordinal/
mod.rs

1//! A persistent index that maps sparse indices to [commonware_utils::Array]s.
2//!
3//! [Ordinal] is a collection of [commonware_runtime::Blob]s containing ordered records of fixed size.
4//! Because records are fixed size, file position corresponds directly to index. Unlike
5//! [crate::journal::fixed::Journal], [Ordinal] supports out-of-order insertion.
6//!
7//! # Design
8//!
9//! [Ordinal] is a collection of [commonware_runtime::Blob]s where:
10//! - Each record: `[V][crc32(V)]` where V is an [commonware_utils::Array]
11//! - Index N is at file offset: `N * RECORD_SIZE`
12//! - A [crate::rmap::RMap] tracks which indices have been written (and which are missing)
13//!
14//! # File Organization
15//!
16//! Records are grouped into blobs to avoid having too many files:
17//!
18//! ```text
19//! Blob 0: indices 0-999
20//! Blob 1: indices 1000-1999
21//! ...
22//! ```
23//!
24//! # Format
25//!
26//! [Ordinal] stores values in the following format:
27//!
28//! ```text
29//! +---+---+---+---+---+---+---+---+---+---+---+---+---+
30//! | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 |10 |11 |12 |
31//! +---+---+---+---+---+---+---+---+---+---+---+---+---+
32//! |          Value (Fixed Size)       |     CRC32     |
33//! +---+---+---+---+---+---+---+---+---+---+---+---+---+
34//! ```
35//!
36//! # Performance Characteristics
37//!
38//! - **Writes**: O(1) - direct offset calculation
39//! - **Reads**: O(1) - direct offset calculation
40//! - **Has**: O(1) - in-memory lookup (via [crate::rmap::RMap])
41//! - **Next Gap**: O(log n) - in-memory range query (via [crate::rmap::RMap])
42//! - **Restart**: O(n) where n is the number of existing records (to rebuild [crate::rmap::RMap])
43//!
44//! # Atomicity
45//!
46//! [Ordinal] eagerly writes all new data to [commonware_runtime::Blob]s. New data, however, is not
47//! synced until [Ordinal::sync] is called. As a result, data is not guaranteed to be atomically
48//! persisted (i.e. shutdown before [Ordinal::sync] may lead to some writes being lost).
49//!
50//! _If you want atomicity for sparse writes, pair [commonware_utils::BitVec] and
51//! [crate::metadata::Metadata] with [Ordinal] (use bits to indicate which items have been atomically
52//! written)._
53//!
54//! # Recovery
55//!
56//! On restart, [Ordinal] validates all records using their CRC32 and rebuilds the in-memory
57//! [crate::rmap::RMap]. Invalid records (corrupted or empty) are excluded from the rebuilt index.
58//!
59//! # Example
60//!
61//! ```rust
62//! use commonware_runtime::{Spawner, Runner, deterministic};
63//! use commonware_storage::ordinal::{Ordinal, Config};
64//! use commonware_utils::{sequence::FixedBytes, NZUsize, NZU64};
65//!
66//! let executor = deterministic::Runner::default();
67//! executor.start(|context| async move {
68//!     // Create a store for 32-byte values
69//!     let cfg = Config {
70//!         partition: "ordinal_store".into(),
71//!         items_per_blob: NZU64!(10000),
72//!         write_buffer: NZUsize!(4096),
73//!         replay_buffer: NZUsize!(1024 * 1024),
74//!     };
75//!     let mut store = Ordinal::<_, FixedBytes<32>>::init(context, cfg).await.unwrap();
76//!
77//!     // Put values at specific indices
78//!     let value1 = FixedBytes::new([1u8; 32]);
79//!     let value2 = FixedBytes::new([2u8; 32]);
80//!     store.put(0, value1).await.unwrap();
81//!     store.put(5, value2).await.unwrap();
82//!
83//!     // Sync to disk
84//!     store.sync().await.unwrap();
85//!
86//!     // Check for gaps
87//!     let (current_end, next_start) = store.next_gap(0);
88//!     assert_eq!(current_end, Some(0));
89//!     assert_eq!(next_start, Some(5));
90//!
91//!     // Close the store
92//!     store.close().await.unwrap();
93//! });
94//! ```
95
96mod storage;
97
98use std::num::{NonZeroU64, NonZeroUsize};
99pub use storage::Ordinal;
100use thiserror::Error;
101
102/// Errors that can occur when interacting with the [Ordinal].
103#[derive(Debug, Error)]
104pub enum Error {
105    #[error("runtime error: {0}")]
106    Runtime(#[from] commonware_runtime::Error),
107    #[error("codec error: {0}")]
108    Codec(#[from] commonware_codec::Error),
109    #[error("invalid blob name: {0}")]
110    InvalidBlobName(String),
111    #[error("invalid record: {0}")]
112    InvalidRecord(u64),
113    #[error("missing record at {0}")]
114    MissingRecord(u64),
115}
116
117/// Configuration for [Ordinal] storage.
118#[derive(Clone)]
119pub struct Config {
120    /// The [commonware_runtime::Storage] partition to use for storing the index.
121    pub partition: String,
122
123    /// The maximum number of items to store in each index blob.
124    pub items_per_blob: NonZeroU64,
125
126    /// The size of the write buffer to use when writing to the index.
127    pub write_buffer: NonZeroUsize,
128
129    /// The size of the read buffer to use on restart.
130    pub replay_buffer: NonZeroUsize,
131}
132
133#[cfg(test)]
134mod tests {
135    use super::*;
136    use commonware_macros::test_traced;
137    use commonware_runtime::{deterministic, Blob, Metrics, Runner, Storage};
138    use commonware_utils::{sequence::FixedBytes, BitVec, NZUsize, NZU64};
139    use rand::RngCore;
140    use std::collections::BTreeMap;
141
142    const DEFAULT_ITEMS_PER_BLOB: u64 = 1000;
143    const DEFAULT_WRITE_BUFFER: usize = 4096;
144    const DEFAULT_REPLAY_BUFFER: usize = 1024 * 1024;
145
146    #[test_traced]
147    fn test_put_get() {
148        // Initialize the deterministic context
149        let executor = deterministic::Runner::default();
150        executor.start(|context| async move {
151            // Initialize the store
152            let cfg = Config {
153                partition: "test_ordinal".into(),
154                items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
155                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
156                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
157            };
158            let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
159                .await
160                .expect("Failed to initialize store");
161
162            let value = FixedBytes::new([42u8; 32]);
163
164            // Check index doesn't exist
165            assert!(!store.has(0));
166
167            // Put the value at index 0
168            store
169                .put(0, value.clone())
170                .await
171                .expect("Failed to put data");
172
173            // Check index exists
174            assert!(store.has(0));
175
176            // Get the value back (before sync)
177            let retrieved = store
178                .get(0)
179                .await
180                .expect("Failed to get data")
181                .expect("Data not found");
182            assert_eq!(retrieved, value);
183
184            // Force a sync
185            store.sync().await.expect("Failed to sync data");
186
187            // Check metrics
188            let buffer = context.encode();
189            assert!(buffer.contains("gets_total 1"), "{}", buffer);
190            assert!(buffer.contains("puts_total 1"), "{}", buffer);
191            assert!(buffer.contains("has_total 2"), "{}", buffer);
192            assert!(buffer.contains("syncs_total 1"), "{}", buffer);
193            assert!(buffer.contains("pruned_total 0"), "{}", buffer);
194
195            // Get the value back (after sync)
196            let retrieved = store
197                .get(0)
198                .await
199                .expect("Failed to get data")
200                .expect("Data not found");
201            assert_eq!(retrieved, value);
202        });
203    }
204
205    #[test_traced]
206    fn test_multiple_indices() {
207        // Initialize the deterministic context
208        let executor = deterministic::Runner::default();
209        executor.start(|context| async move {
210            // Initialize the store
211            let cfg = Config {
212                partition: "test_ordinal".into(),
213                items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
214                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
215                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
216            };
217            let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
218                .await
219                .expect("Failed to initialize store");
220
221            // Insert multiple values at different indices
222            let indices = vec![
223                (0u64, FixedBytes::new([0u8; 32])),
224                (5u64, FixedBytes::new([5u8; 32])),
225                (10u64, FixedBytes::new([10u8; 32])),
226                (100u64, FixedBytes::new([100u8; 32])),
227                (1000u64, FixedBytes::new([200u8; 32])), // Different blob
228            ];
229
230            for (index, value) in &indices {
231                store
232                    .put(*index, value.clone())
233                    .await
234                    .expect("Failed to put data");
235            }
236
237            // Sync to disk
238            store.sync().await.expect("Failed to sync");
239
240            // Retrieve all values and verify
241            for (index, value) in &indices {
242                let retrieved = store
243                    .get(*index)
244                    .await
245                    .expect("Failed to get data")
246                    .expect("Data not found");
247                assert_eq!(&retrieved, value);
248            }
249        });
250    }
251
252    #[test_traced]
253    fn test_sparse_indices() {
254        // Initialize the deterministic context
255        let executor = deterministic::Runner::default();
256        executor.start(|context| async move {
257            // Initialize the store
258            let cfg = Config {
259                partition: "test_ordinal".into(),
260                items_per_blob: NZU64!(100), // Smaller blobs for testing
261                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
262                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
263            };
264            let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
265                .await
266                .expect("Failed to initialize store");
267
268            // Insert sparse values
269            let indices = vec![
270                (0u64, FixedBytes::new([0u8; 32])),
271                (99u64, FixedBytes::new([99u8; 32])), // End of first blob
272                (100u64, FixedBytes::new([100u8; 32])), // Start of second blob
273                (500u64, FixedBytes::new([200u8; 32])), // Start of sixth blob
274            ];
275
276            for (index, value) in &indices {
277                store
278                    .put(*index, value.clone())
279                    .await
280                    .expect("Failed to put data");
281            }
282
283            // Check that intermediate indices don't exist
284            assert!(!store.has(1));
285            assert!(!store.has(50));
286            assert!(!store.has(101));
287            assert!(!store.has(499));
288
289            // Sync and verify
290            store.sync().await.expect("Failed to sync");
291
292            for (index, value) in &indices {
293                let retrieved = store
294                    .get(*index)
295                    .await
296                    .expect("Failed to get data")
297                    .expect("Data not found");
298                assert_eq!(&retrieved, value);
299            }
300        });
301    }
302
303    #[test_traced]
304    fn test_next_gap() {
305        // Initialize the deterministic context
306        let executor = deterministic::Runner::default();
307        executor.start(|context| async move {
308            // Initialize the store
309            let cfg = Config {
310                partition: "test_ordinal".into(),
311                items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
312                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
313                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
314            };
315            let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
316                .await
317                .expect("Failed to initialize store");
318
319            // Insert values with gaps
320            store.put(1, FixedBytes::new([1u8; 32])).await.unwrap();
321            store.put(10, FixedBytes::new([10u8; 32])).await.unwrap();
322            store.put(11, FixedBytes::new([11u8; 32])).await.unwrap();
323            store.put(14, FixedBytes::new([14u8; 32])).await.unwrap();
324
325            // Check gaps
326            let (current_end, start_next) = store.next_gap(0);
327            assert!(current_end.is_none());
328            assert_eq!(start_next, Some(1));
329
330            let (current_end, start_next) = store.next_gap(1);
331            assert_eq!(current_end, Some(1));
332            assert_eq!(start_next, Some(10));
333
334            let (current_end, start_next) = store.next_gap(10);
335            assert_eq!(current_end, Some(11));
336            assert_eq!(start_next, Some(14));
337
338            let (current_end, start_next) = store.next_gap(11);
339            assert_eq!(current_end, Some(11));
340            assert_eq!(start_next, Some(14));
341
342            let (current_end, start_next) = store.next_gap(12);
343            assert!(current_end.is_none());
344            assert_eq!(start_next, Some(14));
345
346            let (current_end, start_next) = store.next_gap(14);
347            assert_eq!(current_end, Some(14));
348            assert!(start_next.is_none());
349        });
350    }
351
352    #[test_traced]
353    fn test_missing_items() {
354        // Initialize the deterministic context
355        let executor = deterministic::Runner::default();
356        executor.start(|context| async move {
357            // Initialize the store
358            let cfg = Config {
359                partition: "test_ordinal".into(),
360                items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
361                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
362                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
363            };
364            let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
365                .await
366                .expect("Failed to initialize store");
367
368            // Test 1: Empty store - should return no items
369            assert_eq!(store.missing_items(0, 5), Vec::<u64>::new());
370            assert_eq!(store.missing_items(100, 10), Vec::<u64>::new());
371
372            // Test 2: Insert values with gaps
373            store.put(1, FixedBytes::new([1u8; 32])).await.unwrap();
374            store.put(2, FixedBytes::new([2u8; 32])).await.unwrap();
375            store.put(5, FixedBytes::new([5u8; 32])).await.unwrap();
376            store.put(6, FixedBytes::new([6u8; 32])).await.unwrap();
377            store.put(10, FixedBytes::new([10u8; 32])).await.unwrap();
378
379            // Test 3: Find missing items from the beginning
380            assert_eq!(store.missing_items(0, 5), vec![0, 3, 4, 7, 8]);
381            assert_eq!(store.missing_items(0, 6), vec![0, 3, 4, 7, 8, 9]);
382            assert_eq!(store.missing_items(0, 7), vec![0, 3, 4, 7, 8, 9]);
383
384            // Test 4: Find missing items from within a gap
385            assert_eq!(store.missing_items(3, 3), vec![3, 4, 7]);
386            assert_eq!(store.missing_items(4, 2), vec![4, 7]);
387
388            // Test 5: Find missing items from within a range
389            assert_eq!(store.missing_items(1, 3), vec![3, 4, 7]);
390            assert_eq!(store.missing_items(2, 4), vec![3, 4, 7, 8]);
391            assert_eq!(store.missing_items(5, 2), vec![7, 8]);
392
393            // Test 6: Find missing items after the last range (no more gaps)
394            assert_eq!(store.missing_items(11, 5), Vec::<u64>::new());
395            assert_eq!(store.missing_items(100, 10), Vec::<u64>::new());
396
397            // Test 7: Large gap scenario
398            store.put(1000, FixedBytes::new([100u8; 32])).await.unwrap();
399
400            // Gap between 10 and 1000
401            let items = store.missing_items(11, 10);
402            assert_eq!(items, vec![11, 12, 13, 14, 15, 16, 17, 18, 19, 20]);
403
404            // Request more items than available in gap
405            let items = store.missing_items(990, 15);
406            assert_eq!(
407                items,
408                vec![990, 991, 992, 993, 994, 995, 996, 997, 998, 999]
409            );
410
411            // Test 8: After syncing (data should remain consistent)
412            store.sync().await.unwrap();
413            assert_eq!(store.missing_items(0, 5), vec![0, 3, 4, 7, 8]);
414            assert_eq!(store.missing_items(3, 3), vec![3, 4, 7]);
415
416            // Test 9: Cross-blob boundary scenario
417            store.put(9999, FixedBytes::new([99u8; 32])).await.unwrap();
418            store
419                .put(10001, FixedBytes::new([101u8; 32]))
420                .await
421                .unwrap();
422
423            // Find missing items across blob boundary (10000 is the boundary)
424            let items = store.missing_items(9998, 5);
425            assert_eq!(items, vec![9998, 10000]);
426
427            store.close().await.expect("Failed to close store");
428        });
429    }
430
431    #[test_traced]
432    fn test_restart() {
433        // Initialize the deterministic context
434        let executor = deterministic::Runner::default();
435        executor.start(|context| async move {
436            let cfg = Config {
437                partition: "test_ordinal".into(),
438                items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
439                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
440                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
441            };
442
443            // Insert data and close
444            {
445                let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
446                    .await
447                    .expect("Failed to initialize store");
448
449                let values = vec![
450                    (0u64, FixedBytes::new([0u8; 32])),
451                    (100u64, FixedBytes::new([100u8; 32])),
452                    (1000u64, FixedBytes::new([200u8; 32])),
453                ];
454
455                for (index, value) in &values {
456                    store
457                        .put(*index, value.clone())
458                        .await
459                        .expect("Failed to put data");
460                }
461
462                store.close().await.expect("Failed to close store");
463            }
464
465            // Reopen and verify data persisted
466            {
467                let store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
468                    .await
469                    .expect("Failed to initialize store");
470
471                let values = vec![
472                    (0u64, FixedBytes::new([0u8; 32])),
473                    (100u64, FixedBytes::new([100u8; 32])),
474                    (1000u64, FixedBytes::new([200u8; 32])),
475                ];
476
477                for (index, value) in &values {
478                    let retrieved = store
479                        .get(*index)
480                        .await
481                        .expect("Failed to get data")
482                        .expect("Data not found");
483                    assert_eq!(&retrieved, value);
484                }
485
486                // Check gaps are preserved
487                let (current_end, start_next) = store.next_gap(0);
488                assert_eq!(current_end, Some(0));
489                assert_eq!(start_next, Some(100));
490            }
491        });
492    }
493
494    #[test_traced]
495    fn test_invalid_record() {
496        // Initialize the deterministic context
497        let executor = deterministic::Runner::default();
498        executor.start(|context| async move {
499            let cfg = Config {
500                partition: "test_ordinal".into(),
501                items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
502                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
503                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
504            };
505
506            // Create store with data
507            {
508                let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
509                    .await
510                    .expect("Failed to initialize store");
511
512                store
513                    .put(0, FixedBytes::new([42u8; 32]))
514                    .await
515                    .expect("Failed to put data");
516                store.close().await.expect("Failed to close store");
517            }
518
519            // Corrupt the data
520            {
521                let (blob, _) = context
522                    .open("test_ordinal", &0u64.to_be_bytes())
523                    .await
524                    .unwrap();
525                // Corrupt the CRC by changing a byte
526                blob.write_at(vec![0xFF], 32).await.unwrap();
527                blob.sync().await.unwrap();
528            }
529
530            // Reopen and try to read corrupted data
531            {
532                let store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
533                    .await
534                    .expect("Failed to initialize store");
535
536                // Reading corrupt record will return empty
537                let result = store.get(0).await.unwrap();
538                assert!(result.is_none());
539
540                // The index should not be in the intervals after restart with corrupted data
541                assert!(!store.has(0));
542            }
543        });
544    }
545
546    #[test_traced]
547    fn test_get_nonexistent() {
548        // Initialize the deterministic context
549        let executor = deterministic::Runner::default();
550        executor.start(|context| async move {
551            // Initialize the store
552            let cfg = Config {
553                partition: "test_ordinal".into(),
554                items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
555                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
556                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
557            };
558            let store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
559                .await
560                .expect("Failed to initialize store");
561
562            // Attempt to get an index that doesn't exist
563            let retrieved = store.get(999).await.expect("Failed to get data");
564            assert!(retrieved.is_none());
565
566            // Check has returns false
567            assert!(!store.has(999));
568        });
569    }
570
571    #[test_traced]
572    fn test_destroy() {
573        // Initialize the deterministic context
574        let executor = deterministic::Runner::default();
575        executor.start(|context| async move {
576            let cfg = Config {
577                partition: "test_ordinal".into(),
578                items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
579                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
580                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
581            };
582
583            // Create store with data
584            {
585                let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
586                    .await
587                    .expect("Failed to initialize store");
588
589                store
590                    .put(0, FixedBytes::new([0u8; 32]))
591                    .await
592                    .expect("Failed to put data");
593                store
594                    .put(1000, FixedBytes::new([100u8; 32]))
595                    .await
596                    .expect("Failed to put data");
597
598                // Destroy the store
599                store.destroy().await.expect("Failed to destroy store");
600            }
601
602            // Try to create a new store - it should be empty
603            {
604                let store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
605                    .await
606                    .expect("Failed to initialize store");
607
608                // Should not find any data
609                assert!(store.get(0).await.unwrap().is_none());
610                assert!(store.get(1000).await.unwrap().is_none());
611                assert!(!store.has(0));
612                assert!(!store.has(1000));
613            }
614        });
615    }
616
617    #[test_traced]
618    fn test_partial_record_write() {
619        // Initialize the deterministic context
620        let executor = deterministic::Runner::default();
621        executor.start(|context| async move {
622            let cfg = Config {
623                partition: "test_ordinal".into(),
624                items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
625                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
626                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
627            };
628
629            // Create store with data
630            {
631                let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
632                    .await
633                    .expect("Failed to initialize store");
634
635                store
636                    .put(0, FixedBytes::new([42u8; 32]))
637                    .await
638                    .expect("Failed to put data");
639                store
640                    .put(1, FixedBytes::new([43u8; 32]))
641                    .await
642                    .expect("Failed to put data");
643                store.close().await.expect("Failed to close store");
644            }
645
646            // Corrupt by writing partial record (only value, no CRC)
647            {
648                let (blob, _) = context
649                    .open("test_ordinal", &0u64.to_be_bytes())
650                    .await
651                    .unwrap();
652                // Overwrite second record with partial data (32 bytes instead of 36)
653                blob.write_at(vec![0xFF; 32], 36).await.unwrap();
654                blob.sync().await.unwrap();
655            }
656
657            // Reopen and verify it handles partial write gracefully
658            {
659                let store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
660                    .await
661                    .expect("Failed to initialize store");
662
663                // First record should be fine
664                assert_eq!(
665                    store.get(0).await.unwrap().unwrap(),
666                    FixedBytes::new([42u8; 32])
667                );
668
669                // Second record should be removed due to partial write
670                assert!(!store.has(1));
671                assert!(store.get(1).await.unwrap().is_none());
672
673                // Store should still be functional
674                let mut store_mut = store;
675                store_mut.put(1, FixedBytes::new([44u8; 32])).await.unwrap();
676                assert_eq!(
677                    store_mut.get(1).await.unwrap().unwrap(),
678                    FixedBytes::new([44u8; 32])
679                );
680            }
681        });
682    }
683
684    #[test_traced]
685    fn test_corrupted_value() {
686        // Initialize the deterministic context
687        let executor = deterministic::Runner::default();
688        executor.start(|context| async move {
689            let cfg = Config {
690                partition: "test_ordinal".into(),
691                items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
692                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
693                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
694            };
695
696            // Create store with data
697            {
698                let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
699                    .await
700                    .expect("Failed to initialize store");
701
702                store
703                    .put(0, FixedBytes::new([42u8; 32]))
704                    .await
705                    .expect("Failed to put data");
706                store
707                    .put(1, FixedBytes::new([43u8; 32]))
708                    .await
709                    .expect("Failed to put data");
710                store.close().await.expect("Failed to close store");
711            }
712
713            // Corrupt the value portion of a record
714            {
715                let (blob, _) = context
716                    .open("test_ordinal", &0u64.to_be_bytes())
717                    .await
718                    .unwrap();
719                // Corrupt some bytes in the value of the first record
720                blob.write_at(vec![0xFF, 0xFF, 0xFF, 0xFF], 10)
721                    .await
722                    .unwrap();
723                blob.sync().await.unwrap();
724            }
725
726            // Reopen and verify it detects corruption
727            {
728                let store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
729                    .await
730                    .expect("Failed to initialize store");
731
732                // First record should be detected as corrupted (CRC mismatch)
733                assert!(!store.has(0));
734
735                // Second record should still be valid
736                assert!(store.has(1));
737                assert_eq!(
738                    store.get(1).await.unwrap().unwrap(),
739                    FixedBytes::new([43u8; 32])
740                );
741            }
742        });
743    }
744
745    #[test_traced]
746    fn test_crc_corruptions() {
747        // Initialize the deterministic context
748        let executor = deterministic::Runner::default();
749        executor.start(|context| async move {
750            let cfg = Config {
751                partition: "test_ordinal".into(),
752                items_per_blob: NZU64!(10), // Small blob size for testing
753                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
754                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
755            };
756
757            // Create store with data across multiple blobs
758            {
759                let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
760                    .await
761                    .expect("Failed to initialize store");
762
763                // Add values across 2 blobs
764                store.put(0, FixedBytes::new([0u8; 32])).await.unwrap();
765                store.put(5, FixedBytes::new([5u8; 32])).await.unwrap();
766                store.put(10, FixedBytes::new([10u8; 32])).await.unwrap();
767                store.put(15, FixedBytes::new([15u8; 32])).await.unwrap();
768                store.close().await.expect("Failed to close store");
769            }
770
771            // Corrupt CRCs in different blobs
772            {
773                // Corrupt CRC in first blob
774                let (blob, _) = context
775                    .open("test_ordinal", &0u64.to_be_bytes())
776                    .await
777                    .unwrap();
778                blob.write_at(vec![0xFF], 32).await.unwrap(); // Corrupt CRC of index 0
779                blob.sync().await.unwrap();
780
781                // Corrupt value in second blob (which will invalidate CRC)
782                let (blob, _) = context
783                    .open("test_ordinal", &1u64.to_be_bytes())
784                    .await
785                    .unwrap();
786                blob.write_at(vec![0xFF; 4], 5).await.unwrap(); // Corrupt value of index 10
787                blob.sync().await.unwrap();
788            }
789
790            // Reopen and verify handling of CRC corruptions
791            {
792                let store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
793                    .await
794                    .expect("Failed to initialize store");
795
796                // Corrupted records should not be present
797                assert!(!store.has(0)); // CRC corrupted
798                assert!(!store.has(10)); // Value corrupted (CRC mismatch)
799
800                // Valid records should still be accessible
801                assert!(store.has(5));
802                assert!(store.has(15));
803                assert_eq!(
804                    store.get(5).await.unwrap().unwrap(),
805                    FixedBytes::new([5u8; 32])
806                );
807                assert_eq!(
808                    store.get(15).await.unwrap().unwrap(),
809                    FixedBytes::new([15u8; 32])
810                );
811            }
812        });
813    }
814
815    #[test_traced]
816    fn test_extra_bytes_in_blob() {
817        // Initialize the deterministic context
818        let executor = deterministic::Runner::default();
819        executor.start(|context| async move {
820            let cfg = Config {
821                partition: "test_ordinal".into(),
822                items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
823                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
824                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
825            };
826
827            // Create store with data
828            {
829                let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
830                    .await
831                    .expect("Failed to initialize store");
832
833                store
834                    .put(0, FixedBytes::new([42u8; 32]))
835                    .await
836                    .expect("Failed to put data");
837                store
838                    .put(1, FixedBytes::new([43u8; 32]))
839                    .await
840                    .expect("Failed to put data");
841                store.close().await.expect("Failed to close store");
842            }
843
844            // Add extra bytes at the end of blob
845            {
846                let (blob, size) = context
847                    .open("test_ordinal", &0u64.to_be_bytes())
848                    .await
849                    .unwrap();
850                // Add garbage data that forms a complete but invalid record
851                // This avoids partial record issues
852                let mut garbage = vec![0xFF; 32]; // Invalid value
853                let invalid_crc = 0xDEADBEEFu32;
854                garbage.extend_from_slice(&invalid_crc.to_be_bytes());
855                assert_eq!(garbage.len(), 36); // Full record size
856                blob.write_at(garbage, size).await.unwrap();
857                blob.sync().await.unwrap();
858            }
859
860            // Reopen and verify it handles extra bytes
861            {
862                let store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
863                    .await
864                    .expect("Failed to initialize store");
865
866                // Original records should still be valid
867                assert!(store.has(0));
868                assert!(store.has(1));
869                assert_eq!(
870                    store.get(0).await.unwrap().unwrap(),
871                    FixedBytes::new([42u8; 32])
872                );
873                assert_eq!(
874                    store.get(1).await.unwrap().unwrap(),
875                    FixedBytes::new([43u8; 32])
876                );
877
878                // Store should still be functional
879                let mut store_mut = store;
880                store_mut.put(2, FixedBytes::new([44u8; 32])).await.unwrap();
881                assert_eq!(
882                    store_mut.get(2).await.unwrap().unwrap(),
883                    FixedBytes::new([44u8; 32])
884                );
885            }
886        });
887    }
888
889    #[test_traced]
890    fn test_zero_filled_records() {
891        // Initialize the deterministic context
892        let executor = deterministic::Runner::default();
893        executor.start(|context| async move {
894            let cfg = Config {
895                partition: "test_ordinal".into(),
896                items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
897                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
898                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
899            };
900
901            // Create blob with zero-filled space
902            {
903                let (blob, _) = context
904                    .open("test_ordinal", &0u64.to_be_bytes())
905                    .await
906                    .unwrap();
907
908                // Write zeros for several record positions
909                let zeros = vec![0u8; 36 * 5]; // 5 records worth of zeros
910                blob.write_at(zeros, 0).await.unwrap();
911
912                // Write a valid record after the zeros
913                let mut valid_record = vec![44u8; 32];
914                let crc = crc32fast::hash(&valid_record);
915                valid_record.extend_from_slice(&crc.to_be_bytes());
916                blob.write_at(valid_record, 36 * 5).await.unwrap();
917
918                blob.sync().await.unwrap();
919            }
920
921            // Initialize store and verify it handles zero-filled records
922            {
923                let store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
924                    .await
925                    .expect("Failed to initialize store");
926
927                // Zero-filled positions should not be considered valid
928                for i in 0..5 {
929                    assert!(!store.has(i));
930                }
931
932                // The valid record should be found
933                assert!(store.has(5));
934                assert_eq!(
935                    store.get(5).await.unwrap().unwrap(),
936                    FixedBytes::new([44u8; 32])
937                );
938            }
939        });
940    }
941
942    fn test_operations_and_restart(num_values: usize) -> String {
943        // Initialize the deterministic context
944        let executor = deterministic::Runner::default();
945        executor.start(|mut context| async move {
946            let cfg = Config {
947                partition: "test_ordinal".into(),
948                items_per_blob: NZU64!(100), // Smaller blobs to test multiple blob handling
949                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
950                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
951            };
952
953            // Initialize the store
954            let mut store = Ordinal::<_, FixedBytes<128>>::init(context.clone(), cfg.clone())
955                .await
956                .expect("Failed to initialize store");
957
958            // Generate and insert random values at various indices
959            let mut values = Vec::new();
960            let mut rng_index = 0u64;
961
962            for _ in 0..num_values {
963                // Generate a pseudo-random index (sparse to test gaps)
964                let mut index_bytes = [0u8; 8];
965                context.fill_bytes(&mut index_bytes);
966                let index_offset = u64::from_be_bytes(index_bytes) % 1000;
967                let index = rng_index + index_offset;
968                rng_index = index + 1;
969
970                // Generate random value
971                let mut value = [0u8; 128];
972                context.fill_bytes(&mut value);
973                let value = FixedBytes::<128>::new(value);
974
975                store
976                    .put(index, value.clone())
977                    .await
978                    .expect("Failed to put data");
979                values.push((index, value));
980            }
981
982            // Sync data
983            store.sync().await.expect("Failed to sync");
984
985            // Verify all values can be retrieved
986            for (index, value) in &values {
987                let retrieved = store
988                    .get(*index)
989                    .await
990                    .expect("Failed to get data")
991                    .expect("Data not found");
992                assert_eq!(&retrieved, value);
993            }
994
995            // Test next_gap on various indices
996            for i in 0..10 {
997                let _ = store.next_gap(i * 100);
998            }
999
1000            // Close the store
1001            store.close().await.expect("Failed to close store");
1002
1003            // Reopen the store
1004            let mut store = Ordinal::<_, FixedBytes<128>>::init(context.clone(), cfg)
1005                .await
1006                .expect("Failed to initialize store");
1007
1008            // Verify all values are still there after restart
1009            for (index, value) in &values {
1010                let retrieved = store
1011                    .get(*index)
1012                    .await
1013                    .expect("Failed to get data")
1014                    .expect("Data not found");
1015                assert_eq!(&retrieved, value);
1016            }
1017
1018            // Add more values after restart
1019            for _ in 0..10 {
1020                let mut index_bytes = [0u8; 8];
1021                context.fill_bytes(&mut index_bytes);
1022                let index = u64::from_be_bytes(index_bytes) % 10000;
1023
1024                let mut value = [0u8; 128];
1025                context.fill_bytes(&mut value);
1026                let value = FixedBytes::<128>::new(value);
1027
1028                store.put(index, value).await.expect("Failed to put data");
1029            }
1030
1031            // Final sync
1032            store.sync().await.expect("Failed to sync");
1033
1034            // Return the auditor state for comparison
1035            context.auditor().state()
1036        })
1037    }
1038
1039    #[test_traced]
1040    #[ignore]
1041    fn test_determinism() {
1042        let state1 = test_operations_and_restart(100);
1043        let state2 = test_operations_and_restart(100);
1044        assert_eq!(state1, state2);
1045    }
1046
1047    #[test_traced]
1048    fn test_prune_basic() {
1049        // Initialize the deterministic context
1050        let executor = deterministic::Runner::default();
1051        executor.start(|context| async move {
1052            let cfg = Config {
1053                partition: "test_ordinal".into(),
1054                items_per_blob: NZU64!(100), // Small blobs to test multiple blob handling
1055                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1056                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1057            };
1058
1059            let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1060                .await
1061                .expect("Failed to initialize store");
1062
1063            // Insert data across multiple blobs
1064            let values = vec![
1065                (0u64, FixedBytes::new([0u8; 32])),     // Blob 0
1066                (50u64, FixedBytes::new([50u8; 32])),   // Blob 0
1067                (100u64, FixedBytes::new([100u8; 32])), // Blob 1
1068                (150u64, FixedBytes::new([150u8; 32])), // Blob 1
1069                (200u64, FixedBytes::new([200u8; 32])), // Blob 2
1070                (300u64, FixedBytes::new([44u8; 32])),  // Blob 3
1071            ];
1072
1073            for (index, value) in &values {
1074                store
1075                    .put(*index, value.clone())
1076                    .await
1077                    .expect("Failed to put data");
1078            }
1079            store.sync().await.unwrap();
1080
1081            // Verify all values exist
1082            for (index, value) in &values {
1083                assert_eq!(store.get(*index).await.unwrap().unwrap(), *value);
1084            }
1085
1086            // Prune up to index 150 (should remove blob 0 only)
1087            store.prune(150).await.unwrap();
1088            let buffer = context.encode();
1089            assert!(buffer.contains("pruned_total 1"));
1090
1091            // Verify pruned data is gone
1092            assert!(!store.has(0));
1093            assert!(!store.has(50));
1094            assert!(store.get(0).await.unwrap().is_none());
1095            assert!(store.get(50).await.unwrap().is_none());
1096
1097            // Verify remaining data is still there
1098            assert!(store.has(100));
1099            assert!(store.has(150));
1100            assert!(store.has(200));
1101            assert!(store.has(300));
1102            assert_eq!(store.get(100).await.unwrap().unwrap(), values[2].1);
1103            assert_eq!(store.get(150).await.unwrap().unwrap(), values[3].1);
1104            assert_eq!(store.get(200).await.unwrap().unwrap(), values[4].1);
1105            assert_eq!(store.get(300).await.unwrap().unwrap(), values[5].1);
1106
1107            // Prune more aggressively - up to index 250 (should remove blob 1)
1108            store.prune(250).await.unwrap();
1109            let buffer = context.encode();
1110            assert!(buffer.contains("pruned_total 2"));
1111
1112            // Verify more data is pruned
1113            assert!(!store.has(100));
1114            assert!(!store.has(150));
1115            assert!(store.get(100).await.unwrap().is_none());
1116            assert!(store.get(150).await.unwrap().is_none());
1117
1118            // Verify remaining data
1119            assert!(store.has(200));
1120            assert!(store.has(300));
1121            assert_eq!(store.get(200).await.unwrap().unwrap(), values[4].1);
1122            assert_eq!(store.get(300).await.unwrap().unwrap(), values[5].1);
1123        });
1124    }
1125
1126    #[test_traced]
1127    fn test_prune_with_gaps() {
1128        // Initialize the deterministic context
1129        let executor = deterministic::Runner::default();
1130        executor.start(|context| async move {
1131            let cfg = Config {
1132                partition: "test_ordinal".into(),
1133                items_per_blob: NZU64!(100),
1134                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1135                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1136            };
1137
1138            let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1139                .await
1140                .expect("Failed to initialize store");
1141
1142            // Insert sparse data with gaps
1143            store.put(5, FixedBytes::new([5u8; 32])).await.unwrap();
1144            store.put(105, FixedBytes::new([105u8; 32])).await.unwrap();
1145            store.put(305, FixedBytes::new([49u8; 32])).await.unwrap();
1146            store.sync().await.unwrap();
1147
1148            // Check gaps before pruning
1149            let (current_end, next_start) = store.next_gap(0);
1150            assert!(current_end.is_none());
1151            assert_eq!(next_start, Some(5));
1152
1153            let (current_end, next_start) = store.next_gap(5);
1154            assert_eq!(current_end, Some(5));
1155            assert_eq!(next_start, Some(105));
1156
1157            // Prune up to index 150 (should remove blob 0)
1158            store.prune(150).await.unwrap();
1159
1160            // Verify pruned data is gone
1161            assert!(!store.has(5));
1162            assert!(store.get(5).await.unwrap().is_none());
1163
1164            // Verify remaining data and gaps
1165            assert!(store.has(105));
1166            assert!(store.has(305));
1167
1168            let (current_end, next_start) = store.next_gap(0);
1169            assert!(current_end.is_none());
1170            assert_eq!(next_start, Some(105));
1171
1172            let (current_end, next_start) = store.next_gap(105);
1173            assert_eq!(current_end, Some(105));
1174            assert_eq!(next_start, Some(305));
1175        });
1176    }
1177
1178    #[test_traced]
1179    fn test_prune_no_op() {
1180        // Initialize the deterministic context
1181        let executor = deterministic::Runner::default();
1182        executor.start(|context| async move {
1183            let cfg = Config {
1184                partition: "test_ordinal".into(),
1185                items_per_blob: NZU64!(100),
1186                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1187                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1188            };
1189
1190            let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1191                .await
1192                .expect("Failed to initialize store");
1193
1194            // Insert data
1195            store.put(100, FixedBytes::new([100u8; 32])).await.unwrap();
1196            store.put(200, FixedBytes::new([200u8; 32])).await.unwrap();
1197            store.sync().await.unwrap();
1198
1199            // Try to prune before any data - should be no-op
1200            store.prune(50).await.unwrap();
1201
1202            // Verify no data was actually pruned
1203            assert!(store.has(100));
1204            assert!(store.has(200));
1205            let buffer = context.encode();
1206            assert!(buffer.contains("pruned_total 0"));
1207
1208            // Try to prune exactly at blob boundary - should be no-op
1209            store.prune(100).await.unwrap();
1210
1211            // Verify still no data pruned
1212            assert!(store.has(100));
1213            assert!(store.has(200));
1214            let buffer = context.encode();
1215            assert!(buffer.contains("pruned_total 0"));
1216        });
1217    }
1218
1219    #[test_traced]
1220    fn test_prune_empty_store() {
1221        // Initialize the deterministic context
1222        let executor = deterministic::Runner::default();
1223        executor.start(|context| async move {
1224            let cfg = Config {
1225                partition: "test_ordinal".into(),
1226                items_per_blob: NZU64!(100),
1227                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1228                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1229            };
1230
1231            let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1232                .await
1233                .expect("Failed to initialize store");
1234
1235            // Try to prune empty store
1236            store.prune(1000).await.unwrap();
1237
1238            // Store should still be functional
1239            store.put(0, FixedBytes::new([0u8; 32])).await.unwrap();
1240            assert!(store.has(0));
1241        });
1242    }
1243
1244    #[test_traced]
1245    fn test_prune_after_restart() {
1246        // Initialize the deterministic context
1247        let executor = deterministic::Runner::default();
1248        executor.start(|context| async move {
1249            let cfg = Config {
1250                partition: "test_ordinal".into(),
1251                items_per_blob: NZU64!(100),
1252                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1253                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1254            };
1255
1256            // Create store and add data
1257            {
1258                let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1259                    .await
1260                    .expect("Failed to initialize store");
1261
1262                store.put(0, FixedBytes::new([0u8; 32])).await.unwrap();
1263                store.put(100, FixedBytes::new([100u8; 32])).await.unwrap();
1264                store.put(200, FixedBytes::new([200u8; 32])).await.unwrap();
1265                store.close().await.unwrap();
1266            }
1267
1268            // Reopen and prune
1269            {
1270                let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1271                    .await
1272                    .expect("Failed to initialize store");
1273
1274                // Verify data is there
1275                assert!(store.has(0));
1276                assert!(store.has(100));
1277                assert!(store.has(200));
1278
1279                // Prune up to index 150
1280                store.prune(150).await.unwrap();
1281
1282                // Verify pruning worked
1283                assert!(!store.has(0));
1284                assert!(store.has(100));
1285                assert!(store.has(200));
1286
1287                store.close().await.unwrap();
1288            }
1289
1290            // Reopen again and verify pruning persisted
1291            {
1292                let store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1293                    .await
1294                    .expect("Failed to initialize store");
1295
1296                assert!(!store.has(0));
1297                assert!(store.has(100));
1298                assert!(store.has(200));
1299
1300                // Check gaps
1301                let (current_end, next_start) = store.next_gap(0);
1302                assert!(current_end.is_none());
1303                assert_eq!(next_start, Some(100));
1304            }
1305        });
1306    }
1307
1308    #[test_traced]
1309    fn test_prune_multiple_operations() {
1310        // Initialize the deterministic context
1311        let executor = deterministic::Runner::default();
1312        executor.start(|context| async move {
1313            let cfg = Config {
1314                partition: "test_ordinal".into(),
1315                items_per_blob: NZU64!(50), // Smaller blobs for more granular testing
1316                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1317                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1318            };
1319
1320            let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1321                .await
1322                .expect("Failed to initialize store");
1323
1324            // Insert data across many blobs
1325            let mut values = Vec::new();
1326            for i in 0..10 {
1327                let index = i * 50 + 25; // Middle of each blob
1328                let value = FixedBytes::new([i as u8; 32]);
1329                store.put(index, value.clone()).await.unwrap();
1330                values.push((index, value));
1331            }
1332            store.sync().await.unwrap();
1333
1334            // Prune incrementally
1335            for i in 1..5 {
1336                let prune_index = i * 50 + 10;
1337                store.prune(prune_index).await.unwrap();
1338
1339                // Verify appropriate data is pruned
1340                for (index, _) in &values {
1341                    if *index < prune_index {
1342                        assert!(!store.has(*index), "Index {index} should be pruned");
1343                    } else {
1344                        assert!(store.has(*index), "Index {index} should not be pruned");
1345                    }
1346                }
1347            }
1348
1349            // Check final state
1350            let buffer = context.encode();
1351            assert!(buffer.contains("pruned_total 4"));
1352
1353            // Verify remaining data
1354            for i in 4..10 {
1355                let index = i * 50 + 25;
1356                assert!(store.has(index));
1357                assert_eq!(
1358                    store.get(index).await.unwrap().unwrap(),
1359                    values[i as usize].1
1360                );
1361            }
1362        });
1363    }
1364
1365    #[test_traced]
1366    fn test_prune_blob_boundaries() {
1367        // Initialize the deterministic context
1368        let executor = deterministic::Runner::default();
1369        executor.start(|context| async move {
1370            let cfg = Config {
1371                partition: "test_ordinal".into(),
1372                items_per_blob: NZU64!(100),
1373                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1374                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1375            };
1376
1377            let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1378                .await
1379                .expect("Failed to initialize store");
1380
1381            // Insert data at blob boundaries
1382            store.put(0, FixedBytes::new([0u8; 32])).await.unwrap(); // Start of blob 0
1383            store.put(99, FixedBytes::new([99u8; 32])).await.unwrap(); // End of blob 0
1384            store.put(100, FixedBytes::new([100u8; 32])).await.unwrap(); // Start of blob 1
1385            store.put(199, FixedBytes::new([199u8; 32])).await.unwrap(); // End of blob 1
1386            store.put(200, FixedBytes::new([200u8; 32])).await.unwrap(); // Start of blob 2
1387            store.sync().await.unwrap();
1388
1389            // Test various pruning points around boundaries
1390
1391            // Prune exactly at blob boundary (100) - should prune blob 0
1392            store.prune(100).await.unwrap();
1393            assert!(!store.has(0));
1394            assert!(!store.has(99));
1395            assert!(store.has(100));
1396            assert!(store.has(199));
1397            assert!(store.has(200));
1398
1399            // Prune just before next boundary (199) - should not prune blob 1
1400            store.prune(199).await.unwrap();
1401            assert!(store.has(100));
1402            assert!(store.has(199));
1403            assert!(store.has(200));
1404
1405            // Prune exactly at next boundary (200) - should prune blob 1
1406            store.prune(200).await.unwrap();
1407            assert!(!store.has(100));
1408            assert!(!store.has(199));
1409            assert!(store.has(200));
1410
1411            let buffer = context.encode();
1412            assert!(buffer.contains("pruned_total 2"));
1413        });
1414    }
1415
1416    #[test_traced]
1417    fn test_prune_non_contiguous_sections() {
1418        // Initialize the deterministic context
1419        let executor = deterministic::Runner::default();
1420        executor.start(|context| async move {
1421            let cfg = Config {
1422                partition: "test_ordinal".into(),
1423                items_per_blob: NZU64!(100),
1424                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1425                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1426            };
1427
1428            let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1429                .await
1430                .expect("Failed to initialize store");
1431
1432            // Insert data in non-contiguous sections (0, 2, 5, 7)
1433            store.put(0, FixedBytes::new([0u8; 32])).await.unwrap(); // Section 0
1434            store.put(250, FixedBytes::new([50u8; 32])).await.unwrap(); // Section 2 (250/100 = 2)
1435            store.put(500, FixedBytes::new([44u8; 32])).await.unwrap(); // Section 5 (500/100 = 5)
1436            store.put(750, FixedBytes::new([45u8; 32])).await.unwrap(); // Section 7 (750/100 = 7)
1437            store.sync().await.unwrap();
1438
1439            // Verify all data exists initially
1440            assert!(store.has(0));
1441            assert!(store.has(250));
1442            assert!(store.has(500));
1443            assert!(store.has(750));
1444
1445            // Prune up to section 3 (index 300) - should remove sections 0 and 2
1446            store.prune(300).await.unwrap();
1447
1448            // Verify correct data was pruned
1449            assert!(!store.has(0)); // Section 0 pruned
1450            assert!(!store.has(250)); // Section 2 pruned
1451            assert!(store.has(500)); // Section 5 remains
1452            assert!(store.has(750)); // Section 7 remains
1453
1454            let buffer = context.encode();
1455            assert!(buffer.contains("pruned_total 2"));
1456
1457            // Prune up to section 6 (index 600) - should remove section 5
1458            store.prune(600).await.unwrap();
1459
1460            // Verify section 5 was pruned
1461            assert!(!store.has(500)); // Section 5 pruned
1462            assert!(store.has(750)); // Section 7 remains
1463
1464            let buffer = context.encode();
1465            assert!(buffer.contains("pruned_total 3"));
1466
1467            // Prune everything - should remove section 7
1468            store.prune(1000).await.unwrap();
1469
1470            // Verify all data is gone
1471            assert!(!store.has(750)); // Section 7 pruned
1472
1473            let buffer = context.encode();
1474            assert!(buffer.contains("pruned_total 4"));
1475        });
1476    }
1477
1478    #[test_traced]
1479    fn test_prune_removes_correct_pending() {
1480        // Initialize the deterministic context
1481        let executor = deterministic::Runner::default();
1482        executor.start(|context| async move {
1483            let cfg = Config {
1484                partition: "test_ordinal".into(),
1485                items_per_blob: NZU64!(100),
1486                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1487                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1488            };
1489            let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1490                .await
1491                .expect("Failed to initialize store");
1492
1493            // Insert and sync some data in blob 0
1494            store.put(5, FixedBytes::new([5u8; 32])).await.unwrap();
1495            store.sync().await.unwrap();
1496
1497            // Add pending entries to blob 0 and blob 1
1498            store.put(10, FixedBytes::new([10u8; 32])).await.unwrap(); // blob 0
1499            store.put(110, FixedBytes::new([110u8; 32])).await.unwrap(); // blob 1
1500
1501            // Verify all data is visible before pruning
1502            assert!(store.has(5));
1503            assert!(store.has(10));
1504            assert!(store.has(110));
1505
1506            // Prune up to index 100, which should remove blob 0 (indices 0-99).
1507            store.prune(150).await.unwrap();
1508
1509            // Verify that synced and pending entries in blob 0 are removed.
1510            assert!(!store.has(5));
1511            assert!(!store.has(10));
1512
1513            // Verify that the pending entry in blob 1 remains.
1514            assert!(store.has(110));
1515            assert_eq!(
1516                store.get(110).await.unwrap().unwrap(),
1517                FixedBytes::new([110u8; 32])
1518            );
1519
1520            // Sync the remaining pending entry and verify it's still there.
1521            store.sync().await.unwrap();
1522            assert!(store.has(110));
1523            assert_eq!(
1524                store.get(110).await.unwrap().unwrap(),
1525                FixedBytes::new([110u8; 32])
1526            );
1527        });
1528    }
1529
1530    #[test_traced]
1531    fn test_init_with_bits_none() {
1532        // Initialize the deterministic context
1533        let executor = deterministic::Runner::default();
1534        executor.start(|context| async move {
1535            let cfg = Config {
1536                partition: "test_ordinal".into(),
1537                items_per_blob: NZU64!(10), // Small blob size for testing
1538                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1539                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1540            };
1541
1542            // Create store with data across multiple sections
1543            {
1544                let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1545                    .await
1546                    .expect("Failed to initialize store");
1547
1548                // Section 0 (indices 0-9)
1549                store.put(0, FixedBytes::new([0u8; 32])).await.unwrap();
1550                store.put(5, FixedBytes::new([5u8; 32])).await.unwrap();
1551                store.put(9, FixedBytes::new([9u8; 32])).await.unwrap();
1552
1553                // Section 1 (indices 10-19)
1554                store.put(10, FixedBytes::new([10u8; 32])).await.unwrap();
1555                store.put(15, FixedBytes::new([15u8; 32])).await.unwrap();
1556
1557                // Section 2 (indices 20-29)
1558                store.put(25, FixedBytes::new([25u8; 32])).await.unwrap();
1559
1560                store.close().await.unwrap();
1561            }
1562
1563            // Reinitialize with bits = None (should behave like regular init)
1564            {
1565                let store = Ordinal::<_, FixedBytes<32>>::init_with_bits(
1566                    context.clone(),
1567                    cfg.clone(),
1568                    None,
1569                )
1570                .await
1571                .expect("Failed to initialize store with bits");
1572
1573                // All records should be available
1574                assert!(store.has(0));
1575                assert!(store.has(5));
1576                assert!(store.has(9));
1577                assert!(store.has(10));
1578                assert!(store.has(15));
1579                assert!(store.has(25));
1580
1581                // Non-existent records should not be available
1582                assert!(!store.has(1));
1583                assert!(!store.has(11));
1584                assert!(!store.has(20));
1585
1586                // Verify values
1587                assert_eq!(
1588                    store.get(0).await.unwrap().unwrap(),
1589                    FixedBytes::new([0u8; 32])
1590                );
1591                assert_eq!(
1592                    store.get(15).await.unwrap().unwrap(),
1593                    FixedBytes::new([15u8; 32])
1594                );
1595            }
1596        });
1597    }
1598
1599    #[test_traced]
1600    fn test_init_with_bits_empty_hashmap() {
1601        // Initialize the deterministic context
1602        let executor = deterministic::Runner::default();
1603        executor.start(|context| async move {
1604            let cfg = Config {
1605                partition: "test_ordinal".into(),
1606                items_per_blob: NZU64!(10),
1607                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1608                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1609            };
1610
1611            // Create store with data
1612            {
1613                let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1614                    .await
1615                    .expect("Failed to initialize store");
1616
1617                store.put(0, FixedBytes::new([0u8; 32])).await.unwrap();
1618                store.put(10, FixedBytes::new([10u8; 32])).await.unwrap();
1619                store.put(20, FixedBytes::new([20u8; 32])).await.unwrap();
1620
1621                store.close().await.unwrap();
1622            }
1623
1624            // Reinitialize with empty HashMap - should skip all sections
1625            {
1626                let bits: BTreeMap<u64, &Option<BitVec>> = BTreeMap::new();
1627                let store = Ordinal::<_, FixedBytes<32>>::init_with_bits(
1628                    context.clone(),
1629                    cfg.clone(),
1630                    Some(bits),
1631                )
1632                .await
1633                .expect("Failed to initialize store with bits");
1634
1635                // No records should be available since no sections were in the bits map
1636                assert!(!store.has(0));
1637                assert!(!store.has(10));
1638                assert!(!store.has(20));
1639            }
1640        });
1641    }
1642
1643    #[test_traced]
1644    fn test_init_with_bits_selective_sections() {
1645        // Initialize the deterministic context
1646        let executor = deterministic::Runner::default();
1647        executor.start(|context| async move {
1648            let cfg = Config {
1649                partition: "test_ordinal".into(),
1650                items_per_blob: NZU64!(10),
1651                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1652                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1653            };
1654
1655            // Create store with data in multiple sections
1656            {
1657                let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1658                    .await
1659                    .expect("Failed to initialize store");
1660
1661                // Section 0 (indices 0-9)
1662                for i in 0..10 {
1663                    store.put(i, FixedBytes::new([i as u8; 32])).await.unwrap();
1664                }
1665
1666                // Section 1 (indices 10-19)
1667                for i in 10..20 {
1668                    store.put(i, FixedBytes::new([i as u8; 32])).await.unwrap();
1669                }
1670
1671                // Section 2 (indices 20-29)
1672                for i in 20..30 {
1673                    store.put(i, FixedBytes::new([i as u8; 32])).await.unwrap();
1674                }
1675
1676                store.close().await.unwrap();
1677            }
1678
1679            // Reinitialize with bits for only section 1
1680            {
1681                let mut bits_map: BTreeMap<u64, &Option<BitVec>> = BTreeMap::new();
1682
1683                // Create a BitVec that marks indices 12, 15, and 18 as present
1684                let mut bitvec = BitVec::zeroes(10);
1685                bitvec.set(2); // Index 12 (offset 2 in section 1)
1686                bitvec.set(5); // Index 15 (offset 5 in section 1)
1687                bitvec.set(8); // Index 18 (offset 8 in section 1)
1688                let bitvec_option = Some(bitvec);
1689
1690                bits_map.insert(1, &bitvec_option);
1691
1692                let store = Ordinal::<_, FixedBytes<32>>::init_with_bits(
1693                    context.clone(),
1694                    cfg.clone(),
1695                    Some(bits_map),
1696                )
1697                .await
1698                .expect("Failed to initialize store with bits");
1699
1700                // Only specified indices from section 1 should be available
1701                assert!(store.has(12));
1702                assert!(store.has(15));
1703                assert!(store.has(18));
1704
1705                // Other indices from section 1 should not be available
1706                assert!(!store.has(10));
1707                assert!(!store.has(11));
1708                assert!(!store.has(13));
1709                assert!(!store.has(14));
1710                assert!(!store.has(16));
1711                assert!(!store.has(17));
1712                assert!(!store.has(19));
1713
1714                // All indices from sections 0 and 2 should not be available
1715                for i in 0..10 {
1716                    assert!(!store.has(i));
1717                }
1718                for i in 20..30 {
1719                    assert!(!store.has(i));
1720                }
1721
1722                // Verify the available values
1723                assert_eq!(
1724                    store.get(12).await.unwrap().unwrap(),
1725                    FixedBytes::new([12u8; 32])
1726                );
1727                assert_eq!(
1728                    store.get(15).await.unwrap().unwrap(),
1729                    FixedBytes::new([15u8; 32])
1730                );
1731                assert_eq!(
1732                    store.get(18).await.unwrap().unwrap(),
1733                    FixedBytes::new([18u8; 32])
1734                );
1735            }
1736        });
1737    }
1738
1739    #[test_traced]
1740    fn test_init_with_bits_none_option_all_records_exist() {
1741        // Initialize the deterministic context
1742        let executor = deterministic::Runner::default();
1743        executor.start(|context| async move {
1744            let cfg = Config {
1745                partition: "test_ordinal".into(),
1746                items_per_blob: NZU64!(5),
1747                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1748                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1749            };
1750
1751            // Create store with all records in a section
1752            {
1753                let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1754                    .await
1755                    .expect("Failed to initialize store");
1756
1757                // Fill section 1 completely (indices 5-9)
1758                for i in 5..10 {
1759                    store.put(i, FixedBytes::new([i as u8; 32])).await.unwrap();
1760                }
1761
1762                store.close().await.unwrap();
1763            }
1764
1765            // Reinitialize with None option for section 1 (expects all records)
1766            {
1767                let mut bits_map: BTreeMap<u64, &Option<BitVec>> = BTreeMap::new();
1768                let none_option: Option<BitVec> = None;
1769                bits_map.insert(1, &none_option);
1770
1771                let store = Ordinal::<_, FixedBytes<32>>::init_with_bits(
1772                    context.clone(),
1773                    cfg.clone(),
1774                    Some(bits_map),
1775                )
1776                .await
1777                .expect("Failed to initialize store with bits");
1778
1779                // All records in section 1 should be available
1780                for i in 5..10 {
1781                    assert!(store.has(i));
1782                    assert_eq!(
1783                        store.get(i).await.unwrap().unwrap(),
1784                        FixedBytes::new([i as u8; 32])
1785                    );
1786                }
1787            }
1788        });
1789    }
1790
1791    #[test_traced]
1792    #[should_panic(expected = "Failed to initialize store with bits: MissingRecord(6)")]
1793    fn test_init_with_bits_none_option_missing_record_panics() {
1794        // Initialize the deterministic context
1795        let executor = deterministic::Runner::default();
1796        executor.start(|context| async move {
1797            let cfg = Config {
1798                partition: "test_ordinal".into(),
1799                items_per_blob: NZU64!(5),
1800                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1801                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1802            };
1803
1804            // Create store with missing record in a section
1805            {
1806                let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1807                    .await
1808                    .expect("Failed to initialize store");
1809
1810                // Fill section 1 partially (skip index 6)
1811                store.put(5, FixedBytes::new([5u8; 32])).await.unwrap();
1812                // Skip index 6
1813                store.put(7, FixedBytes::new([7u8; 32])).await.unwrap();
1814                store.put(8, FixedBytes::new([8u8; 32])).await.unwrap();
1815                store.put(9, FixedBytes::new([9u8; 32])).await.unwrap();
1816
1817                store.close().await.unwrap();
1818            }
1819
1820            // Reinitialize with None option for section 1 (expects all records)
1821            // This should panic because index 6 is missing
1822            {
1823                let mut bits_map: BTreeMap<u64, &Option<BitVec>> = BTreeMap::new();
1824                let none_option: Option<BitVec> = None;
1825                bits_map.insert(1, &none_option);
1826
1827                let _store = Ordinal::<_, FixedBytes<32>>::init_with_bits(
1828                    context.clone(),
1829                    cfg.clone(),
1830                    Some(bits_map),
1831                )
1832                .await
1833                .expect("Failed to initialize store with bits");
1834            }
1835        });
1836    }
1837
1838    #[test_traced]
1839    fn test_init_with_bits_mixed_sections() {
1840        // Initialize the deterministic context
1841        let executor = deterministic::Runner::default();
1842        executor.start(|context| async move {
1843            let cfg = Config {
1844                partition: "test_ordinal".into(),
1845                items_per_blob: NZU64!(5),
1846                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1847                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1848            };
1849
1850            // Create store with data in multiple sections
1851            {
1852                let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1853                    .await
1854                    .expect("Failed to initialize store");
1855
1856                // Section 0: indices 0-4 (fill completely)
1857                for i in 0..5 {
1858                    store.put(i, FixedBytes::new([i as u8; 32])).await.unwrap();
1859                }
1860
1861                // Section 1: indices 5-9 (fill partially)
1862                store.put(5, FixedBytes::new([5u8; 32])).await.unwrap();
1863                store.put(7, FixedBytes::new([7u8; 32])).await.unwrap();
1864                store.put(9, FixedBytes::new([9u8; 32])).await.unwrap();
1865
1866                // Section 2: indices 10-14 (fill completely)
1867                for i in 10..15 {
1868                    store.put(i, FixedBytes::new([i as u8; 32])).await.unwrap();
1869                }
1870
1871                store.close().await.unwrap();
1872            }
1873
1874            // Reinitialize with mixed bits configuration
1875            {
1876                let mut bits_map: BTreeMap<u64, &Option<BitVec>> = BTreeMap::new();
1877
1878                // Section 0: None option (expects all records)
1879                let none_option: Option<BitVec> = None;
1880                bits_map.insert(0, &none_option);
1881
1882                // Section 1: BitVec with specific indices
1883                let mut bitvec1 = BitVec::zeroes(5);
1884                bitvec1.set(0); // Index 5
1885                bitvec1.set(2); // Index 7
1886                                // Note: not setting bit for index 9, so it should be ignored
1887                let bitvec1_option = Some(bitvec1);
1888                bits_map.insert(1, &bitvec1_option);
1889
1890                // Section 2: Not in map, should be skipped entirely
1891
1892                let store = Ordinal::<_, FixedBytes<32>>::init_with_bits(
1893                    context.clone(),
1894                    cfg.clone(),
1895                    Some(bits_map),
1896                )
1897                .await
1898                .expect("Failed to initialize store with bits");
1899
1900                // All records from section 0 should be available
1901                for i in 0..5 {
1902                    assert!(store.has(i));
1903                    assert_eq!(
1904                        store.get(i).await.unwrap().unwrap(),
1905                        FixedBytes::new([i as u8; 32])
1906                    );
1907                }
1908
1909                // Only specified records from section 1 should be available
1910                assert!(store.has(5));
1911                assert!(store.has(7));
1912                assert!(!store.has(6));
1913                assert!(!store.has(8));
1914                assert!(!store.has(9)); // Not set in bitvec
1915
1916                // No records from section 2 should be available
1917                for i in 10..15 {
1918                    assert!(!store.has(i));
1919                }
1920            }
1921        });
1922    }
1923
1924    #[test_traced]
1925    #[should_panic(expected = "Failed to initialize store with bits: MissingRecord(2)")]
1926    fn test_init_with_bits_corrupted_records() {
1927        // Initialize the deterministic context
1928        let executor = deterministic::Runner::default();
1929        executor.start(|context| async move {
1930            let cfg = Config {
1931                partition: "test_ordinal".into(),
1932                items_per_blob: NZU64!(5),
1933                write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1934                replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1935            };
1936
1937            // Create store with data and corrupt one record
1938            {
1939                let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1940                    .await
1941                    .expect("Failed to initialize store");
1942
1943                // Section 0: indices 0-4
1944                for i in 0..5 {
1945                    store.put(i, FixedBytes::new([i as u8; 32])).await.unwrap();
1946                }
1947
1948                store.close().await.unwrap();
1949            }
1950
1951            // Corrupt record at index 2
1952            {
1953                let (blob, _) = context
1954                    .open("test_ordinal", &0u64.to_be_bytes())
1955                    .await
1956                    .unwrap();
1957                // Corrupt the CRC of record at index 2
1958                let offset = 2 * 36 + 32; // 2 * record_size + value_size
1959                blob.write_at(vec![0xFF], offset).await.unwrap();
1960                blob.sync().await.unwrap();
1961            }
1962
1963            // Reinitialize with bits that include the corrupted record
1964            {
1965                let mut bits_map: BTreeMap<u64, &Option<BitVec>> = BTreeMap::new();
1966
1967                // Create a BitVec that includes the corrupted record
1968                let mut bitvec = BitVec::zeroes(5);
1969                bitvec.set(0); // Index 0
1970                bitvec.set(2); // Index 2 (corrupted) - this will cause a panic
1971                bitvec.set(4); // Index 4
1972                let bitvec_option = Some(bitvec);
1973                bits_map.insert(0, &bitvec_option);
1974
1975                let _store = Ordinal::<_, FixedBytes<32>>::init_with_bits(
1976                    context.clone(),
1977                    cfg.clone(),
1978                    Some(bits_map),
1979                )
1980                .await
1981                .expect("Failed to initialize store with bits");
1982            }
1983        });
1984    }
1985}