1mod storage;
97
98use std::num::{NonZeroU64, NonZeroUsize};
99pub use storage::Ordinal;
100use thiserror::Error;
101
/// Errors that can be returned when interacting with an [Ordinal] store.
#[derive(Debug, Error)]
pub enum Error {
    /// An error produced by the runtime, converted automatically via `From`.
    #[error("runtime error: {0}")]
    Runtime(#[from] commonware_runtime::Error),
    /// An error produced by the codec, converted automatically via `From`.
    #[error("codec error: {0}")]
    Codec(#[from] commonware_codec::Error),
    /// A blob name was rejected; the payload is rendered in the error message.
    #[error("invalid blob name: {0}")]
    InvalidBlobName(String),
    /// A record was found to be invalid.
    ///
    /// NOTE(review): the `u64` payload is presumably the record's index — confirm in `storage`.
    #[error("invalid record: {0}")]
    InvalidRecord(u64),
    /// A record that was expected to exist could not be found.
    ///
    /// NOTE(review): the `u64` payload is presumably the record's index — confirm in `storage`.
    #[error("missing record at {0}")]
    MissingRecord(u64),
}
116
/// Configuration passed to `Ordinal::init`.
#[derive(Clone)]
pub struct Config {
    /// Name of the storage partition in which the store keeps its blobs.
    pub partition: String,

    /// Number of consecutive indices grouped into a single blob.
    ///
    /// NOTE(review): the tests in this file suggest pruning removes whole blobs, so this
    /// value also looks like the pruning granularity — confirm against `storage`.
    pub items_per_blob: NonZeroU64,

    /// Size of the write buffer.
    ///
    /// NOTE(review): presumably in bytes and applied per blob — confirm against `storage`.
    pub write_buffer: NonZeroUsize,

    /// Size of the buffer used when replaying blobs.
    ///
    /// NOTE(review): presumably in bytes and used during initialization — confirm against
    /// `storage`.
    pub replay_buffer: NonZeroUsize,
}
132
133#[cfg(test)]
134mod tests {
135 use super::*;
136 use commonware_codec::{FixedSize, Read, ReadExt, Write};
137 use commonware_cryptography::Crc32;
138 use commonware_macros::{test_group, test_traced};
139 use commonware_runtime::{deterministic, Blob, Buf, BufMut, Metrics, Runner, Storage};
140 use commonware_utils::{bitmap::BitMap, hex, sequence::FixedBytes, NZUsize, NZU64};
141 use rand::RngCore;
142 use std::collections::BTreeMap;
143
    // Value used for `Config::items_per_blob` by tests that do not override it.
    const DEFAULT_ITEMS_PER_BLOB: u64 = 1000;
    // Value used for `Config::write_buffer` by every test in this module.
    const DEFAULT_WRITE_BUFFER: usize = 4096;
    // Value used for `Config::replay_buffer` by every test in this module.
    const DEFAULT_REPLAY_BUFFER: usize = 1024 * 1024;
147
148 #[test_traced]
149 fn test_put_get() {
150 let executor = deterministic::Runner::default();
152 executor.start(|context| async move {
153 let cfg = Config {
155 partition: "test_ordinal".into(),
156 items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
157 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
158 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
159 };
160 let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
161 .await
162 .expect("Failed to initialize store");
163
164 let value = FixedBytes::new([42u8; 32]);
165
166 assert!(!store.has(0));
168
169 store
171 .put(0, value.clone())
172 .await
173 .expect("Failed to put data");
174
175 assert!(store.has(0));
177
178 let retrieved = store
180 .get(0)
181 .await
182 .expect("Failed to get data")
183 .expect("Data not found");
184 assert_eq!(retrieved, value);
185
186 store.sync().await.expect("Failed to sync data");
188
189 let buffer = context.encode();
191 assert!(buffer.contains("gets_total 1"), "{}", buffer);
192 assert!(buffer.contains("puts_total 1"), "{}", buffer);
193 assert!(buffer.contains("has_total 2"), "{}", buffer);
194 assert!(buffer.contains("syncs_total 1"), "{}", buffer);
195 assert!(buffer.contains("pruned_total 0"), "{}", buffer);
196
197 let retrieved = store
199 .get(0)
200 .await
201 .expect("Failed to get data")
202 .expect("Data not found");
203 assert_eq!(retrieved, value);
204 });
205 }
206
207 #[test_traced]
208 fn test_multiple_indices() {
209 let executor = deterministic::Runner::default();
211 executor.start(|context| async move {
212 let cfg = Config {
214 partition: "test_ordinal".into(),
215 items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
216 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
217 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
218 };
219 let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
220 .await
221 .expect("Failed to initialize store");
222
223 let indices = vec![
225 (0u64, FixedBytes::new([0u8; 32])),
226 (5u64, FixedBytes::new([5u8; 32])),
227 (10u64, FixedBytes::new([10u8; 32])),
228 (100u64, FixedBytes::new([100u8; 32])),
229 (1000u64, FixedBytes::new([200u8; 32])), ];
231
232 for (index, value) in &indices {
233 store
234 .put(*index, value.clone())
235 .await
236 .expect("Failed to put data");
237 }
238
239 store.sync().await.expect("Failed to sync");
241
242 for (index, value) in &indices {
244 let retrieved = store
245 .get(*index)
246 .await
247 .expect("Failed to get data")
248 .expect("Data not found");
249 assert_eq!(&retrieved, value);
250 }
251 });
252 }
253
254 #[test_traced]
255 fn test_sparse_indices() {
256 let executor = deterministic::Runner::default();
258 executor.start(|context| async move {
259 let cfg = Config {
261 partition: "test_ordinal".into(),
262 items_per_blob: NZU64!(100), write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
264 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
265 };
266 let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
267 .await
268 .expect("Failed to initialize store");
269
270 let indices = vec![
272 (0u64, FixedBytes::new([0u8; 32])),
273 (99u64, FixedBytes::new([99u8; 32])), (100u64, FixedBytes::new([100u8; 32])), (500u64, FixedBytes::new([200u8; 32])), ];
277
278 for (index, value) in &indices {
279 store
280 .put(*index, value.clone())
281 .await
282 .expect("Failed to put data");
283 }
284
285 assert!(!store.has(1));
287 assert!(!store.has(50));
288 assert!(!store.has(101));
289 assert!(!store.has(499));
290
291 store.sync().await.expect("Failed to sync");
293
294 for (index, value) in &indices {
295 let retrieved = store
296 .get(*index)
297 .await
298 .expect("Failed to get data")
299 .expect("Data not found");
300 assert_eq!(&retrieved, value);
301 }
302 });
303 }
304
305 #[test_traced]
306 fn test_next_gap() {
307 let executor = deterministic::Runner::default();
309 executor.start(|context| async move {
310 let cfg = Config {
312 partition: "test_ordinal".into(),
313 items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
314 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
315 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
316 };
317 let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
318 .await
319 .expect("Failed to initialize store");
320
321 store.put(1, FixedBytes::new([1u8; 32])).await.unwrap();
323 store.put(10, FixedBytes::new([10u8; 32])).await.unwrap();
324 store.put(11, FixedBytes::new([11u8; 32])).await.unwrap();
325 store.put(14, FixedBytes::new([14u8; 32])).await.unwrap();
326
327 let (current_end, start_next) = store.next_gap(0);
329 assert!(current_end.is_none());
330 assert_eq!(start_next, Some(1));
331
332 let (current_end, start_next) = store.next_gap(1);
333 assert_eq!(current_end, Some(1));
334 assert_eq!(start_next, Some(10));
335
336 let (current_end, start_next) = store.next_gap(10);
337 assert_eq!(current_end, Some(11));
338 assert_eq!(start_next, Some(14));
339
340 let (current_end, start_next) = store.next_gap(11);
341 assert_eq!(current_end, Some(11));
342 assert_eq!(start_next, Some(14));
343
344 let (current_end, start_next) = store.next_gap(12);
345 assert!(current_end.is_none());
346 assert_eq!(start_next, Some(14));
347
348 let (current_end, start_next) = store.next_gap(14);
349 assert_eq!(current_end, Some(14));
350 assert!(start_next.is_none());
351 });
352 }
353
354 #[test_traced]
355 fn test_missing_items() {
356 let executor = deterministic::Runner::default();
358 executor.start(|context| async move {
359 let cfg = Config {
361 partition: "test_ordinal".into(),
362 items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
363 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
364 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
365 };
366 let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
367 .await
368 .expect("Failed to initialize store");
369
370 assert_eq!(store.missing_items(0, 5), Vec::<u64>::new());
372 assert_eq!(store.missing_items(100, 10), Vec::<u64>::new());
373
374 store.put(1, FixedBytes::new([1u8; 32])).await.unwrap();
376 store.put(2, FixedBytes::new([2u8; 32])).await.unwrap();
377 store.put(5, FixedBytes::new([5u8; 32])).await.unwrap();
378 store.put(6, FixedBytes::new([6u8; 32])).await.unwrap();
379 store.put(10, FixedBytes::new([10u8; 32])).await.unwrap();
380
381 assert_eq!(store.missing_items(0, 5), vec![0, 3, 4, 7, 8]);
383 assert_eq!(store.missing_items(0, 6), vec![0, 3, 4, 7, 8, 9]);
384 assert_eq!(store.missing_items(0, 7), vec![0, 3, 4, 7, 8, 9]);
385
386 assert_eq!(store.missing_items(3, 3), vec![3, 4, 7]);
388 assert_eq!(store.missing_items(4, 2), vec![4, 7]);
389
390 assert_eq!(store.missing_items(1, 3), vec![3, 4, 7]);
392 assert_eq!(store.missing_items(2, 4), vec![3, 4, 7, 8]);
393 assert_eq!(store.missing_items(5, 2), vec![7, 8]);
394
395 assert_eq!(store.missing_items(11, 5), Vec::<u64>::new());
397 assert_eq!(store.missing_items(100, 10), Vec::<u64>::new());
398
399 store.put(1000, FixedBytes::new([100u8; 32])).await.unwrap();
401
402 let items = store.missing_items(11, 10);
404 assert_eq!(items, vec![11, 12, 13, 14, 15, 16, 17, 18, 19, 20]);
405
406 let items = store.missing_items(990, 15);
408 assert_eq!(
409 items,
410 vec![990, 991, 992, 993, 994, 995, 996, 997, 998, 999]
411 );
412
413 store.sync().await.unwrap();
415 assert_eq!(store.missing_items(0, 5), vec![0, 3, 4, 7, 8]);
416 assert_eq!(store.missing_items(3, 3), vec![3, 4, 7]);
417
418 store.put(9999, FixedBytes::new([99u8; 32])).await.unwrap();
420 store
421 .put(10001, FixedBytes::new([101u8; 32]))
422 .await
423 .unwrap();
424
425 let items = store.missing_items(9998, 5);
427 assert_eq!(items, vec![9998, 10000]);
428 });
429 }
430
431 #[test_traced]
432 fn test_restart() {
433 let executor = deterministic::Runner::default();
435 executor.start(|context| async move {
436 let cfg = Config {
437 partition: "test_ordinal".into(),
438 items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
439 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
440 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
441 };
442
443 {
445 let mut store =
446 Ordinal::<_, FixedBytes<32>>::init(context.with_label("first"), cfg.clone())
447 .await
448 .expect("Failed to initialize store");
449
450 let values = vec![
451 (0u64, FixedBytes::new([0u8; 32])),
452 (100u64, FixedBytes::new([100u8; 32])),
453 (1000u64, FixedBytes::new([200u8; 32])),
454 ];
455
456 for (index, value) in &values {
457 store
458 .put(*index, value.clone())
459 .await
460 .expect("Failed to put data");
461 }
462
463 store.sync().await.expect("Failed to sync store");
464 }
465
466 {
468 let store =
469 Ordinal::<_, FixedBytes<32>>::init(context.with_label("second"), cfg.clone())
470 .await
471 .expect("Failed to initialize store");
472
473 let values = vec![
474 (0u64, FixedBytes::new([0u8; 32])),
475 (100u64, FixedBytes::new([100u8; 32])),
476 (1000u64, FixedBytes::new([200u8; 32])),
477 ];
478
479 for (index, value) in &values {
480 let retrieved = store
481 .get(*index)
482 .await
483 .expect("Failed to get data")
484 .expect("Data not found");
485 assert_eq!(&retrieved, value);
486 }
487
488 let (current_end, start_next) = store.next_gap(0);
490 assert_eq!(current_end, Some(0));
491 assert_eq!(start_next, Some(100));
492 }
493 });
494 }
495
496 #[test_traced]
497 fn test_invalid_record() {
498 let executor = deterministic::Runner::default();
500 executor.start(|context| async move {
501 let cfg = Config {
502 partition: "test_ordinal".into(),
503 items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
504 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
505 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
506 };
507
508 {
510 let mut store =
511 Ordinal::<_, FixedBytes<32>>::init(context.with_label("first"), cfg.clone())
512 .await
513 .expect("Failed to initialize store");
514
515 store
516 .put(0, FixedBytes::new([42u8; 32]))
517 .await
518 .expect("Failed to put data");
519 store.sync().await.expect("Failed to sync store");
520 }
521
522 {
524 let (blob, _) = context
525 .open("test_ordinal", &0u64.to_be_bytes())
526 .await
527 .unwrap();
528 blob.write_at(32, vec![0xFF]).await.unwrap();
530 blob.sync().await.unwrap();
531 }
532
533 {
535 let store =
536 Ordinal::<_, FixedBytes<32>>::init(context.with_label("second"), cfg.clone())
537 .await
538 .expect("Failed to initialize store");
539
540 let result = store.get(0).await.unwrap();
542 assert!(result.is_none());
543
544 assert!(!store.has(0));
546 }
547 });
548 }
549
550 #[test_traced]
551 fn test_get_nonexistent() {
552 let executor = deterministic::Runner::default();
554 executor.start(|context| async move {
555 let cfg = Config {
557 partition: "test_ordinal".into(),
558 items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
559 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
560 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
561 };
562 let store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
563 .await
564 .expect("Failed to initialize store");
565
566 let retrieved = store.get(999).await.expect("Failed to get data");
568 assert!(retrieved.is_none());
569
570 assert!(!store.has(999));
572 });
573 }
574
575 #[test_traced]
576 fn test_destroy() {
577 let executor = deterministic::Runner::default();
579 executor.start(|context| async move {
580 let cfg = Config {
581 partition: "test_ordinal".into(),
582 items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
583 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
584 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
585 };
586
587 {
589 let mut store =
590 Ordinal::<_, FixedBytes<32>>::init(context.with_label("first"), cfg.clone())
591 .await
592 .expect("Failed to initialize store");
593
594 store
595 .put(0, FixedBytes::new([0u8; 32]))
596 .await
597 .expect("Failed to put data");
598 store
599 .put(1000, FixedBytes::new([100u8; 32]))
600 .await
601 .expect("Failed to put data");
602
603 store.destroy().await.expect("Failed to destroy store");
605 }
606
607 {
609 let store =
610 Ordinal::<_, FixedBytes<32>>::init(context.with_label("second"), cfg.clone())
611 .await
612 .expect("Failed to initialize store");
613
614 assert!(store.get(0).await.unwrap().is_none());
616 assert!(store.get(1000).await.unwrap().is_none());
617 assert!(!store.has(0));
618 assert!(!store.has(1000));
619 }
620 });
621 }
622
623 #[test_traced]
624 fn test_partial_record_write() {
625 let executor = deterministic::Runner::default();
627 executor.start(|context| async move {
628 let cfg = Config {
629 partition: "test_ordinal".into(),
630 items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
631 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
632 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
633 };
634
635 {
637 let mut store =
638 Ordinal::<_, FixedBytes<32>>::init(context.with_label("first"), cfg.clone())
639 .await
640 .expect("Failed to initialize store");
641
642 store
643 .put(0, FixedBytes::new([42u8; 32]))
644 .await
645 .expect("Failed to put data");
646 store
647 .put(1, FixedBytes::new([43u8; 32]))
648 .await
649 .expect("Failed to put data");
650 store.sync().await.expect("Failed to sync store");
651 }
652
653 {
655 let (blob, _) = context
656 .open("test_ordinal", &0u64.to_be_bytes())
657 .await
658 .unwrap();
659 blob.write_at(36, vec![0xFF; 32]).await.unwrap();
661 blob.sync().await.unwrap();
662 }
663
664 {
666 let store =
667 Ordinal::<_, FixedBytes<32>>::init(context.with_label("second"), cfg.clone())
668 .await
669 .expect("Failed to initialize store");
670
671 assert_eq!(
673 store.get(0).await.unwrap().unwrap(),
674 FixedBytes::new([42u8; 32])
675 );
676
677 assert!(!store.has(1));
679 assert!(store.get(1).await.unwrap().is_none());
680
681 let mut store_mut = store;
683 store_mut.put(1, FixedBytes::new([44u8; 32])).await.unwrap();
684 assert_eq!(
685 store_mut.get(1).await.unwrap().unwrap(),
686 FixedBytes::new([44u8; 32])
687 );
688 }
689 });
690 }
691
692 #[test_traced]
693 fn test_corrupted_value() {
694 let executor = deterministic::Runner::default();
696 executor.start(|context| async move {
697 let cfg = Config {
698 partition: "test_ordinal".into(),
699 items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
700 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
701 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
702 };
703
704 {
706 let mut store =
707 Ordinal::<_, FixedBytes<32>>::init(context.with_label("first"), cfg.clone())
708 .await
709 .expect("Failed to initialize store");
710
711 store
712 .put(0, FixedBytes::new([42u8; 32]))
713 .await
714 .expect("Failed to put data");
715 store
716 .put(1, FixedBytes::new([43u8; 32]))
717 .await
718 .expect("Failed to put data");
719 store.sync().await.expect("Failed to sync store");
720 }
721
722 {
724 let (blob, _) = context
725 .open("test_ordinal", &0u64.to_be_bytes())
726 .await
727 .unwrap();
728 blob.write_at(10, hex!("0xFFFFFFFF").to_vec())
730 .await
731 .unwrap();
732 blob.sync().await.unwrap();
733 }
734
735 {
737 let store =
738 Ordinal::<_, FixedBytes<32>>::init(context.with_label("second"), cfg.clone())
739 .await
740 .expect("Failed to initialize store");
741
742 assert!(!store.has(0));
744
745 assert!(store.has(1));
747 assert_eq!(
748 store.get(1).await.unwrap().unwrap(),
749 FixedBytes::new([43u8; 32])
750 );
751 }
752 });
753 }
754
755 #[test_traced]
756 fn test_crc_corruptions() {
757 let executor = deterministic::Runner::default();
759 executor.start(|context| async move {
760 let cfg = Config {
761 partition: "test_ordinal".into(),
762 items_per_blob: NZU64!(10), write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
764 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
765 };
766
767 {
769 let mut store =
770 Ordinal::<_, FixedBytes<32>>::init(context.with_label("first"), cfg.clone())
771 .await
772 .expect("Failed to initialize store");
773
774 store.put(0, FixedBytes::new([0u8; 32])).await.unwrap();
776 store.put(5, FixedBytes::new([5u8; 32])).await.unwrap();
777 store.put(10, FixedBytes::new([10u8; 32])).await.unwrap();
778 store.put(15, FixedBytes::new([15u8; 32])).await.unwrap();
779 store.sync().await.expect("Failed to sync store");
780 }
781
782 {
784 let (blob, _) = context
786 .open("test_ordinal", &0u64.to_be_bytes())
787 .await
788 .unwrap();
789 blob.write_at(32, vec![0xFF]).await.unwrap(); blob.sync().await.unwrap();
791
792 let (blob, _) = context
794 .open("test_ordinal", &1u64.to_be_bytes())
795 .await
796 .unwrap();
797 blob.write_at(5, vec![0xFF; 4]).await.unwrap(); blob.sync().await.unwrap();
799 }
800
801 {
803 let store =
804 Ordinal::<_, FixedBytes<32>>::init(context.with_label("second"), cfg.clone())
805 .await
806 .expect("Failed to initialize store");
807
808 assert!(!store.has(0)); assert!(!store.has(10)); assert!(store.has(5));
814 assert!(store.has(15));
815 assert_eq!(
816 store.get(5).await.unwrap().unwrap(),
817 FixedBytes::new([5u8; 32])
818 );
819 assert_eq!(
820 store.get(15).await.unwrap().unwrap(),
821 FixedBytes::new([15u8; 32])
822 );
823 }
824 });
825 }
826
827 #[test_traced]
828 fn test_extra_bytes_in_blob() {
829 let executor = deterministic::Runner::default();
831 executor.start(|context| async move {
832 let cfg = Config {
833 partition: "test_ordinal".into(),
834 items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
835 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
836 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
837 };
838
839 {
841 let mut store =
842 Ordinal::<_, FixedBytes<32>>::init(context.with_label("first"), cfg.clone())
843 .await
844 .expect("Failed to initialize store");
845
846 store
847 .put(0, FixedBytes::new([42u8; 32]))
848 .await
849 .expect("Failed to put data");
850 store
851 .put(1, FixedBytes::new([43u8; 32]))
852 .await
853 .expect("Failed to put data");
854 store.sync().await.expect("Failed to sync store");
855 }
856
857 {
859 let (blob, size) = context
860 .open("test_ordinal", &0u64.to_be_bytes())
861 .await
862 .unwrap();
863 let mut garbage = vec![0xFF; 32]; let invalid_crc = 0xDEADBEEFu32;
867 garbage.extend_from_slice(&invalid_crc.to_be_bytes());
868 assert_eq!(garbage.len(), 36); blob.write_at(size, garbage).await.unwrap();
870 blob.sync().await.unwrap();
871 }
872
873 {
875 let store =
876 Ordinal::<_, FixedBytes<32>>::init(context.with_label("second"), cfg.clone())
877 .await
878 .expect("Failed to initialize store");
879
880 assert!(store.has(0));
882 assert!(store.has(1));
883 assert_eq!(
884 store.get(0).await.unwrap().unwrap(),
885 FixedBytes::new([42u8; 32])
886 );
887 assert_eq!(
888 store.get(1).await.unwrap().unwrap(),
889 FixedBytes::new([43u8; 32])
890 );
891
892 let mut store_mut = store;
894 store_mut.put(2, FixedBytes::new([44u8; 32])).await.unwrap();
895 assert_eq!(
896 store_mut.get(2).await.unwrap().unwrap(),
897 FixedBytes::new([44u8; 32])
898 );
899 }
900 });
901 }
902
903 #[test_traced]
904 fn test_zero_filled_records() {
905 let executor = deterministic::Runner::default();
907 executor.start(|context| async move {
908 let cfg = Config {
909 partition: "test_ordinal".into(),
910 items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
911 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
912 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
913 };
914
915 {
917 let (blob, _) = context
918 .open("test_ordinal", &0u64.to_be_bytes())
919 .await
920 .unwrap();
921
922 let zeros = vec![0u8; 36 * 5]; blob.write_at(0, zeros).await.unwrap();
925
926 let mut valid_record = vec![44u8; 32];
928 let crc = Crc32::checksum(&valid_record);
929 valid_record.extend_from_slice(&crc.to_be_bytes());
930 blob.write_at(36 * 5, valid_record).await.unwrap();
931
932 blob.sync().await.unwrap();
933 }
934
935 {
937 let store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
938 .await
939 .expect("Failed to initialize store");
940
941 for i in 0..5 {
943 assert!(!store.has(i));
944 }
945
946 assert!(store.has(5));
948 assert_eq!(
949 store.get(5).await.unwrap().unwrap(),
950 FixedBytes::new([44u8; 32])
951 );
952 }
953 });
954 }
955
956 fn test_operations_and_restart(num_values: usize) -> String {
957 let executor = deterministic::Runner::default();
959 executor.start(|mut context| async move {
960 let cfg = Config {
961 partition: "test_ordinal".into(),
962 items_per_blob: NZU64!(100), write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
964 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
965 };
966
967 let mut store =
969 Ordinal::<_, FixedBytes<128>>::init(context.with_label("first"), cfg.clone())
970 .await
971 .expect("Failed to initialize store");
972
973 let mut values = Vec::new();
975 let mut rng_index = 0u64;
976
977 for _ in 0..num_values {
978 let mut index_bytes = [0u8; 8];
980 context.fill_bytes(&mut index_bytes);
981 let index_offset = u64::from_be_bytes(index_bytes) % 1000;
982 let index = rng_index + index_offset;
983 rng_index = index + 1;
984
985 let mut value = [0u8; 128];
987 context.fill_bytes(&mut value);
988 let value = FixedBytes::<128>::new(value);
989
990 store
991 .put(index, value.clone())
992 .await
993 .expect("Failed to put data");
994 values.push((index, value));
995 }
996
997 store.sync().await.expect("Failed to sync");
999
1000 for (index, value) in &values {
1002 let retrieved = store
1003 .get(*index)
1004 .await
1005 .expect("Failed to get data")
1006 .expect("Data not found");
1007 assert_eq!(&retrieved, value);
1008 }
1009
1010 for i in 0..10 {
1012 let _ = store.next_gap(i * 100);
1013 }
1014
1015 store.sync().await.expect("Failed to sync store");
1017 drop(store);
1018
1019 let mut store = Ordinal::<_, FixedBytes<128>>::init(context.with_label("second"), cfg)
1021 .await
1022 .expect("Failed to initialize store");
1023
1024 for (index, value) in &values {
1026 let retrieved = store
1027 .get(*index)
1028 .await
1029 .expect("Failed to get data")
1030 .expect("Data not found");
1031 assert_eq!(&retrieved, value);
1032 }
1033
1034 for _ in 0..10 {
1036 let mut index_bytes = [0u8; 8];
1037 context.fill_bytes(&mut index_bytes);
1038 let index = u64::from_be_bytes(index_bytes) % 10000;
1039
1040 let mut value = [0u8; 128];
1041 context.fill_bytes(&mut value);
1042 let value = FixedBytes::<128>::new(value);
1043
1044 store.put(index, value).await.expect("Failed to put data");
1045 }
1046
1047 store.sync().await.expect("Failed to sync");
1049
1050 context.auditor().state()
1052 })
1053 }
1054
1055 #[test_group("slow")]
1056 #[test_traced]
1057 fn test_determinism() {
1058 let state1 = test_operations_and_restart(100);
1059 let state2 = test_operations_and_restart(100);
1060 assert_eq!(state1, state2);
1061 }
1062
1063 #[test_traced]
1064 fn test_prune_basic() {
1065 let executor = deterministic::Runner::default();
1067 executor.start(|context| async move {
1068 let cfg = Config {
1069 partition: "test_ordinal".into(),
1070 items_per_blob: NZU64!(100), write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1072 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1073 };
1074
1075 let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1076 .await
1077 .expect("Failed to initialize store");
1078
1079 let values = vec![
1081 (0u64, FixedBytes::new([0u8; 32])), (50u64, FixedBytes::new([50u8; 32])), (100u64, FixedBytes::new([100u8; 32])), (150u64, FixedBytes::new([150u8; 32])), (200u64, FixedBytes::new([200u8; 32])), (300u64, FixedBytes::new([44u8; 32])), ];
1088
1089 for (index, value) in &values {
1090 store
1091 .put(*index, value.clone())
1092 .await
1093 .expect("Failed to put data");
1094 }
1095 store.sync().await.unwrap();
1096
1097 for (index, value) in &values {
1099 assert_eq!(store.get(*index).await.unwrap().unwrap(), *value);
1100 }
1101
1102 store.prune(150).await.unwrap();
1104 let buffer = context.encode();
1105 assert!(buffer.contains("pruned_total 1"));
1106
1107 assert!(!store.has(0));
1109 assert!(!store.has(50));
1110 assert!(store.get(0).await.unwrap().is_none());
1111 assert!(store.get(50).await.unwrap().is_none());
1112
1113 assert!(store.has(100));
1115 assert!(store.has(150));
1116 assert!(store.has(200));
1117 assert!(store.has(300));
1118 assert_eq!(store.get(100).await.unwrap().unwrap(), values[2].1);
1119 assert_eq!(store.get(150).await.unwrap().unwrap(), values[3].1);
1120 assert_eq!(store.get(200).await.unwrap().unwrap(), values[4].1);
1121 assert_eq!(store.get(300).await.unwrap().unwrap(), values[5].1);
1122
1123 store.prune(250).await.unwrap();
1125 let buffer = context.encode();
1126 assert!(buffer.contains("pruned_total 2"));
1127
1128 assert!(!store.has(100));
1130 assert!(!store.has(150));
1131 assert!(store.get(100).await.unwrap().is_none());
1132 assert!(store.get(150).await.unwrap().is_none());
1133
1134 assert!(store.has(200));
1136 assert!(store.has(300));
1137 assert_eq!(store.get(200).await.unwrap().unwrap(), values[4].1);
1138 assert_eq!(store.get(300).await.unwrap().unwrap(), values[5].1);
1139 });
1140 }
1141
1142 #[test_traced]
1143 fn test_prune_with_gaps() {
1144 let executor = deterministic::Runner::default();
1146 executor.start(|context| async move {
1147 let cfg = Config {
1148 partition: "test_ordinal".into(),
1149 items_per_blob: NZU64!(100),
1150 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1151 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1152 };
1153
1154 let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1155 .await
1156 .expect("Failed to initialize store");
1157
1158 store.put(5, FixedBytes::new([5u8; 32])).await.unwrap();
1160 store.put(105, FixedBytes::new([105u8; 32])).await.unwrap();
1161 store.put(305, FixedBytes::new([49u8; 32])).await.unwrap();
1162 store.sync().await.unwrap();
1163
1164 let (current_end, next_start) = store.next_gap(0);
1166 assert!(current_end.is_none());
1167 assert_eq!(next_start, Some(5));
1168
1169 let (current_end, next_start) = store.next_gap(5);
1170 assert_eq!(current_end, Some(5));
1171 assert_eq!(next_start, Some(105));
1172
1173 store.prune(150).await.unwrap();
1175
1176 assert!(!store.has(5));
1178 assert!(store.get(5).await.unwrap().is_none());
1179
1180 assert!(store.has(105));
1182 assert!(store.has(305));
1183
1184 let (current_end, next_start) = store.next_gap(0);
1185 assert!(current_end.is_none());
1186 assert_eq!(next_start, Some(105));
1187
1188 let (current_end, next_start) = store.next_gap(105);
1189 assert_eq!(current_end, Some(105));
1190 assert_eq!(next_start, Some(305));
1191 });
1192 }
1193
1194 #[test_traced]
1195 fn test_prune_no_op() {
1196 let executor = deterministic::Runner::default();
1198 executor.start(|context| async move {
1199 let cfg = Config {
1200 partition: "test_ordinal".into(),
1201 items_per_blob: NZU64!(100),
1202 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1203 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1204 };
1205
1206 let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1207 .await
1208 .expect("Failed to initialize store");
1209
1210 store.put(100, FixedBytes::new([100u8; 32])).await.unwrap();
1212 store.put(200, FixedBytes::new([200u8; 32])).await.unwrap();
1213 store.sync().await.unwrap();
1214
1215 store.prune(50).await.unwrap();
1217
1218 assert!(store.has(100));
1220 assert!(store.has(200));
1221 let buffer = context.encode();
1222 assert!(buffer.contains("pruned_total 0"));
1223
1224 store.prune(100).await.unwrap();
1226
1227 assert!(store.has(100));
1229 assert!(store.has(200));
1230 let buffer = context.encode();
1231 assert!(buffer.contains("pruned_total 0"));
1232 });
1233 }
1234
1235 #[test_traced]
1236 fn test_prune_empty_store() {
1237 let executor = deterministic::Runner::default();
1239 executor.start(|context| async move {
1240 let cfg = Config {
1241 partition: "test_ordinal".into(),
1242 items_per_blob: NZU64!(100),
1243 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1244 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1245 };
1246
1247 let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1248 .await
1249 .expect("Failed to initialize store");
1250
1251 store.prune(1000).await.unwrap();
1253
1254 store.put(0, FixedBytes::new([0u8; 32])).await.unwrap();
1256 assert!(store.has(0));
1257 });
1258 }
1259
1260 #[test_traced]
1261 fn test_prune_after_restart() {
1262 let executor = deterministic::Runner::default();
1264 executor.start(|context| async move {
1265 let cfg = Config {
1266 partition: "test_ordinal".into(),
1267 items_per_blob: NZU64!(100),
1268 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1269 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1270 };
1271
1272 {
1274 let mut store =
1275 Ordinal::<_, FixedBytes<32>>::init(context.with_label("first"), cfg.clone())
1276 .await
1277 .expect("Failed to initialize store");
1278
1279 store.put(0, FixedBytes::new([0u8; 32])).await.unwrap();
1280 store.put(100, FixedBytes::new([100u8; 32])).await.unwrap();
1281 store.put(200, FixedBytes::new([200u8; 32])).await.unwrap();
1282 store.sync().await.unwrap();
1283 }
1284
1285 {
1287 let mut store =
1288 Ordinal::<_, FixedBytes<32>>::init(context.with_label("second"), cfg.clone())
1289 .await
1290 .expect("Failed to initialize store");
1291
1292 assert!(store.has(0));
1294 assert!(store.has(100));
1295 assert!(store.has(200));
1296
1297 store.prune(150).await.unwrap();
1299
1300 assert!(!store.has(0));
1302 assert!(store.has(100));
1303 assert!(store.has(200));
1304
1305 store.sync().await.unwrap();
1306 }
1307
1308 {
1310 let store =
1311 Ordinal::<_, FixedBytes<32>>::init(context.with_label("third"), cfg.clone())
1312 .await
1313 .expect("Failed to initialize store");
1314
1315 assert!(!store.has(0));
1316 assert!(store.has(100));
1317 assert!(store.has(200));
1318
1319 let (current_end, next_start) = store.next_gap(0);
1321 assert!(current_end.is_none());
1322 assert_eq!(next_start, Some(100));
1323 }
1324 });
1325 }
1326
1327 #[test_traced]
1328 fn test_prune_multiple_operations() {
1329 let executor = deterministic::Runner::default();
1331 executor.start(|context| async move {
1332 let cfg = Config {
1333 partition: "test_ordinal".into(),
1334 items_per_blob: NZU64!(50), write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1336 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1337 };
1338
1339 let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1340 .await
1341 .expect("Failed to initialize store");
1342
1343 let mut values = Vec::new();
1345 for i in 0..10 {
1346 let index = i * 50 + 25; let value = FixedBytes::new([i as u8; 32]);
1348 store.put(index, value.clone()).await.unwrap();
1349 values.push((index, value));
1350 }
1351 store.sync().await.unwrap();
1352
1353 for i in 1..5 {
1355 let prune_index = i * 50 + 10;
1356 store.prune(prune_index).await.unwrap();
1357
1358 for (index, _) in &values {
1360 if *index < prune_index {
1361 assert!(!store.has(*index), "Index {index} should be pruned");
1362 } else {
1363 assert!(store.has(*index), "Index {index} should not be pruned");
1364 }
1365 }
1366 }
1367
1368 let buffer = context.encode();
1370 assert!(buffer.contains("pruned_total 4"));
1371
1372 for i in 4..10 {
1374 let index = i * 50 + 25;
1375 assert!(store.has(index));
1376 assert_eq!(
1377 store.get(index).await.unwrap().unwrap(),
1378 values[i as usize].1
1379 );
1380 }
1381 });
1382 }
1383
1384 #[test_traced]
1385 fn test_prune_blob_boundaries() {
1386 let executor = deterministic::Runner::default();
1388 executor.start(|context| async move {
1389 let cfg = Config {
1390 partition: "test_ordinal".into(),
1391 items_per_blob: NZU64!(100),
1392 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1393 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1394 };
1395
1396 let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1397 .await
1398 .expect("Failed to initialize store");
1399
1400 store.put(0, FixedBytes::new([0u8; 32])).await.unwrap(); store.put(99, FixedBytes::new([99u8; 32])).await.unwrap(); store.put(100, FixedBytes::new([100u8; 32])).await.unwrap(); store.put(199, FixedBytes::new([199u8; 32])).await.unwrap(); store.put(200, FixedBytes::new([200u8; 32])).await.unwrap(); store.sync().await.unwrap();
1407
1408 store.prune(100).await.unwrap();
1412 assert!(!store.has(0));
1413 assert!(!store.has(99));
1414 assert!(store.has(100));
1415 assert!(store.has(199));
1416 assert!(store.has(200));
1417
1418 store.prune(199).await.unwrap();
1420 assert!(store.has(100));
1421 assert!(store.has(199));
1422 assert!(store.has(200));
1423
1424 store.prune(200).await.unwrap();
1426 assert!(!store.has(100));
1427 assert!(!store.has(199));
1428 assert!(store.has(200));
1429
1430 let buffer = context.encode();
1431 assert!(buffer.contains("pruned_total 2"));
1432 });
1433 }
1434
1435 #[test_traced]
1436 fn test_prune_non_contiguous_sections() {
1437 let executor = deterministic::Runner::default();
1439 executor.start(|context| async move {
1440 let cfg = Config {
1441 partition: "test_ordinal".into(),
1442 items_per_blob: NZU64!(100),
1443 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1444 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1445 };
1446
1447 let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1448 .await
1449 .expect("Failed to initialize store");
1450
1451 store.put(0, FixedBytes::new([0u8; 32])).await.unwrap(); store.put(250, FixedBytes::new([50u8; 32])).await.unwrap(); store.put(500, FixedBytes::new([44u8; 32])).await.unwrap(); store.put(750, FixedBytes::new([45u8; 32])).await.unwrap(); store.sync().await.unwrap();
1457
1458 assert!(store.has(0));
1460 assert!(store.has(250));
1461 assert!(store.has(500));
1462 assert!(store.has(750));
1463
1464 store.prune(300).await.unwrap();
1466
1467 assert!(!store.has(0)); assert!(!store.has(250)); assert!(store.has(500)); assert!(store.has(750)); let buffer = context.encode();
1474 assert!(buffer.contains("pruned_total 2"));
1475
1476 store.prune(600).await.unwrap();
1478
1479 assert!(!store.has(500)); assert!(store.has(750)); let buffer = context.encode();
1484 assert!(buffer.contains("pruned_total 3"));
1485
1486 store.prune(1000).await.unwrap();
1488
1489 assert!(!store.has(750)); let buffer = context.encode();
1493 assert!(buffer.contains("pruned_total 4"));
1494 });
1495 }
1496
1497 #[test_traced]
1498 fn test_prune_removes_correct_pending() {
1499 let executor = deterministic::Runner::default();
1501 executor.start(|context| async move {
1502 let cfg = Config {
1503 partition: "test_ordinal".into(),
1504 items_per_blob: NZU64!(100),
1505 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1506 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1507 };
1508 let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1509 .await
1510 .expect("Failed to initialize store");
1511
1512 store.put(5, FixedBytes::new([5u8; 32])).await.unwrap();
1514 store.sync().await.unwrap();
1515
1516 store.put(10, FixedBytes::new([10u8; 32])).await.unwrap(); store.put(110, FixedBytes::new([110u8; 32])).await.unwrap(); assert!(store.has(5));
1522 assert!(store.has(10));
1523 assert!(store.has(110));
1524
1525 store.prune(150).await.unwrap();
1527
1528 assert!(!store.has(5));
1530 assert!(!store.has(10));
1531
1532 assert!(store.has(110));
1534 assert_eq!(
1535 store.get(110).await.unwrap().unwrap(),
1536 FixedBytes::new([110u8; 32])
1537 );
1538
1539 store.sync().await.unwrap();
1541 assert!(store.has(110));
1542 assert_eq!(
1543 store.get(110).await.unwrap().unwrap(),
1544 FixedBytes::new([110u8; 32])
1545 );
1546 });
1547 }
1548
1549 #[test_traced]
1550 fn test_init_with_bits_none() {
1551 let executor = deterministic::Runner::default();
1553 executor.start(|context| async move {
1554 let cfg = Config {
1555 partition: "test_ordinal".into(),
1556 items_per_blob: NZU64!(10), write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1558 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1559 };
1560
1561 {
1563 let mut store =
1564 Ordinal::<_, FixedBytes<32>>::init(context.with_label("first"), cfg.clone())
1565 .await
1566 .expect("Failed to initialize store");
1567
1568 store.put(0, FixedBytes::new([0u8; 32])).await.unwrap();
1570 store.put(5, FixedBytes::new([5u8; 32])).await.unwrap();
1571 store.put(9, FixedBytes::new([9u8; 32])).await.unwrap();
1572
1573 store.put(10, FixedBytes::new([10u8; 32])).await.unwrap();
1575 store.put(15, FixedBytes::new([15u8; 32])).await.unwrap();
1576
1577 store.put(25, FixedBytes::new([25u8; 32])).await.unwrap();
1579
1580 store.sync().await.unwrap();
1581 }
1582
1583 {
1585 let store = Ordinal::<_, FixedBytes<32>>::init_with_bits(
1586 context.with_label("second"),
1587 cfg.clone(),
1588 None,
1589 )
1590 .await
1591 .expect("Failed to initialize store with bits");
1592
1593 assert!(store.has(0));
1595 assert!(store.has(5));
1596 assert!(store.has(9));
1597 assert!(store.has(10));
1598 assert!(store.has(15));
1599 assert!(store.has(25));
1600
1601 assert!(!store.has(1));
1603 assert!(!store.has(11));
1604 assert!(!store.has(20));
1605
1606 assert_eq!(
1608 store.get(0).await.unwrap().unwrap(),
1609 FixedBytes::new([0u8; 32])
1610 );
1611 assert_eq!(
1612 store.get(15).await.unwrap().unwrap(),
1613 FixedBytes::new([15u8; 32])
1614 );
1615 }
1616 });
1617 }
1618
1619 #[test_traced]
1620 fn test_init_with_bits_empty_hashmap() {
1621 let executor = deterministic::Runner::default();
1623 executor.start(|context| async move {
1624 let cfg = Config {
1625 partition: "test_ordinal".into(),
1626 items_per_blob: NZU64!(10),
1627 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1628 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1629 };
1630
1631 {
1633 let mut store =
1634 Ordinal::<_, FixedBytes<32>>::init(context.with_label("first"), cfg.clone())
1635 .await
1636 .expect("Failed to initialize store");
1637
1638 store.put(0, FixedBytes::new([0u8; 32])).await.unwrap();
1639 store.put(10, FixedBytes::new([10u8; 32])).await.unwrap();
1640 store.put(20, FixedBytes::new([20u8; 32])).await.unwrap();
1641
1642 store.sync().await.unwrap();
1643 }
1644
1645 {
1647 let bits: BTreeMap<u64, &Option<BitMap>> = BTreeMap::new();
1648 let store = Ordinal::<_, FixedBytes<32>>::init_with_bits(
1649 context.with_label("second"),
1650 cfg.clone(),
1651 Some(bits),
1652 )
1653 .await
1654 .expect("Failed to initialize store with bits");
1655
1656 assert!(!store.has(0));
1658 assert!(!store.has(10));
1659 assert!(!store.has(20));
1660 }
1661 });
1662 }
1663
1664 #[test_traced]
1665 fn test_init_with_bits_selective_sections() {
1666 let executor = deterministic::Runner::default();
1668 executor.start(|context| async move {
1669 let cfg = Config {
1670 partition: "test_ordinal".into(),
1671 items_per_blob: NZU64!(10),
1672 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1673 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1674 };
1675
1676 {
1678 let mut store =
1679 Ordinal::<_, FixedBytes<32>>::init(context.with_label("first"), cfg.clone())
1680 .await
1681 .expect("Failed to initialize store");
1682
1683 for i in 0..10 {
1685 store.put(i, FixedBytes::new([i as u8; 32])).await.unwrap();
1686 }
1687
1688 for i in 10..20 {
1690 store.put(i, FixedBytes::new([i as u8; 32])).await.unwrap();
1691 }
1692
1693 for i in 20..30 {
1695 store.put(i, FixedBytes::new([i as u8; 32])).await.unwrap();
1696 }
1697
1698 store.sync().await.unwrap();
1699 }
1700
1701 {
1703 let mut bits_map: BTreeMap<u64, &Option<BitMap>> = BTreeMap::new();
1704
1705 let mut bitmap = BitMap::zeroes(10);
1707 bitmap.set(2, true); bitmap.set(5, true); bitmap.set(8, true); let bitmap_option = Some(bitmap);
1711
1712 bits_map.insert(1, &bitmap_option);
1713
1714 let store = Ordinal::<_, FixedBytes<32>>::init_with_bits(
1715 context.with_label("second"),
1716 cfg.clone(),
1717 Some(bits_map),
1718 )
1719 .await
1720 .expect("Failed to initialize store with bits");
1721
1722 assert!(store.has(12));
1724 assert!(store.has(15));
1725 assert!(store.has(18));
1726
1727 assert!(!store.has(10));
1729 assert!(!store.has(11));
1730 assert!(!store.has(13));
1731 assert!(!store.has(14));
1732 assert!(!store.has(16));
1733 assert!(!store.has(17));
1734 assert!(!store.has(19));
1735
1736 for i in 0..10 {
1738 assert!(!store.has(i));
1739 }
1740 for i in 20..30 {
1741 assert!(!store.has(i));
1742 }
1743
1744 assert_eq!(
1746 store.get(12).await.unwrap().unwrap(),
1747 FixedBytes::new([12u8; 32])
1748 );
1749 assert_eq!(
1750 store.get(15).await.unwrap().unwrap(),
1751 FixedBytes::new([15u8; 32])
1752 );
1753 assert_eq!(
1754 store.get(18).await.unwrap().unwrap(),
1755 FixedBytes::new([18u8; 32])
1756 );
1757 }
1758 });
1759 }
1760
1761 #[test_traced]
1762 fn test_init_with_bits_none_option_all_records_exist() {
1763 let executor = deterministic::Runner::default();
1765 executor.start(|context| async move {
1766 let cfg = Config {
1767 partition: "test_ordinal".into(),
1768 items_per_blob: NZU64!(5),
1769 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1770 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1771 };
1772
1773 {
1775 let mut store =
1776 Ordinal::<_, FixedBytes<32>>::init(context.with_label("first"), cfg.clone())
1777 .await
1778 .expect("Failed to initialize store");
1779
1780 for i in 5..10 {
1782 store.put(i, FixedBytes::new([i as u8; 32])).await.unwrap();
1783 }
1784
1785 store.sync().await.unwrap();
1786 }
1787
1788 {
1790 let mut bits_map: BTreeMap<u64, &Option<BitMap>> = BTreeMap::new();
1791 let none_option: Option<BitMap> = None;
1792 bits_map.insert(1, &none_option);
1793
1794 let store = Ordinal::<_, FixedBytes<32>>::init_with_bits(
1795 context.with_label("second"),
1796 cfg.clone(),
1797 Some(bits_map),
1798 )
1799 .await
1800 .expect("Failed to initialize store with bits");
1801
1802 for i in 5..10 {
1804 assert!(store.has(i));
1805 assert_eq!(
1806 store.get(i).await.unwrap().unwrap(),
1807 FixedBytes::new([i as u8; 32])
1808 );
1809 }
1810 }
1811 });
1812 }
1813
1814 #[test_traced]
1815 #[should_panic(expected = "Failed to initialize store with bits: MissingRecord(6)")]
1816 fn test_init_with_bits_none_option_missing_record_panics() {
1817 let executor = deterministic::Runner::default();
1819 executor.start(|context| async move {
1820 let cfg = Config {
1821 partition: "test_ordinal".into(),
1822 items_per_blob: NZU64!(5),
1823 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1824 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1825 };
1826
1827 {
1829 let mut store =
1830 Ordinal::<_, FixedBytes<32>>::init(context.with_label("first"), cfg.clone())
1831 .await
1832 .expect("Failed to initialize store");
1833
1834 store.put(5, FixedBytes::new([5u8; 32])).await.unwrap();
1836 store.put(7, FixedBytes::new([7u8; 32])).await.unwrap();
1838 store.put(8, FixedBytes::new([8u8; 32])).await.unwrap();
1839 store.put(9, FixedBytes::new([9u8; 32])).await.unwrap();
1840
1841 store.sync().await.unwrap();
1842 }
1843
1844 {
1847 let mut bits_map: BTreeMap<u64, &Option<BitMap>> = BTreeMap::new();
1848 let none_option: Option<BitMap> = None;
1849 bits_map.insert(1, &none_option);
1850
1851 let _store = Ordinal::<_, FixedBytes<32>>::init_with_bits(
1852 context.with_label("second"),
1853 cfg.clone(),
1854 Some(bits_map),
1855 )
1856 .await
1857 .expect("Failed to initialize store with bits");
1858 }
1859 });
1860 }
1861
1862 #[test_traced]
1863 fn test_init_with_bits_mixed_sections() {
1864 let executor = deterministic::Runner::default();
1866 executor.start(|context| async move {
1867 let cfg = Config {
1868 partition: "test_ordinal".into(),
1869 items_per_blob: NZU64!(5),
1870 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1871 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1872 };
1873
1874 {
1876 let mut store =
1877 Ordinal::<_, FixedBytes<32>>::init(context.with_label("first"), cfg.clone())
1878 .await
1879 .expect("Failed to initialize store");
1880
1881 for i in 0..5 {
1883 store.put(i, FixedBytes::new([i as u8; 32])).await.unwrap();
1884 }
1885
1886 store.put(5, FixedBytes::new([5u8; 32])).await.unwrap();
1888 store.put(7, FixedBytes::new([7u8; 32])).await.unwrap();
1889 store.put(9, FixedBytes::new([9u8; 32])).await.unwrap();
1890
1891 for i in 10..15 {
1893 store.put(i, FixedBytes::new([i as u8; 32])).await.unwrap();
1894 }
1895
1896 store.sync().await.unwrap();
1897 }
1898
1899 {
1901 let mut bits_map: BTreeMap<u64, &Option<BitMap>> = BTreeMap::new();
1902
1903 let none_option: Option<BitMap> = None;
1905 bits_map.insert(0, &none_option);
1906
1907 let mut bitmap1 = BitMap::zeroes(5);
1909 bitmap1.set(0, true); bitmap1.set(2, true); let bitmap1_option = Some(bitmap1);
1913 bits_map.insert(1, &bitmap1_option);
1914
1915 let store = Ordinal::<_, FixedBytes<32>>::init_with_bits(
1918 context.with_label("second"),
1919 cfg.clone(),
1920 Some(bits_map),
1921 )
1922 .await
1923 .expect("Failed to initialize store with bits");
1924
1925 for i in 0..5 {
1927 assert!(store.has(i));
1928 assert_eq!(
1929 store.get(i).await.unwrap().unwrap(),
1930 FixedBytes::new([i as u8; 32])
1931 );
1932 }
1933
1934 assert!(store.has(5));
1936 assert!(store.has(7));
1937 assert!(!store.has(6));
1938 assert!(!store.has(8));
1939 assert!(!store.has(9)); for i in 10..15 {
1943 assert!(!store.has(i));
1944 }
1945 }
1946 });
1947 }
1948
1949 #[test_traced]
1950 #[should_panic(expected = "Failed to initialize store with bits: MissingRecord(2)")]
1951 fn test_init_with_bits_corrupted_records() {
1952 let executor = deterministic::Runner::default();
1954 executor.start(|context| async move {
1955 let cfg = Config {
1956 partition: "test_ordinal".into(),
1957 items_per_blob: NZU64!(5),
1958 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1959 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1960 };
1961
1962 {
1964 let mut store =
1965 Ordinal::<_, FixedBytes<32>>::init(context.with_label("first"), cfg.clone())
1966 .await
1967 .expect("Failed to initialize store");
1968
1969 for i in 0..5 {
1971 store.put(i, FixedBytes::new([i as u8; 32])).await.unwrap();
1972 }
1973
1974 store.sync().await.unwrap();
1975 }
1976
1977 {
1979 let (blob, _) = context
1980 .open("test_ordinal", &0u64.to_be_bytes())
1981 .await
1982 .unwrap();
1983 let offset = 2 * 36 + 32; blob.write_at(offset, vec![0xFF]).await.unwrap();
1986 blob.sync().await.unwrap();
1987 }
1988
1989 {
1991 let mut bits_map: BTreeMap<u64, &Option<BitMap>> = BTreeMap::new();
1992
1993 let mut bitmap = BitMap::zeroes(5);
1995 bitmap.set(0, true); bitmap.set(2, true); bitmap.set(4, true); let bitmap_option = Some(bitmap);
1999 bits_map.insert(0, &bitmap_option);
2000
2001 let _store = Ordinal::<_, FixedBytes<32>>::init_with_bits(
2002 context.with_label("second"),
2003 cfg.clone(),
2004 Some(bits_map),
2005 )
2006 .await
2007 .expect("Failed to initialize store with bits");
2008 }
2009 });
2010 }
2011
/// Test-only value wrapping a single `u64`.
///
/// The `Read` implementation for this type in this module rejects a decoded
/// value of `0`, so a `DummyValue { value: 0 }` can be written but not read
/// back.
#[derive(Debug, Eq, PartialEq)]
pub struct DummyValue {
    /// The wrapped integer.
    pub value: u64,
}
2017
2018 impl Write for DummyValue {
2019 fn write(&self, buf: &mut impl BufMut) {
2020 self.value.write(buf);
2021 }
2022 }
2023
2024 impl Read for DummyValue {
2025 type Cfg = ();
2026
2027 fn read_cfg(buf: &mut impl Buf, _: &Self::Cfg) -> Result<Self, commonware_codec::Error> {
2028 let value = u64::read(buf)?;
2029 if value == 0 {
2030 return Err(commonware_codec::Error::Invalid(
2031 "DummyValue",
2032 "value must be non-zero",
2033 ));
2034 }
2035 Ok(Self { value })
2036 }
2037 }
2038
2039 impl FixedSize for DummyValue {
2040 const SIZE: usize = u64::SIZE;
2041 }
2042
2043 #[test_traced]
2044 fn test_init_skip_unparseable_record() {
2045 let executor = deterministic::Runner::default();
2047 executor.start(|context| async move {
2048 let cfg = Config {
2049 partition: "test_ordinal".into(),
2050 items_per_blob: NZU64!(1),
2051 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
2052 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
2053 };
2054
2055 {
2057 let mut store =
2058 Ordinal::<_, DummyValue>::init(context.with_label("first"), cfg.clone())
2059 .await
2060 .expect("Failed to initialize store");
2061
2062 store.put(1, DummyValue { value: 1 }).await.unwrap();
2064 store.put(2, DummyValue { value: 0 }).await.unwrap(); store.put(4, DummyValue { value: 4 }).await.unwrap();
2066
2067 store.sync().await.unwrap();
2068 }
2069
2070 {
2072 let store =
2073 Ordinal::<_, DummyValue>::init(context.with_label("second"), cfg.clone())
2074 .await
2075 .expect("Failed to initialize store");
2076
2077 assert!(store.has(1), "Record 1 should be available");
2079 assert_eq!(
2080 store.get(1).await.unwrap().unwrap(),
2081 DummyValue { value: 1 },
2082 "Record 0 should have correct value"
2083 );
2084
2085 assert!(
2087 !store.has(2),
2088 "Record 2 should not be available (unparseable)"
2089 );
2090
2091 assert!(
2093 store.has(4),
2094 "Record 4 should be available - we should not exit early on unparseable record"
2095 );
2096 assert_eq!(
2097 store.get(4).await.unwrap().unwrap(),
2098 DummyValue { value: 4 },
2099 "Record 4 should have correct value"
2100 );
2101 }
2102 });
2103 }
2104}