1mod storage;
97
98use std::num::{NonZeroU64, NonZeroUsize};
99pub use storage::Ordinal;
100use thiserror::Error;
101
102#[derive(Debug, Error)]
104pub enum Error {
105 #[error("runtime error: {0}")]
106 Runtime(#[from] commonware_runtime::Error),
107 #[error("codec error: {0}")]
108 Codec(#[from] commonware_codec::Error),
109 #[error("invalid blob name: {0}")]
110 InvalidBlobName(String),
111 #[error("invalid record: {0}")]
112 InvalidRecord(u64),
113 #[error("missing record at {0}")]
114 MissingRecord(u64),
115}
116
/// Configuration used to initialize an [Ordinal] store.
///
/// Derives `Debug` in addition to `Clone` so a configuration can be logged or
/// shown in assertion failures.
#[derive(Clone, Debug)]
pub struct Config {
    /// Name of the storage partition that holds the store's blobs.
    pub partition: String,

    /// Number of items assigned to each blob.
    pub items_per_blob: NonZeroU64,

    /// Size of the write buffer.
    ///
    /// NOTE(review): presumably bytes buffered per blob before flushing; confirm in `storage`.
    pub write_buffer: NonZeroUsize,

    /// Size of the replay buffer.
    ///
    /// NOTE(review): presumably the read buffer used while scanning blobs on init; confirm in
    /// `storage`.
    pub replay_buffer: NonZeroUsize,
}
132
133#[cfg(test)]
134mod tests {
135 use super::*;
136 use bytes::{Buf, BufMut};
137 use commonware_codec::{FixedSize, Read, ReadExt, Write};
138 use commonware_cryptography::Crc32;
139 use commonware_macros::{test_group, test_traced};
140 use commonware_runtime::{deterministic, Blob, Metrics, Runner, Storage};
141 use commonware_utils::{bitmap::BitMap, hex, sequence::FixedBytes, NZUsize, NZU64};
142 use rand::RngCore;
143 use std::collections::BTreeMap;
144
/// Items-per-blob value used by tests that do not need a specific blob size.
const DEFAULT_ITEMS_PER_BLOB: u64 = 1_000;
/// Write buffer size handed to `Config::write_buffer` by the tests.
const DEFAULT_WRITE_BUFFER: usize = 4_096;
/// Replay buffer size handed to `Config::replay_buffer` by the tests.
const DEFAULT_REPLAY_BUFFER: usize = 1 << 20;
148
149 #[test_traced]
150 fn test_put_get() {
151 let executor = deterministic::Runner::default();
153 executor.start(|context| async move {
154 let cfg = Config {
156 partition: "test_ordinal".into(),
157 items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
158 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
159 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
160 };
161 let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
162 .await
163 .expect("Failed to initialize store");
164
165 let value = FixedBytes::new([42u8; 32]);
166
167 assert!(!store.has(0));
169
170 store
172 .put(0, value.clone())
173 .await
174 .expect("Failed to put data");
175
176 assert!(store.has(0));
178
179 let retrieved = store
181 .get(0)
182 .await
183 .expect("Failed to get data")
184 .expect("Data not found");
185 assert_eq!(retrieved, value);
186
187 store.sync().await.expect("Failed to sync data");
189
190 let buffer = context.encode();
192 assert!(buffer.contains("gets_total 1"), "{}", buffer);
193 assert!(buffer.contains("puts_total 1"), "{}", buffer);
194 assert!(buffer.contains("has_total 2"), "{}", buffer);
195 assert!(buffer.contains("syncs_total 1"), "{}", buffer);
196 assert!(buffer.contains("pruned_total 0"), "{}", buffer);
197
198 let retrieved = store
200 .get(0)
201 .await
202 .expect("Failed to get data")
203 .expect("Data not found");
204 assert_eq!(retrieved, value);
205 });
206 }
207
208 #[test_traced]
209 fn test_multiple_indices() {
210 let executor = deterministic::Runner::default();
212 executor.start(|context| async move {
213 let cfg = Config {
215 partition: "test_ordinal".into(),
216 items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
217 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
218 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
219 };
220 let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
221 .await
222 .expect("Failed to initialize store");
223
224 let indices = vec![
226 (0u64, FixedBytes::new([0u8; 32])),
227 (5u64, FixedBytes::new([5u8; 32])),
228 (10u64, FixedBytes::new([10u8; 32])),
229 (100u64, FixedBytes::new([100u8; 32])),
230 (1000u64, FixedBytes::new([200u8; 32])), ];
232
233 for (index, value) in &indices {
234 store
235 .put(*index, value.clone())
236 .await
237 .expect("Failed to put data");
238 }
239
240 store.sync().await.expect("Failed to sync");
242
243 for (index, value) in &indices {
245 let retrieved = store
246 .get(*index)
247 .await
248 .expect("Failed to get data")
249 .expect("Data not found");
250 assert_eq!(&retrieved, value);
251 }
252 });
253 }
254
255 #[test_traced]
256 fn test_sparse_indices() {
257 let executor = deterministic::Runner::default();
259 executor.start(|context| async move {
260 let cfg = Config {
262 partition: "test_ordinal".into(),
263 items_per_blob: NZU64!(100), write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
265 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
266 };
267 let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
268 .await
269 .expect("Failed to initialize store");
270
271 let indices = vec![
273 (0u64, FixedBytes::new([0u8; 32])),
274 (99u64, FixedBytes::new([99u8; 32])), (100u64, FixedBytes::new([100u8; 32])), (500u64, FixedBytes::new([200u8; 32])), ];
278
279 for (index, value) in &indices {
280 store
281 .put(*index, value.clone())
282 .await
283 .expect("Failed to put data");
284 }
285
286 assert!(!store.has(1));
288 assert!(!store.has(50));
289 assert!(!store.has(101));
290 assert!(!store.has(499));
291
292 store.sync().await.expect("Failed to sync");
294
295 for (index, value) in &indices {
296 let retrieved = store
297 .get(*index)
298 .await
299 .expect("Failed to get data")
300 .expect("Data not found");
301 assert_eq!(&retrieved, value);
302 }
303 });
304 }
305
306 #[test_traced]
307 fn test_next_gap() {
308 let executor = deterministic::Runner::default();
310 executor.start(|context| async move {
311 let cfg = Config {
313 partition: "test_ordinal".into(),
314 items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
315 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
316 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
317 };
318 let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
319 .await
320 .expect("Failed to initialize store");
321
322 store.put(1, FixedBytes::new([1u8; 32])).await.unwrap();
324 store.put(10, FixedBytes::new([10u8; 32])).await.unwrap();
325 store.put(11, FixedBytes::new([11u8; 32])).await.unwrap();
326 store.put(14, FixedBytes::new([14u8; 32])).await.unwrap();
327
328 let (current_end, start_next) = store.next_gap(0);
330 assert!(current_end.is_none());
331 assert_eq!(start_next, Some(1));
332
333 let (current_end, start_next) = store.next_gap(1);
334 assert_eq!(current_end, Some(1));
335 assert_eq!(start_next, Some(10));
336
337 let (current_end, start_next) = store.next_gap(10);
338 assert_eq!(current_end, Some(11));
339 assert_eq!(start_next, Some(14));
340
341 let (current_end, start_next) = store.next_gap(11);
342 assert_eq!(current_end, Some(11));
343 assert_eq!(start_next, Some(14));
344
345 let (current_end, start_next) = store.next_gap(12);
346 assert!(current_end.is_none());
347 assert_eq!(start_next, Some(14));
348
349 let (current_end, start_next) = store.next_gap(14);
350 assert_eq!(current_end, Some(14));
351 assert!(start_next.is_none());
352 });
353 }
354
355 #[test_traced]
356 fn test_missing_items() {
357 let executor = deterministic::Runner::default();
359 executor.start(|context| async move {
360 let cfg = Config {
362 partition: "test_ordinal".into(),
363 items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
364 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
365 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
366 };
367 let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
368 .await
369 .expect("Failed to initialize store");
370
371 assert_eq!(store.missing_items(0, 5), Vec::<u64>::new());
373 assert_eq!(store.missing_items(100, 10), Vec::<u64>::new());
374
375 store.put(1, FixedBytes::new([1u8; 32])).await.unwrap();
377 store.put(2, FixedBytes::new([2u8; 32])).await.unwrap();
378 store.put(5, FixedBytes::new([5u8; 32])).await.unwrap();
379 store.put(6, FixedBytes::new([6u8; 32])).await.unwrap();
380 store.put(10, FixedBytes::new([10u8; 32])).await.unwrap();
381
382 assert_eq!(store.missing_items(0, 5), vec![0, 3, 4, 7, 8]);
384 assert_eq!(store.missing_items(0, 6), vec![0, 3, 4, 7, 8, 9]);
385 assert_eq!(store.missing_items(0, 7), vec![0, 3, 4, 7, 8, 9]);
386
387 assert_eq!(store.missing_items(3, 3), vec![3, 4, 7]);
389 assert_eq!(store.missing_items(4, 2), vec![4, 7]);
390
391 assert_eq!(store.missing_items(1, 3), vec![3, 4, 7]);
393 assert_eq!(store.missing_items(2, 4), vec![3, 4, 7, 8]);
394 assert_eq!(store.missing_items(5, 2), vec![7, 8]);
395
396 assert_eq!(store.missing_items(11, 5), Vec::<u64>::new());
398 assert_eq!(store.missing_items(100, 10), Vec::<u64>::new());
399
400 store.put(1000, FixedBytes::new([100u8; 32])).await.unwrap();
402
403 let items = store.missing_items(11, 10);
405 assert_eq!(items, vec![11, 12, 13, 14, 15, 16, 17, 18, 19, 20]);
406
407 let items = store.missing_items(990, 15);
409 assert_eq!(
410 items,
411 vec![990, 991, 992, 993, 994, 995, 996, 997, 998, 999]
412 );
413
414 store.sync().await.unwrap();
416 assert_eq!(store.missing_items(0, 5), vec![0, 3, 4, 7, 8]);
417 assert_eq!(store.missing_items(3, 3), vec![3, 4, 7]);
418
419 store.put(9999, FixedBytes::new([99u8; 32])).await.unwrap();
421 store
422 .put(10001, FixedBytes::new([101u8; 32]))
423 .await
424 .unwrap();
425
426 let items = store.missing_items(9998, 5);
428 assert_eq!(items, vec![9998, 10000]);
429 });
430 }
431
432 #[test_traced]
433 fn test_restart() {
434 let executor = deterministic::Runner::default();
436 executor.start(|context| async move {
437 let cfg = Config {
438 partition: "test_ordinal".into(),
439 items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
440 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
441 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
442 };
443
444 {
446 let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
447 .await
448 .expect("Failed to initialize store");
449
450 let values = vec![
451 (0u64, FixedBytes::new([0u8; 32])),
452 (100u64, FixedBytes::new([100u8; 32])),
453 (1000u64, FixedBytes::new([200u8; 32])),
454 ];
455
456 for (index, value) in &values {
457 store
458 .put(*index, value.clone())
459 .await
460 .expect("Failed to put data");
461 }
462
463 store.sync().await.expect("Failed to sync store");
464 }
465
466 {
468 let store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
469 .await
470 .expect("Failed to initialize store");
471
472 let values = vec![
473 (0u64, FixedBytes::new([0u8; 32])),
474 (100u64, FixedBytes::new([100u8; 32])),
475 (1000u64, FixedBytes::new([200u8; 32])),
476 ];
477
478 for (index, value) in &values {
479 let retrieved = store
480 .get(*index)
481 .await
482 .expect("Failed to get data")
483 .expect("Data not found");
484 assert_eq!(&retrieved, value);
485 }
486
487 let (current_end, start_next) = store.next_gap(0);
489 assert_eq!(current_end, Some(0));
490 assert_eq!(start_next, Some(100));
491 }
492 });
493 }
494
495 #[test_traced]
496 fn test_invalid_record() {
497 let executor = deterministic::Runner::default();
499 executor.start(|context| async move {
500 let cfg = Config {
501 partition: "test_ordinal".into(),
502 items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
503 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
504 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
505 };
506
507 {
509 let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
510 .await
511 .expect("Failed to initialize store");
512
513 store
514 .put(0, FixedBytes::new([42u8; 32]))
515 .await
516 .expect("Failed to put data");
517 store.sync().await.expect("Failed to sync store");
518 }
519
520 {
522 let (blob, _) = context
523 .open("test_ordinal", &0u64.to_be_bytes())
524 .await
525 .unwrap();
526 blob.write_at(vec![0xFF], 32).await.unwrap();
528 blob.sync().await.unwrap();
529 }
530
531 {
533 let store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
534 .await
535 .expect("Failed to initialize store");
536
537 let result = store.get(0).await.unwrap();
539 assert!(result.is_none());
540
541 assert!(!store.has(0));
543 }
544 });
545 }
546
547 #[test_traced]
548 fn test_get_nonexistent() {
549 let executor = deterministic::Runner::default();
551 executor.start(|context| async move {
552 let cfg = Config {
554 partition: "test_ordinal".into(),
555 items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
556 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
557 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
558 };
559 let store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
560 .await
561 .expect("Failed to initialize store");
562
563 let retrieved = store.get(999).await.expect("Failed to get data");
565 assert!(retrieved.is_none());
566
567 assert!(!store.has(999));
569 });
570 }
571
572 #[test_traced]
573 fn test_destroy() {
574 let executor = deterministic::Runner::default();
576 executor.start(|context| async move {
577 let cfg = Config {
578 partition: "test_ordinal".into(),
579 items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
580 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
581 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
582 };
583
584 {
586 let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
587 .await
588 .expect("Failed to initialize store");
589
590 store
591 .put(0, FixedBytes::new([0u8; 32]))
592 .await
593 .expect("Failed to put data");
594 store
595 .put(1000, FixedBytes::new([100u8; 32]))
596 .await
597 .expect("Failed to put data");
598
599 store.destroy().await.expect("Failed to destroy store");
601 }
602
603 {
605 let store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
606 .await
607 .expect("Failed to initialize store");
608
609 assert!(store.get(0).await.unwrap().is_none());
611 assert!(store.get(1000).await.unwrap().is_none());
612 assert!(!store.has(0));
613 assert!(!store.has(1000));
614 }
615 });
616 }
617
618 #[test_traced]
619 fn test_partial_record_write() {
620 let executor = deterministic::Runner::default();
622 executor.start(|context| async move {
623 let cfg = Config {
624 partition: "test_ordinal".into(),
625 items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
626 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
627 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
628 };
629
630 {
632 let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
633 .await
634 .expect("Failed to initialize store");
635
636 store
637 .put(0, FixedBytes::new([42u8; 32]))
638 .await
639 .expect("Failed to put data");
640 store
641 .put(1, FixedBytes::new([43u8; 32]))
642 .await
643 .expect("Failed to put data");
644 store.sync().await.expect("Failed to sync store");
645 }
646
647 {
649 let (blob, _) = context
650 .open("test_ordinal", &0u64.to_be_bytes())
651 .await
652 .unwrap();
653 blob.write_at(vec![0xFF; 32], 36).await.unwrap();
655 blob.sync().await.unwrap();
656 }
657
658 {
660 let store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
661 .await
662 .expect("Failed to initialize store");
663
664 assert_eq!(
666 store.get(0).await.unwrap().unwrap(),
667 FixedBytes::new([42u8; 32])
668 );
669
670 assert!(!store.has(1));
672 assert!(store.get(1).await.unwrap().is_none());
673
674 let mut store_mut = store;
676 store_mut.put(1, FixedBytes::new([44u8; 32])).await.unwrap();
677 assert_eq!(
678 store_mut.get(1).await.unwrap().unwrap(),
679 FixedBytes::new([44u8; 32])
680 );
681 }
682 });
683 }
684
685 #[test_traced]
686 fn test_corrupted_value() {
687 let executor = deterministic::Runner::default();
689 executor.start(|context| async move {
690 let cfg = Config {
691 partition: "test_ordinal".into(),
692 items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
693 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
694 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
695 };
696
697 {
699 let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
700 .await
701 .expect("Failed to initialize store");
702
703 store
704 .put(0, FixedBytes::new([42u8; 32]))
705 .await
706 .expect("Failed to put data");
707 store
708 .put(1, FixedBytes::new([43u8; 32]))
709 .await
710 .expect("Failed to put data");
711 store.sync().await.expect("Failed to sync store");
712 }
713
714 {
716 let (blob, _) = context
717 .open("test_ordinal", &0u64.to_be_bytes())
718 .await
719 .unwrap();
720 blob.write_at(hex!("0xFFFFFFFF").to_vec(), 10)
722 .await
723 .unwrap();
724 blob.sync().await.unwrap();
725 }
726
727 {
729 let store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
730 .await
731 .expect("Failed to initialize store");
732
733 assert!(!store.has(0));
735
736 assert!(store.has(1));
738 assert_eq!(
739 store.get(1).await.unwrap().unwrap(),
740 FixedBytes::new([43u8; 32])
741 );
742 }
743 });
744 }
745
746 #[test_traced]
747 fn test_crc_corruptions() {
748 let executor = deterministic::Runner::default();
750 executor.start(|context| async move {
751 let cfg = Config {
752 partition: "test_ordinal".into(),
753 items_per_blob: NZU64!(10), write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
755 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
756 };
757
758 {
760 let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
761 .await
762 .expect("Failed to initialize store");
763
764 store.put(0, FixedBytes::new([0u8; 32])).await.unwrap();
766 store.put(5, FixedBytes::new([5u8; 32])).await.unwrap();
767 store.put(10, FixedBytes::new([10u8; 32])).await.unwrap();
768 store.put(15, FixedBytes::new([15u8; 32])).await.unwrap();
769 store.sync().await.expect("Failed to sync store");
770 }
771
772 {
774 let (blob, _) = context
776 .open("test_ordinal", &0u64.to_be_bytes())
777 .await
778 .unwrap();
779 blob.write_at(vec![0xFF], 32).await.unwrap(); blob.sync().await.unwrap();
781
782 let (blob, _) = context
784 .open("test_ordinal", &1u64.to_be_bytes())
785 .await
786 .unwrap();
787 blob.write_at(vec![0xFF; 4], 5).await.unwrap(); blob.sync().await.unwrap();
789 }
790
791 {
793 let store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
794 .await
795 .expect("Failed to initialize store");
796
797 assert!(!store.has(0)); assert!(!store.has(10)); assert!(store.has(5));
803 assert!(store.has(15));
804 assert_eq!(
805 store.get(5).await.unwrap().unwrap(),
806 FixedBytes::new([5u8; 32])
807 );
808 assert_eq!(
809 store.get(15).await.unwrap().unwrap(),
810 FixedBytes::new([15u8; 32])
811 );
812 }
813 });
814 }
815
816 #[test_traced]
817 fn test_extra_bytes_in_blob() {
818 let executor = deterministic::Runner::default();
820 executor.start(|context| async move {
821 let cfg = Config {
822 partition: "test_ordinal".into(),
823 items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
824 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
825 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
826 };
827
828 {
830 let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
831 .await
832 .expect("Failed to initialize store");
833
834 store
835 .put(0, FixedBytes::new([42u8; 32]))
836 .await
837 .expect("Failed to put data");
838 store
839 .put(1, FixedBytes::new([43u8; 32]))
840 .await
841 .expect("Failed to put data");
842 store.sync().await.expect("Failed to sync store");
843 }
844
845 {
847 let (blob, size) = context
848 .open("test_ordinal", &0u64.to_be_bytes())
849 .await
850 .unwrap();
851 let mut garbage = vec![0xFF; 32]; let invalid_crc = 0xDEADBEEFu32;
855 garbage.extend_from_slice(&invalid_crc.to_be_bytes());
856 assert_eq!(garbage.len(), 36); blob.write_at(garbage, size).await.unwrap();
858 blob.sync().await.unwrap();
859 }
860
861 {
863 let store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
864 .await
865 .expect("Failed to initialize store");
866
867 assert!(store.has(0));
869 assert!(store.has(1));
870 assert_eq!(
871 store.get(0).await.unwrap().unwrap(),
872 FixedBytes::new([42u8; 32])
873 );
874 assert_eq!(
875 store.get(1).await.unwrap().unwrap(),
876 FixedBytes::new([43u8; 32])
877 );
878
879 let mut store_mut = store;
881 store_mut.put(2, FixedBytes::new([44u8; 32])).await.unwrap();
882 assert_eq!(
883 store_mut.get(2).await.unwrap().unwrap(),
884 FixedBytes::new([44u8; 32])
885 );
886 }
887 });
888 }
889
890 #[test_traced]
891 fn test_zero_filled_records() {
892 let executor = deterministic::Runner::default();
894 executor.start(|context| async move {
895 let cfg = Config {
896 partition: "test_ordinal".into(),
897 items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
898 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
899 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
900 };
901
902 {
904 let (blob, _) = context
905 .open("test_ordinal", &0u64.to_be_bytes())
906 .await
907 .unwrap();
908
909 let zeros = vec![0u8; 36 * 5]; blob.write_at(zeros, 0).await.unwrap();
912
913 let mut valid_record = vec![44u8; 32];
915 let crc = Crc32::checksum(&valid_record);
916 valid_record.extend_from_slice(&crc.to_be_bytes());
917 blob.write_at(valid_record, 36 * 5).await.unwrap();
918
919 blob.sync().await.unwrap();
920 }
921
922 {
924 let store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
925 .await
926 .expect("Failed to initialize store");
927
928 for i in 0..5 {
930 assert!(!store.has(i));
931 }
932
933 assert!(store.has(5));
935 assert_eq!(
936 store.get(5).await.unwrap().unwrap(),
937 FixedBytes::new([44u8; 32])
938 );
939 }
940 });
941 }
942
943 fn test_operations_and_restart(num_values: usize) -> String {
944 let executor = deterministic::Runner::default();
946 executor.start(|mut context| async move {
947 let cfg = Config {
948 partition: "test_ordinal".into(),
949 items_per_blob: NZU64!(100), write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
951 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
952 };
953
954 let mut store = Ordinal::<_, FixedBytes<128>>::init(context.clone(), cfg.clone())
956 .await
957 .expect("Failed to initialize store");
958
959 let mut values = Vec::new();
961 let mut rng_index = 0u64;
962
963 for _ in 0..num_values {
964 let mut index_bytes = [0u8; 8];
966 context.fill_bytes(&mut index_bytes);
967 let index_offset = u64::from_be_bytes(index_bytes) % 1000;
968 let index = rng_index + index_offset;
969 rng_index = index + 1;
970
971 let mut value = [0u8; 128];
973 context.fill_bytes(&mut value);
974 let value = FixedBytes::<128>::new(value);
975
976 store
977 .put(index, value.clone())
978 .await
979 .expect("Failed to put data");
980 values.push((index, value));
981 }
982
983 store.sync().await.expect("Failed to sync");
985
986 for (index, value) in &values {
988 let retrieved = store
989 .get(*index)
990 .await
991 .expect("Failed to get data")
992 .expect("Data not found");
993 assert_eq!(&retrieved, value);
994 }
995
996 for i in 0..10 {
998 let _ = store.next_gap(i * 100);
999 }
1000
1001 store.sync().await.expect("Failed to sync store");
1003 drop(store);
1004
1005 let mut store = Ordinal::<_, FixedBytes<128>>::init(context.clone(), cfg)
1007 .await
1008 .expect("Failed to initialize store");
1009
1010 for (index, value) in &values {
1012 let retrieved = store
1013 .get(*index)
1014 .await
1015 .expect("Failed to get data")
1016 .expect("Data not found");
1017 assert_eq!(&retrieved, value);
1018 }
1019
1020 for _ in 0..10 {
1022 let mut index_bytes = [0u8; 8];
1023 context.fill_bytes(&mut index_bytes);
1024 let index = u64::from_be_bytes(index_bytes) % 10000;
1025
1026 let mut value = [0u8; 128];
1027 context.fill_bytes(&mut value);
1028 let value = FixedBytes::<128>::new(value);
1029
1030 store.put(index, value).await.expect("Failed to put data");
1031 }
1032
1033 store.sync().await.expect("Failed to sync");
1035
1036 context.auditor().state()
1038 })
1039 }
1040
1041 #[test_group("slow")]
1042 #[test_traced]
1043 fn test_determinism() {
1044 let state1 = test_operations_and_restart(100);
1045 let state2 = test_operations_and_restart(100);
1046 assert_eq!(state1, state2);
1047 }
1048
1049 #[test_traced]
1050 fn test_prune_basic() {
1051 let executor = deterministic::Runner::default();
1053 executor.start(|context| async move {
1054 let cfg = Config {
1055 partition: "test_ordinal".into(),
1056 items_per_blob: NZU64!(100), write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1058 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1059 };
1060
1061 let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1062 .await
1063 .expect("Failed to initialize store");
1064
1065 let values = vec![
1067 (0u64, FixedBytes::new([0u8; 32])), (50u64, FixedBytes::new([50u8; 32])), (100u64, FixedBytes::new([100u8; 32])), (150u64, FixedBytes::new([150u8; 32])), (200u64, FixedBytes::new([200u8; 32])), (300u64, FixedBytes::new([44u8; 32])), ];
1074
1075 for (index, value) in &values {
1076 store
1077 .put(*index, value.clone())
1078 .await
1079 .expect("Failed to put data");
1080 }
1081 store.sync().await.unwrap();
1082
1083 for (index, value) in &values {
1085 assert_eq!(store.get(*index).await.unwrap().unwrap(), *value);
1086 }
1087
1088 store.prune(150).await.unwrap();
1090 let buffer = context.encode();
1091 assert!(buffer.contains("pruned_total 1"));
1092
1093 assert!(!store.has(0));
1095 assert!(!store.has(50));
1096 assert!(store.get(0).await.unwrap().is_none());
1097 assert!(store.get(50).await.unwrap().is_none());
1098
1099 assert!(store.has(100));
1101 assert!(store.has(150));
1102 assert!(store.has(200));
1103 assert!(store.has(300));
1104 assert_eq!(store.get(100).await.unwrap().unwrap(), values[2].1);
1105 assert_eq!(store.get(150).await.unwrap().unwrap(), values[3].1);
1106 assert_eq!(store.get(200).await.unwrap().unwrap(), values[4].1);
1107 assert_eq!(store.get(300).await.unwrap().unwrap(), values[5].1);
1108
1109 store.prune(250).await.unwrap();
1111 let buffer = context.encode();
1112 assert!(buffer.contains("pruned_total 2"));
1113
1114 assert!(!store.has(100));
1116 assert!(!store.has(150));
1117 assert!(store.get(100).await.unwrap().is_none());
1118 assert!(store.get(150).await.unwrap().is_none());
1119
1120 assert!(store.has(200));
1122 assert!(store.has(300));
1123 assert_eq!(store.get(200).await.unwrap().unwrap(), values[4].1);
1124 assert_eq!(store.get(300).await.unwrap().unwrap(), values[5].1);
1125 });
1126 }
1127
1128 #[test_traced]
1129 fn test_prune_with_gaps() {
1130 let executor = deterministic::Runner::default();
1132 executor.start(|context| async move {
1133 let cfg = Config {
1134 partition: "test_ordinal".into(),
1135 items_per_blob: NZU64!(100),
1136 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1137 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1138 };
1139
1140 let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1141 .await
1142 .expect("Failed to initialize store");
1143
1144 store.put(5, FixedBytes::new([5u8; 32])).await.unwrap();
1146 store.put(105, FixedBytes::new([105u8; 32])).await.unwrap();
1147 store.put(305, FixedBytes::new([49u8; 32])).await.unwrap();
1148 store.sync().await.unwrap();
1149
1150 let (current_end, next_start) = store.next_gap(0);
1152 assert!(current_end.is_none());
1153 assert_eq!(next_start, Some(5));
1154
1155 let (current_end, next_start) = store.next_gap(5);
1156 assert_eq!(current_end, Some(5));
1157 assert_eq!(next_start, Some(105));
1158
1159 store.prune(150).await.unwrap();
1161
1162 assert!(!store.has(5));
1164 assert!(store.get(5).await.unwrap().is_none());
1165
1166 assert!(store.has(105));
1168 assert!(store.has(305));
1169
1170 let (current_end, next_start) = store.next_gap(0);
1171 assert!(current_end.is_none());
1172 assert_eq!(next_start, Some(105));
1173
1174 let (current_end, next_start) = store.next_gap(105);
1175 assert_eq!(current_end, Some(105));
1176 assert_eq!(next_start, Some(305));
1177 });
1178 }
1179
1180 #[test_traced]
1181 fn test_prune_no_op() {
1182 let executor = deterministic::Runner::default();
1184 executor.start(|context| async move {
1185 let cfg = Config {
1186 partition: "test_ordinal".into(),
1187 items_per_blob: NZU64!(100),
1188 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1189 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1190 };
1191
1192 let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1193 .await
1194 .expect("Failed to initialize store");
1195
1196 store.put(100, FixedBytes::new([100u8; 32])).await.unwrap();
1198 store.put(200, FixedBytes::new([200u8; 32])).await.unwrap();
1199 store.sync().await.unwrap();
1200
1201 store.prune(50).await.unwrap();
1203
1204 assert!(store.has(100));
1206 assert!(store.has(200));
1207 let buffer = context.encode();
1208 assert!(buffer.contains("pruned_total 0"));
1209
1210 store.prune(100).await.unwrap();
1212
1213 assert!(store.has(100));
1215 assert!(store.has(200));
1216 let buffer = context.encode();
1217 assert!(buffer.contains("pruned_total 0"));
1218 });
1219 }
1220
1221 #[test_traced]
1222 fn test_prune_empty_store() {
1223 let executor = deterministic::Runner::default();
1225 executor.start(|context| async move {
1226 let cfg = Config {
1227 partition: "test_ordinal".into(),
1228 items_per_blob: NZU64!(100),
1229 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1230 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1231 };
1232
1233 let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1234 .await
1235 .expect("Failed to initialize store");
1236
1237 store.prune(1000).await.unwrap();
1239
1240 store.put(0, FixedBytes::new([0u8; 32])).await.unwrap();
1242 assert!(store.has(0));
1243 });
1244 }
1245
1246 #[test_traced]
1247 fn test_prune_after_restart() {
1248 let executor = deterministic::Runner::default();
1250 executor.start(|context| async move {
1251 let cfg = Config {
1252 partition: "test_ordinal".into(),
1253 items_per_blob: NZU64!(100),
1254 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1255 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1256 };
1257
1258 {
1260 let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1261 .await
1262 .expect("Failed to initialize store");
1263
1264 store.put(0, FixedBytes::new([0u8; 32])).await.unwrap();
1265 store.put(100, FixedBytes::new([100u8; 32])).await.unwrap();
1266 store.put(200, FixedBytes::new([200u8; 32])).await.unwrap();
1267 store.sync().await.unwrap();
1268 }
1269
1270 {
1272 let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1273 .await
1274 .expect("Failed to initialize store");
1275
1276 assert!(store.has(0));
1278 assert!(store.has(100));
1279 assert!(store.has(200));
1280
1281 store.prune(150).await.unwrap();
1283
1284 assert!(!store.has(0));
1286 assert!(store.has(100));
1287 assert!(store.has(200));
1288
1289 store.sync().await.unwrap();
1290 }
1291
1292 {
1294 let store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1295 .await
1296 .expect("Failed to initialize store");
1297
1298 assert!(!store.has(0));
1299 assert!(store.has(100));
1300 assert!(store.has(200));
1301
1302 let (current_end, next_start) = store.next_gap(0);
1304 assert!(current_end.is_none());
1305 assert_eq!(next_start, Some(100));
1306 }
1307 });
1308 }
1309
1310 #[test_traced]
1311 fn test_prune_multiple_operations() {
1312 let executor = deterministic::Runner::default();
1314 executor.start(|context| async move {
1315 let cfg = Config {
1316 partition: "test_ordinal".into(),
1317 items_per_blob: NZU64!(50), write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1319 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1320 };
1321
1322 let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1323 .await
1324 .expect("Failed to initialize store");
1325
1326 let mut values = Vec::new();
1328 for i in 0..10 {
1329 let index = i * 50 + 25; let value = FixedBytes::new([i as u8; 32]);
1331 store.put(index, value.clone()).await.unwrap();
1332 values.push((index, value));
1333 }
1334 store.sync().await.unwrap();
1335
1336 for i in 1..5 {
1338 let prune_index = i * 50 + 10;
1339 store.prune(prune_index).await.unwrap();
1340
1341 for (index, _) in &values {
1343 if *index < prune_index {
1344 assert!(!store.has(*index), "Index {index} should be pruned");
1345 } else {
1346 assert!(store.has(*index), "Index {index} should not be pruned");
1347 }
1348 }
1349 }
1350
1351 let buffer = context.encode();
1353 assert!(buffer.contains("pruned_total 4"));
1354
1355 for i in 4..10 {
1357 let index = i * 50 + 25;
1358 assert!(store.has(index));
1359 assert_eq!(
1360 store.get(index).await.unwrap().unwrap(),
1361 values[i as usize].1
1362 );
1363 }
1364 });
1365 }
1366
1367 #[test_traced]
1368 fn test_prune_blob_boundaries() {
1369 let executor = deterministic::Runner::default();
1371 executor.start(|context| async move {
1372 let cfg = Config {
1373 partition: "test_ordinal".into(),
1374 items_per_blob: NZU64!(100),
1375 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1376 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1377 };
1378
1379 let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1380 .await
1381 .expect("Failed to initialize store");
1382
1383 store.put(0, FixedBytes::new([0u8; 32])).await.unwrap(); store.put(99, FixedBytes::new([99u8; 32])).await.unwrap(); store.put(100, FixedBytes::new([100u8; 32])).await.unwrap(); store.put(199, FixedBytes::new([199u8; 32])).await.unwrap(); store.put(200, FixedBytes::new([200u8; 32])).await.unwrap(); store.sync().await.unwrap();
1390
1391 store.prune(100).await.unwrap();
1395 assert!(!store.has(0));
1396 assert!(!store.has(99));
1397 assert!(store.has(100));
1398 assert!(store.has(199));
1399 assert!(store.has(200));
1400
1401 store.prune(199).await.unwrap();
1403 assert!(store.has(100));
1404 assert!(store.has(199));
1405 assert!(store.has(200));
1406
1407 store.prune(200).await.unwrap();
1409 assert!(!store.has(100));
1410 assert!(!store.has(199));
1411 assert!(store.has(200));
1412
1413 let buffer = context.encode();
1414 assert!(buffer.contains("pruned_total 2"));
1415 });
1416 }
1417
1418 #[test_traced]
1419 fn test_prune_non_contiguous_sections() {
1420 let executor = deterministic::Runner::default();
1422 executor.start(|context| async move {
1423 let cfg = Config {
1424 partition: "test_ordinal".into(),
1425 items_per_blob: NZU64!(100),
1426 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1427 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1428 };
1429
1430 let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1431 .await
1432 .expect("Failed to initialize store");
1433
1434 store.put(0, FixedBytes::new([0u8; 32])).await.unwrap(); store.put(250, FixedBytes::new([50u8; 32])).await.unwrap(); store.put(500, FixedBytes::new([44u8; 32])).await.unwrap(); store.put(750, FixedBytes::new([45u8; 32])).await.unwrap(); store.sync().await.unwrap();
1440
1441 assert!(store.has(0));
1443 assert!(store.has(250));
1444 assert!(store.has(500));
1445 assert!(store.has(750));
1446
1447 store.prune(300).await.unwrap();
1449
1450 assert!(!store.has(0)); assert!(!store.has(250)); assert!(store.has(500)); assert!(store.has(750)); let buffer = context.encode();
1457 assert!(buffer.contains("pruned_total 2"));
1458
1459 store.prune(600).await.unwrap();
1461
1462 assert!(!store.has(500)); assert!(store.has(750)); let buffer = context.encode();
1467 assert!(buffer.contains("pruned_total 3"));
1468
1469 store.prune(1000).await.unwrap();
1471
1472 assert!(!store.has(750)); let buffer = context.encode();
1476 assert!(buffer.contains("pruned_total 4"));
1477 });
1478 }
1479
1480 #[test_traced]
1481 fn test_prune_removes_correct_pending() {
1482 let executor = deterministic::Runner::default();
1484 executor.start(|context| async move {
1485 let cfg = Config {
1486 partition: "test_ordinal".into(),
1487 items_per_blob: NZU64!(100),
1488 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1489 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1490 };
1491 let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1492 .await
1493 .expect("Failed to initialize store");
1494
1495 store.put(5, FixedBytes::new([5u8; 32])).await.unwrap();
1497 store.sync().await.unwrap();
1498
1499 store.put(10, FixedBytes::new([10u8; 32])).await.unwrap(); store.put(110, FixedBytes::new([110u8; 32])).await.unwrap(); assert!(store.has(5));
1505 assert!(store.has(10));
1506 assert!(store.has(110));
1507
1508 store.prune(150).await.unwrap();
1510
1511 assert!(!store.has(5));
1513 assert!(!store.has(10));
1514
1515 assert!(store.has(110));
1517 assert_eq!(
1518 store.get(110).await.unwrap().unwrap(),
1519 FixedBytes::new([110u8; 32])
1520 );
1521
1522 store.sync().await.unwrap();
1524 assert!(store.has(110));
1525 assert_eq!(
1526 store.get(110).await.unwrap().unwrap(),
1527 FixedBytes::new([110u8; 32])
1528 );
1529 });
1530 }
1531
1532 #[test_traced]
1533 fn test_init_with_bits_none() {
1534 let executor = deterministic::Runner::default();
1536 executor.start(|context| async move {
1537 let cfg = Config {
1538 partition: "test_ordinal".into(),
1539 items_per_blob: NZU64!(10), write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1541 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1542 };
1543
1544 {
1546 let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1547 .await
1548 .expect("Failed to initialize store");
1549
1550 store.put(0, FixedBytes::new([0u8; 32])).await.unwrap();
1552 store.put(5, FixedBytes::new([5u8; 32])).await.unwrap();
1553 store.put(9, FixedBytes::new([9u8; 32])).await.unwrap();
1554
1555 store.put(10, FixedBytes::new([10u8; 32])).await.unwrap();
1557 store.put(15, FixedBytes::new([15u8; 32])).await.unwrap();
1558
1559 store.put(25, FixedBytes::new([25u8; 32])).await.unwrap();
1561
1562 store.sync().await.unwrap();
1563 }
1564
1565 {
1567 let store = Ordinal::<_, FixedBytes<32>>::init_with_bits(
1568 context.clone(),
1569 cfg.clone(),
1570 None,
1571 )
1572 .await
1573 .expect("Failed to initialize store with bits");
1574
1575 assert!(store.has(0));
1577 assert!(store.has(5));
1578 assert!(store.has(9));
1579 assert!(store.has(10));
1580 assert!(store.has(15));
1581 assert!(store.has(25));
1582
1583 assert!(!store.has(1));
1585 assert!(!store.has(11));
1586 assert!(!store.has(20));
1587
1588 assert_eq!(
1590 store.get(0).await.unwrap().unwrap(),
1591 FixedBytes::new([0u8; 32])
1592 );
1593 assert_eq!(
1594 store.get(15).await.unwrap().unwrap(),
1595 FixedBytes::new([15u8; 32])
1596 );
1597 }
1598 });
1599 }
1600
1601 #[test_traced]
1602 fn test_init_with_bits_empty_hashmap() {
1603 let executor = deterministic::Runner::default();
1605 executor.start(|context| async move {
1606 let cfg = Config {
1607 partition: "test_ordinal".into(),
1608 items_per_blob: NZU64!(10),
1609 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1610 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1611 };
1612
1613 {
1615 let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1616 .await
1617 .expect("Failed to initialize store");
1618
1619 store.put(0, FixedBytes::new([0u8; 32])).await.unwrap();
1620 store.put(10, FixedBytes::new([10u8; 32])).await.unwrap();
1621 store.put(20, FixedBytes::new([20u8; 32])).await.unwrap();
1622
1623 store.sync().await.unwrap();
1624 }
1625
1626 {
1628 let bits: BTreeMap<u64, &Option<BitMap>> = BTreeMap::new();
1629 let store = Ordinal::<_, FixedBytes<32>>::init_with_bits(
1630 context.clone(),
1631 cfg.clone(),
1632 Some(bits),
1633 )
1634 .await
1635 .expect("Failed to initialize store with bits");
1636
1637 assert!(!store.has(0));
1639 assert!(!store.has(10));
1640 assert!(!store.has(20));
1641 }
1642 });
1643 }
1644
1645 #[test_traced]
1646 fn test_init_with_bits_selective_sections() {
1647 let executor = deterministic::Runner::default();
1649 executor.start(|context| async move {
1650 let cfg = Config {
1651 partition: "test_ordinal".into(),
1652 items_per_blob: NZU64!(10),
1653 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1654 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1655 };
1656
1657 {
1659 let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1660 .await
1661 .expect("Failed to initialize store");
1662
1663 for i in 0..10 {
1665 store.put(i, FixedBytes::new([i as u8; 32])).await.unwrap();
1666 }
1667
1668 for i in 10..20 {
1670 store.put(i, FixedBytes::new([i as u8; 32])).await.unwrap();
1671 }
1672
1673 for i in 20..30 {
1675 store.put(i, FixedBytes::new([i as u8; 32])).await.unwrap();
1676 }
1677
1678 store.sync().await.unwrap();
1679 }
1680
1681 {
1683 let mut bits_map: BTreeMap<u64, &Option<BitMap>> = BTreeMap::new();
1684
1685 let mut bitmap = BitMap::zeroes(10);
1687 bitmap.set(2, true); bitmap.set(5, true); bitmap.set(8, true); let bitmap_option = Some(bitmap);
1691
1692 bits_map.insert(1, &bitmap_option);
1693
1694 let store = Ordinal::<_, FixedBytes<32>>::init_with_bits(
1695 context.clone(),
1696 cfg.clone(),
1697 Some(bits_map),
1698 )
1699 .await
1700 .expect("Failed to initialize store with bits");
1701
1702 assert!(store.has(12));
1704 assert!(store.has(15));
1705 assert!(store.has(18));
1706
1707 assert!(!store.has(10));
1709 assert!(!store.has(11));
1710 assert!(!store.has(13));
1711 assert!(!store.has(14));
1712 assert!(!store.has(16));
1713 assert!(!store.has(17));
1714 assert!(!store.has(19));
1715
1716 for i in 0..10 {
1718 assert!(!store.has(i));
1719 }
1720 for i in 20..30 {
1721 assert!(!store.has(i));
1722 }
1723
1724 assert_eq!(
1726 store.get(12).await.unwrap().unwrap(),
1727 FixedBytes::new([12u8; 32])
1728 );
1729 assert_eq!(
1730 store.get(15).await.unwrap().unwrap(),
1731 FixedBytes::new([15u8; 32])
1732 );
1733 assert_eq!(
1734 store.get(18).await.unwrap().unwrap(),
1735 FixedBytes::new([18u8; 32])
1736 );
1737 }
1738 });
1739 }
1740
1741 #[test_traced]
1742 fn test_init_with_bits_none_option_all_records_exist() {
1743 let executor = deterministic::Runner::default();
1745 executor.start(|context| async move {
1746 let cfg = Config {
1747 partition: "test_ordinal".into(),
1748 items_per_blob: NZU64!(5),
1749 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1750 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1751 };
1752
1753 {
1755 let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1756 .await
1757 .expect("Failed to initialize store");
1758
1759 for i in 5..10 {
1761 store.put(i, FixedBytes::new([i as u8; 32])).await.unwrap();
1762 }
1763
1764 store.sync().await.unwrap();
1765 }
1766
1767 {
1769 let mut bits_map: BTreeMap<u64, &Option<BitMap>> = BTreeMap::new();
1770 let none_option: Option<BitMap> = None;
1771 bits_map.insert(1, &none_option);
1772
1773 let store = Ordinal::<_, FixedBytes<32>>::init_with_bits(
1774 context.clone(),
1775 cfg.clone(),
1776 Some(bits_map),
1777 )
1778 .await
1779 .expect("Failed to initialize store with bits");
1780
1781 for i in 5..10 {
1783 assert!(store.has(i));
1784 assert_eq!(
1785 store.get(i).await.unwrap().unwrap(),
1786 FixedBytes::new([i as u8; 32])
1787 );
1788 }
1789 }
1790 });
1791 }
1792
1793 #[test_traced]
1794 #[should_panic(expected = "Failed to initialize store with bits: MissingRecord(6)")]
1795 fn test_init_with_bits_none_option_missing_record_panics() {
1796 let executor = deterministic::Runner::default();
1798 executor.start(|context| async move {
1799 let cfg = Config {
1800 partition: "test_ordinal".into(),
1801 items_per_blob: NZU64!(5),
1802 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1803 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1804 };
1805
1806 {
1808 let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1809 .await
1810 .expect("Failed to initialize store");
1811
1812 store.put(5, FixedBytes::new([5u8; 32])).await.unwrap();
1814 store.put(7, FixedBytes::new([7u8; 32])).await.unwrap();
1816 store.put(8, FixedBytes::new([8u8; 32])).await.unwrap();
1817 store.put(9, FixedBytes::new([9u8; 32])).await.unwrap();
1818
1819 store.sync().await.unwrap();
1820 }
1821
1822 {
1825 let mut bits_map: BTreeMap<u64, &Option<BitMap>> = BTreeMap::new();
1826 let none_option: Option<BitMap> = None;
1827 bits_map.insert(1, &none_option);
1828
1829 let _store = Ordinal::<_, FixedBytes<32>>::init_with_bits(
1830 context.clone(),
1831 cfg.clone(),
1832 Some(bits_map),
1833 )
1834 .await
1835 .expect("Failed to initialize store with bits");
1836 }
1837 });
1838 }
1839
1840 #[test_traced]
1841 fn test_init_with_bits_mixed_sections() {
1842 let executor = deterministic::Runner::default();
1844 executor.start(|context| async move {
1845 let cfg = Config {
1846 partition: "test_ordinal".into(),
1847 items_per_blob: NZU64!(5),
1848 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1849 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1850 };
1851
1852 {
1854 let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1855 .await
1856 .expect("Failed to initialize store");
1857
1858 for i in 0..5 {
1860 store.put(i, FixedBytes::new([i as u8; 32])).await.unwrap();
1861 }
1862
1863 store.put(5, FixedBytes::new([5u8; 32])).await.unwrap();
1865 store.put(7, FixedBytes::new([7u8; 32])).await.unwrap();
1866 store.put(9, FixedBytes::new([9u8; 32])).await.unwrap();
1867
1868 for i in 10..15 {
1870 store.put(i, FixedBytes::new([i as u8; 32])).await.unwrap();
1871 }
1872
1873 store.sync().await.unwrap();
1874 }
1875
1876 {
1878 let mut bits_map: BTreeMap<u64, &Option<BitMap>> = BTreeMap::new();
1879
1880 let none_option: Option<BitMap> = None;
1882 bits_map.insert(0, &none_option);
1883
1884 let mut bitmap1 = BitMap::zeroes(5);
1886 bitmap1.set(0, true); bitmap1.set(2, true); let bitmap1_option = Some(bitmap1);
1890 bits_map.insert(1, &bitmap1_option);
1891
1892 let store = Ordinal::<_, FixedBytes<32>>::init_with_bits(
1895 context.clone(),
1896 cfg.clone(),
1897 Some(bits_map),
1898 )
1899 .await
1900 .expect("Failed to initialize store with bits");
1901
1902 for i in 0..5 {
1904 assert!(store.has(i));
1905 assert_eq!(
1906 store.get(i).await.unwrap().unwrap(),
1907 FixedBytes::new([i as u8; 32])
1908 );
1909 }
1910
1911 assert!(store.has(5));
1913 assert!(store.has(7));
1914 assert!(!store.has(6));
1915 assert!(!store.has(8));
1916 assert!(!store.has(9)); for i in 10..15 {
1920 assert!(!store.has(i));
1921 }
1922 }
1923 });
1924 }
1925
1926 #[test_traced]
1927 #[should_panic(expected = "Failed to initialize store with bits: MissingRecord(2)")]
1928 fn test_init_with_bits_corrupted_records() {
1929 let executor = deterministic::Runner::default();
1931 executor.start(|context| async move {
1932 let cfg = Config {
1933 partition: "test_ordinal".into(),
1934 items_per_blob: NZU64!(5),
1935 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1936 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1937 };
1938
1939 {
1941 let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1942 .await
1943 .expect("Failed to initialize store");
1944
1945 for i in 0..5 {
1947 store.put(i, FixedBytes::new([i as u8; 32])).await.unwrap();
1948 }
1949
1950 store.sync().await.unwrap();
1951 }
1952
1953 {
1955 let (blob, _) = context
1956 .open("test_ordinal", &0u64.to_be_bytes())
1957 .await
1958 .unwrap();
1959 let offset = 2 * 36 + 32; blob.write_at(vec![0xFF], offset).await.unwrap();
1962 blob.sync().await.unwrap();
1963 }
1964
1965 {
1967 let mut bits_map: BTreeMap<u64, &Option<BitMap>> = BTreeMap::new();
1968
1969 let mut bitmap = BitMap::zeroes(5);
1971 bitmap.set(0, true); bitmap.set(2, true); bitmap.set(4, true); let bitmap_option = Some(bitmap);
1975 bits_map.insert(0, &bitmap_option);
1976
1977 let _store = Ordinal::<_, FixedBytes<32>>::init_with_bits(
1978 context.clone(),
1979 cfg.clone(),
1980 Some(bits_map),
1981 )
1982 .await
1983 .expect("Failed to initialize store with bits");
1984 }
1985 });
1986 }
1987
1988 #[derive(Debug, PartialEq, Eq)]
1990 pub struct DummyValue {
1991 pub value: u64,
1992 }
1993
1994 impl Write for DummyValue {
1995 fn write(&self, buf: &mut impl BufMut) {
1996 self.value.write(buf);
1997 }
1998 }
1999
2000 impl Read for DummyValue {
2001 type Cfg = ();
2002
2003 fn read_cfg(buf: &mut impl Buf, _: &Self::Cfg) -> Result<Self, commonware_codec::Error> {
2004 let value = u64::read(buf)?;
2005 if value == 0 {
2006 return Err(commonware_codec::Error::Invalid(
2007 "DummyValue",
2008 "value must be non-zero",
2009 ));
2010 }
2011 Ok(Self { value })
2012 }
2013 }
2014
2015 impl FixedSize for DummyValue {
2016 const SIZE: usize = u64::SIZE;
2017 }
2018
2019 #[test_traced]
2020 fn test_init_skip_unparseable_record() {
2021 let executor = deterministic::Runner::default();
2023 executor.start(|context| async move {
2024 let cfg = Config {
2025 partition: "test_ordinal".into(),
2026 items_per_blob: NZU64!(1),
2027 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
2028 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
2029 };
2030
2031 {
2033 let mut store = Ordinal::<_, DummyValue>::init(context.clone(), cfg.clone())
2034 .await
2035 .expect("Failed to initialize store");
2036
2037 store.put(1, DummyValue { value: 1 }).await.unwrap();
2039 store.put(2, DummyValue { value: 0 }).await.unwrap(); store.put(4, DummyValue { value: 4 }).await.unwrap();
2041
2042 store.sync().await.unwrap();
2043 }
2044
2045 {
2047 let store = Ordinal::<_, DummyValue>::init(context.clone(), cfg.clone())
2048 .await
2049 .expect("Failed to initialize store");
2050
2051 assert!(store.has(1), "Record 1 should be available");
2053 assert_eq!(
2054 store.get(1).await.unwrap().unwrap(),
2055 DummyValue { value: 1 },
2056 "Record 0 should have correct value"
2057 );
2058
2059 assert!(
2061 !store.has(2),
2062 "Record 2 should not be available (unparseable)"
2063 );
2064
2065 assert!(
2067 store.has(4),
2068 "Record 4 should be available - we should not exit early on unparseable record"
2069 );
2070 assert_eq!(
2071 store.get(4).await.unwrap().unwrap(),
2072 DummyValue { value: 4 },
2073 "Record 4 should have correct value"
2074 );
2075 }
2076 });
2077 }
2078}