1mod storage;
97
98use std::num::{NonZeroU64, NonZeroUsize};
99pub use storage::Ordinal;
100use thiserror::Error;
101
102#[derive(Debug, Error)]
104pub enum Error {
105 #[error("runtime error: {0}")]
106 Runtime(#[from] commonware_runtime::Error),
107 #[error("codec error: {0}")]
108 Codec(#[from] commonware_codec::Error),
109 #[error("invalid blob name: {0}")]
110 InvalidBlobName(String),
111 #[error("invalid record: {0}")]
112 InvalidRecord(u64),
113 #[error("missing record at {0}")]
114 MissingRecord(u64),
115}
116
117#[derive(Clone)]
119pub struct Config {
120 pub partition: String,
122
123 pub items_per_blob: NonZeroU64,
125
126 pub write_buffer: NonZeroUsize,
128
129 pub replay_buffer: NonZeroUsize,
131}
132
133#[cfg(test)]
134mod tests {
135 use super::*;
136 use commonware_codec::{FixedSize, Read, ReadExt, Write};
137 use commonware_cryptography::Crc32;
138 use commonware_formatting::hex;
139 use commonware_macros::{test_group, test_traced};
140 use commonware_runtime::{
141 deterministic, Blob, Buf, BufMut, Metrics as _, Runner, Storage, Supervisor as _,
142 };
143 use commonware_utils::{bitmap::BitMap, sequence::FixedBytes, NZUsize, NZU64};
144 use rand::RngCore;
145 use std::collections::BTreeMap;
146
147 const DEFAULT_ITEMS_PER_BLOB: u64 = 1000;
148 const DEFAULT_WRITE_BUFFER: usize = 4096;
149 const DEFAULT_REPLAY_BUFFER: usize = 1024 * 1024;
150
151 #[test_traced]
152 fn test_put_get() {
153 let executor = deterministic::Runner::default();
155 executor.start(|context| async move {
156 let cfg = Config {
158 partition: "test-ordinal".into(),
159 items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
160 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
161 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
162 };
163 let mut store =
164 Ordinal::<_, FixedBytes<32>>::init(context.child("storage"), cfg.clone())
165 .await
166 .expect("Failed to initialize store");
167
168 let value = FixedBytes::new([42u8; 32]);
169
170 assert!(!store.has(0));
172
173 store
175 .put(0, value.clone())
176 .await
177 .expect("Failed to put data");
178
179 assert!(store.has(0));
181
182 let retrieved = store
184 .get(0)
185 .await
186 .expect("Failed to get data")
187 .expect("Data not found");
188 assert_eq!(retrieved, value);
189
190 store.sync().await.expect("Failed to sync data");
192
193 let buffer = context.encode();
195 assert!(buffer.contains("gets_total 1"), "{}", buffer);
196 assert!(buffer.contains("puts_total 1"), "{}", buffer);
197 assert!(buffer.contains("has_total 2"), "{}", buffer);
198 assert!(buffer.contains("syncs_total 1"), "{}", buffer);
199 assert!(buffer.contains("pruned_total 0"), "{}", buffer);
200
201 let retrieved = store
203 .get(0)
204 .await
205 .expect("Failed to get data")
206 .expect("Data not found");
207 assert_eq!(retrieved, value);
208 });
209 }
210
211 #[test_traced]
212 fn test_concurrent_sync_does_not_report_success_while_flush_fails() {
213 let executor = deterministic::Runner::default();
214 executor.start(|context| async move {
215 let cfg = Config {
216 partition: "test-ordinal".into(),
217 items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
218 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
219 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
220 };
221 let mut store =
222 Ordinal::<_, FixedBytes<32>>::init(context.child("storage"), cfg.clone())
223 .await
224 .expect("Failed to initialize store");
225
226 store
227 .put(0, FixedBytes::new([42u8; 32]))
228 .await
229 .expect("Failed to put data");
230
231 let section = 0u64.to_be_bytes();
233 context
234 .remove(&cfg.partition, Some(§ion))
235 .await
236 .expect("Failed to remove blob");
237
238 let (first, second) = futures::future::join(store.sync(), store.sync()).await;
240 assert!(first.is_err(), "first sync unexpectedly succeeded");
241 assert!(second.is_err(), "second sync unexpectedly succeeded");
242 });
243 }
244
245 #[test_traced]
246 fn test_multiple_indices() {
247 let executor = deterministic::Runner::default();
249 executor.start(|context| async move {
250 let cfg = Config {
252 partition: "test-ordinal".into(),
253 items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
254 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
255 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
256 };
257 let mut store =
258 Ordinal::<_, FixedBytes<32>>::init(context.child("storage"), cfg.clone())
259 .await
260 .expect("Failed to initialize store");
261
262 let indices = vec![
264 (0u64, FixedBytes::new([0u8; 32])),
265 (5u64, FixedBytes::new([5u8; 32])),
266 (10u64, FixedBytes::new([10u8; 32])),
267 (100u64, FixedBytes::new([100u8; 32])),
268 (1000u64, FixedBytes::new([200u8; 32])), ];
270
271 for (index, value) in &indices {
272 store
273 .put(*index, value.clone())
274 .await
275 .expect("Failed to put data");
276 }
277
278 store.sync().await.expect("Failed to sync");
280
281 for (index, value) in &indices {
283 let retrieved = store
284 .get(*index)
285 .await
286 .expect("Failed to get data")
287 .expect("Data not found");
288 assert_eq!(&retrieved, value);
289 }
290 });
291 }
292
293 #[test_traced]
294 fn test_sparse_indices() {
295 let executor = deterministic::Runner::default();
297 executor.start(|context| async move {
298 let cfg = Config {
300 partition: "test-ordinal".into(),
301 items_per_blob: NZU64!(100), write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
303 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
304 };
305 let mut store =
306 Ordinal::<_, FixedBytes<32>>::init(context.child("storage"), cfg.clone())
307 .await
308 .expect("Failed to initialize store");
309
310 let indices = vec![
312 (0u64, FixedBytes::new([0u8; 32])),
313 (99u64, FixedBytes::new([99u8; 32])), (100u64, FixedBytes::new([100u8; 32])), (500u64, FixedBytes::new([200u8; 32])), ];
317
318 for (index, value) in &indices {
319 store
320 .put(*index, value.clone())
321 .await
322 .expect("Failed to put data");
323 }
324
325 assert!(!store.has(1));
327 assert!(!store.has(50));
328 assert!(!store.has(101));
329 assert!(!store.has(499));
330
331 store.sync().await.expect("Failed to sync");
333
334 for (index, value) in &indices {
335 let retrieved = store
336 .get(*index)
337 .await
338 .expect("Failed to get data")
339 .expect("Data not found");
340 assert_eq!(&retrieved, value);
341 }
342 });
343 }
344
345 #[test_traced]
346 fn test_next_gap() {
347 let executor = deterministic::Runner::default();
349 executor.start(|context| async move {
350 let cfg = Config {
352 partition: "test-ordinal".into(),
353 items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
354 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
355 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
356 };
357 let mut store =
358 Ordinal::<_, FixedBytes<32>>::init(context.child("storage"), cfg.clone())
359 .await
360 .expect("Failed to initialize store");
361
362 store.put(1, FixedBytes::new([1u8; 32])).await.unwrap();
364 store.put(10, FixedBytes::new([10u8; 32])).await.unwrap();
365 store.put(11, FixedBytes::new([11u8; 32])).await.unwrap();
366 store.put(14, FixedBytes::new([14u8; 32])).await.unwrap();
367
368 let (current_end, start_next) = store.next_gap(0);
370 assert!(current_end.is_none());
371 assert_eq!(start_next, Some(1));
372
373 let (current_end, start_next) = store.next_gap(1);
374 assert_eq!(current_end, Some(1));
375 assert_eq!(start_next, Some(10));
376
377 let (current_end, start_next) = store.next_gap(10);
378 assert_eq!(current_end, Some(11));
379 assert_eq!(start_next, Some(14));
380
381 let (current_end, start_next) = store.next_gap(11);
382 assert_eq!(current_end, Some(11));
383 assert_eq!(start_next, Some(14));
384
385 let (current_end, start_next) = store.next_gap(12);
386 assert!(current_end.is_none());
387 assert_eq!(start_next, Some(14));
388
389 let (current_end, start_next) = store.next_gap(14);
390 assert_eq!(current_end, Some(14));
391 assert!(start_next.is_none());
392 });
393 }
394
395 #[test_traced]
396 fn test_missing_items() {
397 let executor = deterministic::Runner::default();
399 executor.start(|context| async move {
400 let cfg = Config {
402 partition: "test-ordinal".into(),
403 items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
404 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
405 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
406 };
407 let mut store =
408 Ordinal::<_, FixedBytes<32>>::init(context.child("storage"), cfg.clone())
409 .await
410 .expect("Failed to initialize store");
411
412 assert_eq!(store.missing_items(0, 5), Vec::<u64>::new());
414 assert_eq!(store.missing_items(100, 10), Vec::<u64>::new());
415
416 store.put(1, FixedBytes::new([1u8; 32])).await.unwrap();
418 store.put(2, FixedBytes::new([2u8; 32])).await.unwrap();
419 store.put(5, FixedBytes::new([5u8; 32])).await.unwrap();
420 store.put(6, FixedBytes::new([6u8; 32])).await.unwrap();
421 store.put(10, FixedBytes::new([10u8; 32])).await.unwrap();
422
423 assert_eq!(store.missing_items(0, 5), vec![0, 3, 4, 7, 8]);
425 assert_eq!(store.missing_items(0, 6), vec![0, 3, 4, 7, 8, 9]);
426 assert_eq!(store.missing_items(0, 7), vec![0, 3, 4, 7, 8, 9]);
427
428 assert_eq!(store.missing_items(3, 3), vec![3, 4, 7]);
430 assert_eq!(store.missing_items(4, 2), vec![4, 7]);
431
432 assert_eq!(store.missing_items(1, 3), vec![3, 4, 7]);
434 assert_eq!(store.missing_items(2, 4), vec![3, 4, 7, 8]);
435 assert_eq!(store.missing_items(5, 2), vec![7, 8]);
436
437 assert_eq!(store.missing_items(11, 5), Vec::<u64>::new());
439 assert_eq!(store.missing_items(100, 10), Vec::<u64>::new());
440
441 store.put(1000, FixedBytes::new([100u8; 32])).await.unwrap();
443
444 let items = store.missing_items(11, 10);
446 assert_eq!(items, vec![11, 12, 13, 14, 15, 16, 17, 18, 19, 20]);
447
448 let items = store.missing_items(990, 15);
450 assert_eq!(
451 items,
452 vec![990, 991, 992, 993, 994, 995, 996, 997, 998, 999]
453 );
454
455 store.sync().await.unwrap();
457 assert_eq!(store.missing_items(0, 5), vec![0, 3, 4, 7, 8]);
458 assert_eq!(store.missing_items(3, 3), vec![3, 4, 7]);
459
460 store.put(9999, FixedBytes::new([99u8; 32])).await.unwrap();
462 store
463 .put(10001, FixedBytes::new([101u8; 32]))
464 .await
465 .unwrap();
466
467 let items = store.missing_items(9998, 5);
469 assert_eq!(items, vec![9998, 10000]);
470 });
471 }
472
473 #[test_traced]
474 fn test_restart() {
475 let executor = deterministic::Runner::default();
477 executor.start(|context| async move {
478 let cfg = Config {
479 partition: "test-ordinal".into(),
480 items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
481 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
482 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
483 };
484
485 {
487 let mut store =
488 Ordinal::<_, FixedBytes<32>>::init(context.child("first"), cfg.clone())
489 .await
490 .expect("Failed to initialize store");
491
492 let values = vec![
493 (0u64, FixedBytes::new([0u8; 32])),
494 (100u64, FixedBytes::new([100u8; 32])),
495 (1000u64, FixedBytes::new([200u8; 32])),
496 ];
497
498 for (index, value) in &values {
499 store
500 .put(*index, value.clone())
501 .await
502 .expect("Failed to put data");
503 }
504
505 store.sync().await.expect("Failed to sync store");
506 }
507
508 {
510 let store =
511 Ordinal::<_, FixedBytes<32>>::init(context.child("second"), cfg.clone())
512 .await
513 .expect("Failed to initialize store");
514
515 let values = vec![
516 (0u64, FixedBytes::new([0u8; 32])),
517 (100u64, FixedBytes::new([100u8; 32])),
518 (1000u64, FixedBytes::new([200u8; 32])),
519 ];
520
521 for (index, value) in &values {
522 let retrieved = store
523 .get(*index)
524 .await
525 .expect("Failed to get data")
526 .expect("Data not found");
527 assert_eq!(&retrieved, value);
528 }
529
530 let (current_end, start_next) = store.next_gap(0);
532 assert_eq!(current_end, Some(0));
533 assert_eq!(start_next, Some(100));
534 }
535 });
536 }
537
538 #[test_traced]
539 fn test_invalid_record() {
540 let executor = deterministic::Runner::default();
542 executor.start(|context| async move {
543 let cfg = Config {
544 partition: "test-ordinal".into(),
545 items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
546 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
547 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
548 };
549
550 {
552 let mut store =
553 Ordinal::<_, FixedBytes<32>>::init(context.child("first"), cfg.clone())
554 .await
555 .expect("Failed to initialize store");
556
557 store
558 .put(0, FixedBytes::new([42u8; 32]))
559 .await
560 .expect("Failed to put data");
561 store.sync().await.expect("Failed to sync store");
562 }
563
564 {
566 let (blob, _) = context
567 .open("test-ordinal", &0u64.to_be_bytes())
568 .await
569 .unwrap();
570 blob.write_at_sync(32, vec![0xFF]).await.unwrap();
572 }
573
574 {
576 let store =
577 Ordinal::<_, FixedBytes<32>>::init(context.child("second"), cfg.clone())
578 .await
579 .expect("Failed to initialize store");
580
581 let result = store.get(0).await.unwrap();
583 assert!(result.is_none());
584
585 assert!(!store.has(0));
587 }
588 });
589 }
590
591 #[test_traced]
592 fn test_get_nonexistent() {
593 let executor = deterministic::Runner::default();
595 executor.start(|context| async move {
596 let cfg = Config {
598 partition: "test-ordinal".into(),
599 items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
600 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
601 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
602 };
603 let store = Ordinal::<_, FixedBytes<32>>::init(context.child("storage"), cfg.clone())
604 .await
605 .expect("Failed to initialize store");
606
607 let retrieved = store.get(999).await.expect("Failed to get data");
609 assert!(retrieved.is_none());
610
611 assert!(!store.has(999));
613 });
614 }
615
616 #[test_traced]
617 fn test_destroy() {
618 let executor = deterministic::Runner::default();
620 executor.start(|context| async move {
621 let cfg = Config {
622 partition: "test-ordinal".into(),
623 items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
624 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
625 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
626 };
627
628 {
630 let mut store =
631 Ordinal::<_, FixedBytes<32>>::init(context.child("first"), cfg.clone())
632 .await
633 .expect("Failed to initialize store");
634
635 store
636 .put(0, FixedBytes::new([0u8; 32]))
637 .await
638 .expect("Failed to put data");
639 store
640 .put(1000, FixedBytes::new([100u8; 32]))
641 .await
642 .expect("Failed to put data");
643
644 store.destroy().await.expect("Failed to destroy store");
646 }
647
648 {
650 let store =
651 Ordinal::<_, FixedBytes<32>>::init(context.child("second"), cfg.clone())
652 .await
653 .expect("Failed to initialize store");
654
655 assert!(store.get(0).await.unwrap().is_none());
657 assert!(store.get(1000).await.unwrap().is_none());
658 assert!(!store.has(0));
659 assert!(!store.has(1000));
660 }
661 });
662 }
663
664 #[test_traced]
665 fn test_partial_record_write() {
666 let executor = deterministic::Runner::default();
668 executor.start(|context| async move {
669 let cfg = Config {
670 partition: "test-ordinal".into(),
671 items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
672 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
673 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
674 };
675
676 {
678 let mut store =
679 Ordinal::<_, FixedBytes<32>>::init(context.child("first"), cfg.clone())
680 .await
681 .expect("Failed to initialize store");
682
683 store
684 .put(0, FixedBytes::new([42u8; 32]))
685 .await
686 .expect("Failed to put data");
687 store
688 .put(1, FixedBytes::new([43u8; 32]))
689 .await
690 .expect("Failed to put data");
691 store.sync().await.expect("Failed to sync store");
692 }
693
694 {
696 let (blob, _) = context
697 .open("test-ordinal", &0u64.to_be_bytes())
698 .await
699 .unwrap();
700 blob.write_at_sync(36, vec![0xFF; 32]).await.unwrap();
702 }
703
704 {
706 let store =
707 Ordinal::<_, FixedBytes<32>>::init(context.child("second"), cfg.clone())
708 .await
709 .expect("Failed to initialize store");
710
711 assert_eq!(
713 store.get(0).await.unwrap().unwrap(),
714 FixedBytes::new([42u8; 32])
715 );
716
717 assert!(!store.has(1));
719 assert!(store.get(1).await.unwrap().is_none());
720
721 let mut store_mut = store;
723 store_mut.put(1, FixedBytes::new([44u8; 32])).await.unwrap();
724 assert_eq!(
725 store_mut.get(1).await.unwrap().unwrap(),
726 FixedBytes::new([44u8; 32])
727 );
728 }
729 });
730 }
731
732 #[test_traced]
733 fn test_corrupted_value() {
734 let executor = deterministic::Runner::default();
736 executor.start(|context| async move {
737 let cfg = Config {
738 partition: "test-ordinal".into(),
739 items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
740 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
741 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
742 };
743
744 {
746 let mut store =
747 Ordinal::<_, FixedBytes<32>>::init(context.child("first"), cfg.clone())
748 .await
749 .expect("Failed to initialize store");
750
751 store
752 .put(0, FixedBytes::new([42u8; 32]))
753 .await
754 .expect("Failed to put data");
755 store
756 .put(1, FixedBytes::new([43u8; 32]))
757 .await
758 .expect("Failed to put data");
759 store.sync().await.expect("Failed to sync store");
760 }
761
762 {
764 let (blob, _) = context
765 .open("test-ordinal", &0u64.to_be_bytes())
766 .await
767 .unwrap();
768 blob.write_at_sync(10, hex!("0xFFFFFFFF").to_vec())
770 .await
771 .unwrap();
772 }
773
774 {
776 let store =
777 Ordinal::<_, FixedBytes<32>>::init(context.child("second"), cfg.clone())
778 .await
779 .expect("Failed to initialize store");
780
781 assert!(!store.has(0));
783
784 assert!(store.has(1));
786 assert_eq!(
787 store.get(1).await.unwrap().unwrap(),
788 FixedBytes::new([43u8; 32])
789 );
790 }
791 });
792 }
793
794 #[test_traced]
795 fn test_crc_corruptions() {
796 let executor = deterministic::Runner::default();
798 executor.start(|context| async move {
799 let cfg = Config {
800 partition: "test-ordinal".into(),
801 items_per_blob: NZU64!(10), write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
803 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
804 };
805
806 {
808 let mut store =
809 Ordinal::<_, FixedBytes<32>>::init(context.child("first"), cfg.clone())
810 .await
811 .expect("Failed to initialize store");
812
813 store.put(0, FixedBytes::new([0u8; 32])).await.unwrap();
815 store.put(5, FixedBytes::new([5u8; 32])).await.unwrap();
816 store.put(10, FixedBytes::new([10u8; 32])).await.unwrap();
817 store.put(15, FixedBytes::new([15u8; 32])).await.unwrap();
818 store.sync().await.expect("Failed to sync store");
819 }
820
821 {
823 let (blob, _) = context
825 .open("test-ordinal", &0u64.to_be_bytes())
826 .await
827 .unwrap();
828 blob.write_at_sync(32, vec![0xFF]).await.unwrap(); let (blob, _) = context
832 .open("test-ordinal", &1u64.to_be_bytes())
833 .await
834 .unwrap();
835 blob.write_at_sync(5, vec![0xFF; 4]).await.unwrap(); }
837
838 {
840 let store =
841 Ordinal::<_, FixedBytes<32>>::init(context.child("second"), cfg.clone())
842 .await
843 .expect("Failed to initialize store");
844
845 assert!(!store.has(0)); assert!(!store.has(10)); assert!(store.has(5));
851 assert!(store.has(15));
852 assert_eq!(
853 store.get(5).await.unwrap().unwrap(),
854 FixedBytes::new([5u8; 32])
855 );
856 assert_eq!(
857 store.get(15).await.unwrap().unwrap(),
858 FixedBytes::new([15u8; 32])
859 );
860 }
861 });
862 }
863
864 #[test_traced]
865 fn test_extra_bytes_in_blob() {
866 let executor = deterministic::Runner::default();
868 executor.start(|context| async move {
869 let cfg = Config {
870 partition: "test-ordinal".into(),
871 items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
872 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
873 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
874 };
875
876 {
878 let mut store =
879 Ordinal::<_, FixedBytes<32>>::init(context.child("first"), cfg.clone())
880 .await
881 .expect("Failed to initialize store");
882
883 store
884 .put(0, FixedBytes::new([42u8; 32]))
885 .await
886 .expect("Failed to put data");
887 store
888 .put(1, FixedBytes::new([43u8; 32]))
889 .await
890 .expect("Failed to put data");
891 store.sync().await.expect("Failed to sync store");
892 }
893
894 {
896 let (blob, size) = context
897 .open("test-ordinal", &0u64.to_be_bytes())
898 .await
899 .unwrap();
900 let mut garbage = vec![0xFF; 32]; let invalid_crc = 0xDEADBEEFu32;
904 garbage.extend_from_slice(&invalid_crc.to_be_bytes());
905 assert_eq!(garbage.len(), 36); blob.write_at_sync(size, garbage).await.unwrap();
907 }
908
909 {
911 let store =
912 Ordinal::<_, FixedBytes<32>>::init(context.child("second"), cfg.clone())
913 .await
914 .expect("Failed to initialize store");
915
916 assert!(store.has(0));
918 assert!(store.has(1));
919 assert_eq!(
920 store.get(0).await.unwrap().unwrap(),
921 FixedBytes::new([42u8; 32])
922 );
923 assert_eq!(
924 store.get(1).await.unwrap().unwrap(),
925 FixedBytes::new([43u8; 32])
926 );
927
928 let mut store_mut = store;
930 store_mut.put(2, FixedBytes::new([44u8; 32])).await.unwrap();
931 assert_eq!(
932 store_mut.get(2).await.unwrap().unwrap(),
933 FixedBytes::new([44u8; 32])
934 );
935 }
936 });
937 }
938
939 #[test_traced]
940 fn test_zero_filled_records() {
941 let executor = deterministic::Runner::default();
943 executor.start(|context| async move {
944 let cfg = Config {
945 partition: "test-ordinal".into(),
946 items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
947 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
948 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
949 };
950
951 {
953 let (blob, _) = context
954 .open("test-ordinal", &0u64.to_be_bytes())
955 .await
956 .unwrap();
957
958 let zeros = vec![0u8; 36 * 5]; blob.write_at_sync(0, zeros).await.unwrap();
961
962 let mut valid_record = vec![44u8; 32];
964 let crc = Crc32::checksum(&valid_record);
965 valid_record.extend_from_slice(&crc.to_be_bytes());
966 blob.write_at_sync(36 * 5, valid_record).await.unwrap();
967 }
968
969 {
971 let store =
972 Ordinal::<_, FixedBytes<32>>::init(context.child("storage"), cfg.clone())
973 .await
974 .expect("Failed to initialize store");
975
976 for i in 0..5 {
978 assert!(!store.has(i));
979 }
980
981 assert!(store.has(5));
983 assert_eq!(
984 store.get(5).await.unwrap().unwrap(),
985 FixedBytes::new([44u8; 32])
986 );
987 }
988 });
989 }
990
991 fn test_operations_and_restart(num_values: usize) -> String {
992 let executor = deterministic::Runner::default();
994 executor.start(|mut context| async move {
995 let cfg = Config {
996 partition: "test-ordinal".into(),
997 items_per_blob: NZU64!(100), write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
999 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1000 };
1001
1002 let mut store =
1004 Ordinal::<_, FixedBytes<128>>::init(context.child("first"), cfg.clone())
1005 .await
1006 .expect("Failed to initialize store");
1007
1008 let mut values = Vec::new();
1010 let mut rng_index = 0u64;
1011
1012 for _ in 0..num_values {
1013 let mut index_bytes = [0u8; 8];
1015 context.fill_bytes(&mut index_bytes);
1016 let index_offset = u64::from_be_bytes(index_bytes) % 1000;
1017 let index = rng_index + index_offset;
1018 rng_index = index + 1;
1019
1020 let mut value = [0u8; 128];
1022 context.fill_bytes(&mut value);
1023 let value = FixedBytes::<128>::new(value);
1024
1025 store
1026 .put(index, value.clone())
1027 .await
1028 .expect("Failed to put data");
1029 values.push((index, value));
1030 }
1031
1032 store.sync().await.expect("Failed to sync");
1034
1035 for (index, value) in &values {
1037 let retrieved = store
1038 .get(*index)
1039 .await
1040 .expect("Failed to get data")
1041 .expect("Data not found");
1042 assert_eq!(&retrieved, value);
1043 }
1044
1045 for i in 0..10 {
1047 let _ = store.next_gap(i * 100);
1048 }
1049
1050 store.sync().await.expect("Failed to sync store");
1052 drop(store);
1053
1054 let mut store = Ordinal::<_, FixedBytes<128>>::init(context.child("second"), cfg)
1056 .await
1057 .expect("Failed to initialize store");
1058
1059 for (index, value) in &values {
1061 let retrieved = store
1062 .get(*index)
1063 .await
1064 .expect("Failed to get data")
1065 .expect("Data not found");
1066 assert_eq!(&retrieved, value);
1067 }
1068
1069 for _ in 0..10 {
1071 let mut index_bytes = [0u8; 8];
1072 context.fill_bytes(&mut index_bytes);
1073 let index = u64::from_be_bytes(index_bytes) % 10000;
1074
1075 let mut value = [0u8; 128];
1076 context.fill_bytes(&mut value);
1077 let value = FixedBytes::<128>::new(value);
1078
1079 store.put(index, value).await.expect("Failed to put data");
1080 }
1081
1082 store.sync().await.expect("Failed to sync");
1084
1085 context.auditor().state()
1087 })
1088 }
1089
1090 #[test_group("slow")]
1091 #[test_traced]
1092 fn test_determinism() {
1093 let state1 = test_operations_and_restart(100);
1094 let state2 = test_operations_and_restart(100);
1095 assert_eq!(state1, state2);
1096 }
1097
1098 #[test_traced]
1099 fn test_prune_basic() {
1100 let executor = deterministic::Runner::default();
1102 executor.start(|context| async move {
1103 let cfg = Config {
1104 partition: "test-ordinal".into(),
1105 items_per_blob: NZU64!(100), write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1107 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1108 };
1109
1110 let mut store =
1111 Ordinal::<_, FixedBytes<32>>::init(context.child("storage"), cfg.clone())
1112 .await
1113 .expect("Failed to initialize store");
1114
1115 let values = vec![
1117 (0u64, FixedBytes::new([0u8; 32])), (50u64, FixedBytes::new([50u8; 32])), (100u64, FixedBytes::new([100u8; 32])), (150u64, FixedBytes::new([150u8; 32])), (200u64, FixedBytes::new([200u8; 32])), (300u64, FixedBytes::new([44u8; 32])), ];
1124
1125 for (index, value) in &values {
1126 store
1127 .put(*index, value.clone())
1128 .await
1129 .expect("Failed to put data");
1130 }
1131 store.sync().await.unwrap();
1132
1133 for (index, value) in &values {
1135 assert_eq!(store.get(*index).await.unwrap().unwrap(), *value);
1136 }
1137
1138 store.prune(150).await.unwrap();
1140 let buffer = context.encode();
1141 assert!(buffer.contains("pruned_total 1"));
1142
1143 assert!(!store.has(0));
1145 assert!(!store.has(50));
1146 assert!(store.get(0).await.unwrap().is_none());
1147 assert!(store.get(50).await.unwrap().is_none());
1148
1149 assert!(store.has(100));
1151 assert!(store.has(150));
1152 assert!(store.has(200));
1153 assert!(store.has(300));
1154 assert_eq!(store.get(100).await.unwrap().unwrap(), values[2].1);
1155 assert_eq!(store.get(150).await.unwrap().unwrap(), values[3].1);
1156 assert_eq!(store.get(200).await.unwrap().unwrap(), values[4].1);
1157 assert_eq!(store.get(300).await.unwrap().unwrap(), values[5].1);
1158
1159 store.prune(250).await.unwrap();
1161 let buffer = context.encode();
1162 assert!(buffer.contains("pruned_total 2"));
1163
1164 assert!(!store.has(100));
1166 assert!(!store.has(150));
1167 assert!(store.get(100).await.unwrap().is_none());
1168 assert!(store.get(150).await.unwrap().is_none());
1169
1170 assert!(store.has(200));
1172 assert!(store.has(300));
1173 assert_eq!(store.get(200).await.unwrap().unwrap(), values[4].1);
1174 assert_eq!(store.get(300).await.unwrap().unwrap(), values[5].1);
1175 });
1176 }
1177
1178 #[test_traced]
1179 fn test_prune_with_gaps() {
1180 let executor = deterministic::Runner::default();
1182 executor.start(|context| async move {
1183 let cfg = Config {
1184 partition: "test-ordinal".into(),
1185 items_per_blob: NZU64!(100),
1186 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1187 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1188 };
1189
1190 let mut store =
1191 Ordinal::<_, FixedBytes<32>>::init(context.child("storage"), cfg.clone())
1192 .await
1193 .expect("Failed to initialize store");
1194
1195 store.put(5, FixedBytes::new([5u8; 32])).await.unwrap();
1197 store.put(105, FixedBytes::new([105u8; 32])).await.unwrap();
1198 store.put(305, FixedBytes::new([49u8; 32])).await.unwrap();
1199 store.sync().await.unwrap();
1200
1201 let (current_end, next_start) = store.next_gap(0);
1203 assert!(current_end.is_none());
1204 assert_eq!(next_start, Some(5));
1205
1206 let (current_end, next_start) = store.next_gap(5);
1207 assert_eq!(current_end, Some(5));
1208 assert_eq!(next_start, Some(105));
1209
1210 store.prune(150).await.unwrap();
1212
1213 assert!(!store.has(5));
1215 assert!(store.get(5).await.unwrap().is_none());
1216
1217 assert!(store.has(105));
1219 assert!(store.has(305));
1220
1221 let (current_end, next_start) = store.next_gap(0);
1222 assert!(current_end.is_none());
1223 assert_eq!(next_start, Some(105));
1224
1225 let (current_end, next_start) = store.next_gap(105);
1226 assert_eq!(current_end, Some(105));
1227 assert_eq!(next_start, Some(305));
1228 });
1229 }
1230
1231 #[test_traced]
1232 fn test_prune_no_op() {
1233 let executor = deterministic::Runner::default();
1235 executor.start(|context| async move {
1236 let cfg = Config {
1237 partition: "test-ordinal".into(),
1238 items_per_blob: NZU64!(100),
1239 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1240 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1241 };
1242
1243 let mut store =
1244 Ordinal::<_, FixedBytes<32>>::init(context.child("storage"), cfg.clone())
1245 .await
1246 .expect("Failed to initialize store");
1247
1248 store.put(100, FixedBytes::new([100u8; 32])).await.unwrap();
1250 store.put(200, FixedBytes::new([200u8; 32])).await.unwrap();
1251 store.sync().await.unwrap();
1252
1253 store.prune(50).await.unwrap();
1255
1256 assert!(store.has(100));
1258 assert!(store.has(200));
1259 let buffer = context.encode();
1260 assert!(buffer.contains("pruned_total 0"));
1261
1262 store.prune(100).await.unwrap();
1264
1265 assert!(store.has(100));
1267 assert!(store.has(200));
1268 let buffer = context.encode();
1269 assert!(buffer.contains("pruned_total 0"));
1270 });
1271 }
1272
1273 #[test_traced]
1274 fn test_prune_empty_store() {
1275 let executor = deterministic::Runner::default();
1277 executor.start(|context| async move {
1278 let cfg = Config {
1279 partition: "test-ordinal".into(),
1280 items_per_blob: NZU64!(100),
1281 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1282 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1283 };
1284
1285 let mut store =
1286 Ordinal::<_, FixedBytes<32>>::init(context.child("storage"), cfg.clone())
1287 .await
1288 .expect("Failed to initialize store");
1289
1290 store.prune(1000).await.unwrap();
1292
1293 store.put(0, FixedBytes::new([0u8; 32])).await.unwrap();
1295 assert!(store.has(0));
1296 });
1297 }
1298
1299 #[test_traced]
1300 fn test_prune_after_restart() {
1301 let executor = deterministic::Runner::default();
1303 executor.start(|context| async move {
1304 let cfg = Config {
1305 partition: "test-ordinal".into(),
1306 items_per_blob: NZU64!(100),
1307 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1308 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1309 };
1310
1311 {
1313 let mut store =
1314 Ordinal::<_, FixedBytes<32>>::init(context.child("first"), cfg.clone())
1315 .await
1316 .expect("Failed to initialize store");
1317
1318 store.put(0, FixedBytes::new([0u8; 32])).await.unwrap();
1319 store.put(100, FixedBytes::new([100u8; 32])).await.unwrap();
1320 store.put(200, FixedBytes::new([200u8; 32])).await.unwrap();
1321 store.sync().await.unwrap();
1322 }
1323
1324 {
1326 let mut store =
1327 Ordinal::<_, FixedBytes<32>>::init(context.child("second"), cfg.clone())
1328 .await
1329 .expect("Failed to initialize store");
1330
1331 assert!(store.has(0));
1333 assert!(store.has(100));
1334 assert!(store.has(200));
1335
1336 store.prune(150).await.unwrap();
1338
1339 assert!(!store.has(0));
1341 assert!(store.has(100));
1342 assert!(store.has(200));
1343
1344 store.sync().await.unwrap();
1345 }
1346
1347 {
1349 let store = Ordinal::<_, FixedBytes<32>>::init(context.child("third"), cfg.clone())
1350 .await
1351 .expect("Failed to initialize store");
1352
1353 assert!(!store.has(0));
1354 assert!(store.has(100));
1355 assert!(store.has(200));
1356
1357 let (current_end, next_start) = store.next_gap(0);
1359 assert!(current_end.is_none());
1360 assert_eq!(next_start, Some(100));
1361 }
1362 });
1363 }
1364
1365 #[test_traced]
1366 fn test_prune_multiple_operations() {
1367 let executor = deterministic::Runner::default();
1369 executor.start(|context| async move {
1370 let cfg = Config {
1371 partition: "test-ordinal".into(),
1372 items_per_blob: NZU64!(50), write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1374 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1375 };
1376
1377 let mut store =
1378 Ordinal::<_, FixedBytes<32>>::init(context.child("storage"), cfg.clone())
1379 .await
1380 .expect("Failed to initialize store");
1381
1382 let mut values = Vec::new();
1384 for i in 0..10 {
1385 let index = i * 50 + 25; let value = FixedBytes::new([i as u8; 32]);
1387 store.put(index, value.clone()).await.unwrap();
1388 values.push((index, value));
1389 }
1390 store.sync().await.unwrap();
1391
1392 for i in 1..5 {
1394 let prune_index = i * 50 + 10;
1395 store.prune(prune_index).await.unwrap();
1396
1397 for (index, _) in &values {
1399 if *index < prune_index {
1400 assert!(!store.has(*index), "Index {index} should be pruned");
1401 } else {
1402 assert!(store.has(*index), "Index {index} should not be pruned");
1403 }
1404 }
1405 }
1406
1407 let buffer = context.encode();
1409 assert!(buffer.contains("pruned_total 4"));
1410
1411 for i in 4..10 {
1413 let index = i * 50 + 25;
1414 assert!(store.has(index));
1415 assert_eq!(
1416 store.get(index).await.unwrap().unwrap(),
1417 values[i as usize].1
1418 );
1419 }
1420 });
1421 }
1422
1423 #[test_traced]
1424 fn test_prune_blob_boundaries() {
1425 let executor = deterministic::Runner::default();
1427 executor.start(|context| async move {
1428 let cfg = Config {
1429 partition: "test-ordinal".into(),
1430 items_per_blob: NZU64!(100),
1431 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1432 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1433 };
1434
1435 let mut store =
1436 Ordinal::<_, FixedBytes<32>>::init(context.child("storage"), cfg.clone())
1437 .await
1438 .expect("Failed to initialize store");
1439
1440 store.put(0, FixedBytes::new([0u8; 32])).await.unwrap(); store.put(99, FixedBytes::new([99u8; 32])).await.unwrap(); store.put(100, FixedBytes::new([100u8; 32])).await.unwrap(); store.put(199, FixedBytes::new([199u8; 32])).await.unwrap(); store.put(200, FixedBytes::new([200u8; 32])).await.unwrap(); store.sync().await.unwrap();
1447
1448 store.prune(100).await.unwrap();
1452 assert!(!store.has(0));
1453 assert!(!store.has(99));
1454 assert!(store.has(100));
1455 assert!(store.has(199));
1456 assert!(store.has(200));
1457
1458 store.prune(199).await.unwrap();
1460 assert!(store.has(100));
1461 assert!(store.has(199));
1462 assert!(store.has(200));
1463
1464 store.prune(200).await.unwrap();
1466 assert!(!store.has(100));
1467 assert!(!store.has(199));
1468 assert!(store.has(200));
1469
1470 let buffer = context.encode();
1471 assert!(buffer.contains("pruned_total 2"));
1472 });
1473 }
1474
1475 #[test_traced]
1476 fn test_prune_non_contiguous_sections() {
1477 let executor = deterministic::Runner::default();
1479 executor.start(|context| async move {
1480 let cfg = Config {
1481 partition: "test-ordinal".into(),
1482 items_per_blob: NZU64!(100),
1483 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1484 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1485 };
1486
1487 let mut store =
1488 Ordinal::<_, FixedBytes<32>>::init(context.child("storage"), cfg.clone())
1489 .await
1490 .expect("Failed to initialize store");
1491
1492 store.put(0, FixedBytes::new([0u8; 32])).await.unwrap(); store.put(250, FixedBytes::new([50u8; 32])).await.unwrap(); store.put(500, FixedBytes::new([44u8; 32])).await.unwrap(); store.put(750, FixedBytes::new([45u8; 32])).await.unwrap(); store.sync().await.unwrap();
1498
1499 assert!(store.has(0));
1501 assert!(store.has(250));
1502 assert!(store.has(500));
1503 assert!(store.has(750));
1504
1505 store.prune(300).await.unwrap();
1507
1508 assert!(!store.has(0)); assert!(!store.has(250)); assert!(store.has(500)); assert!(store.has(750)); let buffer = context.encode();
1515 assert!(buffer.contains("pruned_total 2"));
1516
1517 store.prune(600).await.unwrap();
1519
1520 assert!(!store.has(500)); assert!(store.has(750)); let buffer = context.encode();
1525 assert!(buffer.contains("pruned_total 3"));
1526
1527 store.prune(1000).await.unwrap();
1529
1530 assert!(!store.has(750)); let buffer = context.encode();
1534 assert!(buffer.contains("pruned_total 4"));
1535 });
1536 }
1537
1538 #[test_traced]
1539 fn test_prune_removes_correct_pending() {
1540 let executor = deterministic::Runner::default();
1542 executor.start(|context| async move {
1543 let cfg = Config {
1544 partition: "test-ordinal".into(),
1545 items_per_blob: NZU64!(100),
1546 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1547 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1548 };
1549 let mut store =
1550 Ordinal::<_, FixedBytes<32>>::init(context.child("storage"), cfg.clone())
1551 .await
1552 .expect("Failed to initialize store");
1553
1554 store.put(5, FixedBytes::new([5u8; 32])).await.unwrap();
1556 store.sync().await.unwrap();
1557
1558 store.put(10, FixedBytes::new([10u8; 32])).await.unwrap(); store.put(110, FixedBytes::new([110u8; 32])).await.unwrap(); assert!(store.has(5));
1564 assert!(store.has(10));
1565 assert!(store.has(110));
1566
1567 store.prune(150).await.unwrap();
1569
1570 assert!(!store.has(5));
1572 assert!(!store.has(10));
1573
1574 assert!(store.has(110));
1576 assert_eq!(
1577 store.get(110).await.unwrap().unwrap(),
1578 FixedBytes::new([110u8; 32])
1579 );
1580
1581 store.sync().await.unwrap();
1583 assert!(store.has(110));
1584 assert_eq!(
1585 store.get(110).await.unwrap().unwrap(),
1586 FixedBytes::new([110u8; 32])
1587 );
1588 });
1589 }
1590
1591 #[test_traced]
1592 fn test_init_with_bits_none() {
1593 let executor = deterministic::Runner::default();
1595 executor.start(|context| async move {
1596 let cfg = Config {
1597 partition: "test-ordinal".into(),
1598 items_per_blob: NZU64!(10), write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1600 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1601 };
1602
1603 {
1605 let mut store =
1606 Ordinal::<_, FixedBytes<32>>::init(context.child("first"), cfg.clone())
1607 .await
1608 .expect("Failed to initialize store");
1609
1610 store.put(0, FixedBytes::new([0u8; 32])).await.unwrap();
1612 store.put(5, FixedBytes::new([5u8; 32])).await.unwrap();
1613 store.put(9, FixedBytes::new([9u8; 32])).await.unwrap();
1614
1615 store.put(10, FixedBytes::new([10u8; 32])).await.unwrap();
1617 store.put(15, FixedBytes::new([15u8; 32])).await.unwrap();
1618
1619 store.put(25, FixedBytes::new([25u8; 32])).await.unwrap();
1621
1622 store.sync().await.unwrap();
1623 }
1624
1625 {
1627 let store = Ordinal::<_, FixedBytes<32>>::init_with_bits(
1628 context.child("second"),
1629 cfg.clone(),
1630 None,
1631 )
1632 .await
1633 .expect("Failed to initialize store with bits");
1634
1635 assert!(store.has(0));
1637 assert!(store.has(5));
1638 assert!(store.has(9));
1639 assert!(store.has(10));
1640 assert!(store.has(15));
1641 assert!(store.has(25));
1642
1643 assert!(!store.has(1));
1645 assert!(!store.has(11));
1646 assert!(!store.has(20));
1647
1648 assert_eq!(
1650 store.get(0).await.unwrap().unwrap(),
1651 FixedBytes::new([0u8; 32])
1652 );
1653 assert_eq!(
1654 store.get(15).await.unwrap().unwrap(),
1655 FixedBytes::new([15u8; 32])
1656 );
1657 }
1658 });
1659 }
1660
1661 #[test_traced]
1662 fn test_init_with_bits_empty_hashmap() {
1663 let executor = deterministic::Runner::default();
1665 executor.start(|context| async move {
1666 let cfg = Config {
1667 partition: "test-ordinal".into(),
1668 items_per_blob: NZU64!(10),
1669 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1670 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1671 };
1672
1673 {
1675 let mut store =
1676 Ordinal::<_, FixedBytes<32>>::init(context.child("first"), cfg.clone())
1677 .await
1678 .expect("Failed to initialize store");
1679
1680 store.put(0, FixedBytes::new([0u8; 32])).await.unwrap();
1681 store.put(10, FixedBytes::new([10u8; 32])).await.unwrap();
1682 store.put(20, FixedBytes::new([20u8; 32])).await.unwrap();
1683
1684 store.sync().await.unwrap();
1685 }
1686
1687 {
1689 let bits: BTreeMap<u64, &Option<BitMap>> = BTreeMap::new();
1690 let store = Ordinal::<_, FixedBytes<32>>::init_with_bits(
1691 context.child("second"),
1692 cfg.clone(),
1693 Some(bits),
1694 )
1695 .await
1696 .expect("Failed to initialize store with bits");
1697
1698 assert!(!store.has(0));
1700 assert!(!store.has(10));
1701 assert!(!store.has(20));
1702 }
1703 });
1704 }
1705
1706 #[test_traced]
1707 fn test_init_with_bits_selective_sections() {
1708 let executor = deterministic::Runner::default();
1710 executor.start(|context| async move {
1711 let cfg = Config {
1712 partition: "test-ordinal".into(),
1713 items_per_blob: NZU64!(10),
1714 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1715 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1716 };
1717
1718 {
1720 let mut store =
1721 Ordinal::<_, FixedBytes<32>>::init(context.child("first"), cfg.clone())
1722 .await
1723 .expect("Failed to initialize store");
1724
1725 for i in 0..10 {
1727 store.put(i, FixedBytes::new([i as u8; 32])).await.unwrap();
1728 }
1729
1730 for i in 10..20 {
1732 store.put(i, FixedBytes::new([i as u8; 32])).await.unwrap();
1733 }
1734
1735 for i in 20..30 {
1737 store.put(i, FixedBytes::new([i as u8; 32])).await.unwrap();
1738 }
1739
1740 store.sync().await.unwrap();
1741 }
1742
1743 {
1745 let mut bits_map: BTreeMap<u64, &Option<BitMap>> = BTreeMap::new();
1746
1747 let mut bitmap = BitMap::zeroes(10);
1749 bitmap.set(2, true); bitmap.set(5, true); bitmap.set(8, true); let bitmap_option = Some(bitmap);
1753
1754 bits_map.insert(1, &bitmap_option);
1755
1756 let store = Ordinal::<_, FixedBytes<32>>::init_with_bits(
1757 context.child("second"),
1758 cfg.clone(),
1759 Some(bits_map),
1760 )
1761 .await
1762 .expect("Failed to initialize store with bits");
1763
1764 assert!(store.has(12));
1766 assert!(store.has(15));
1767 assert!(store.has(18));
1768
1769 assert!(!store.has(10));
1771 assert!(!store.has(11));
1772 assert!(!store.has(13));
1773 assert!(!store.has(14));
1774 assert!(!store.has(16));
1775 assert!(!store.has(17));
1776 assert!(!store.has(19));
1777
1778 for i in 0..10 {
1780 assert!(!store.has(i));
1781 }
1782 for i in 20..30 {
1783 assert!(!store.has(i));
1784 }
1785
1786 assert_eq!(
1788 store.get(12).await.unwrap().unwrap(),
1789 FixedBytes::new([12u8; 32])
1790 );
1791 assert_eq!(
1792 store.get(15).await.unwrap().unwrap(),
1793 FixedBytes::new([15u8; 32])
1794 );
1795 assert_eq!(
1796 store.get(18).await.unwrap().unwrap(),
1797 FixedBytes::new([18u8; 32])
1798 );
1799 }
1800 });
1801 }
1802
1803 #[test_traced]
1804 fn test_init_with_bits_none_option_all_records_exist() {
1805 let executor = deterministic::Runner::default();
1807 executor.start(|context| async move {
1808 let cfg = Config {
1809 partition: "test-ordinal".into(),
1810 items_per_blob: NZU64!(5),
1811 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1812 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1813 };
1814
1815 {
1817 let mut store =
1818 Ordinal::<_, FixedBytes<32>>::init(context.child("first"), cfg.clone())
1819 .await
1820 .expect("Failed to initialize store");
1821
1822 for i in 5..10 {
1824 store.put(i, FixedBytes::new([i as u8; 32])).await.unwrap();
1825 }
1826
1827 store.sync().await.unwrap();
1828 }
1829
1830 {
1832 let mut bits_map: BTreeMap<u64, &Option<BitMap>> = BTreeMap::new();
1833 let none_option: Option<BitMap> = None;
1834 bits_map.insert(1, &none_option);
1835
1836 let store = Ordinal::<_, FixedBytes<32>>::init_with_bits(
1837 context.child("second"),
1838 cfg.clone(),
1839 Some(bits_map),
1840 )
1841 .await
1842 .expect("Failed to initialize store with bits");
1843
1844 for i in 5..10 {
1846 assert!(store.has(i));
1847 assert_eq!(
1848 store.get(i).await.unwrap().unwrap(),
1849 FixedBytes::new([i as u8; 32])
1850 );
1851 }
1852 }
1853 });
1854 }
1855
1856 #[test_traced]
1857 #[should_panic(expected = "Failed to initialize store with bits: MissingRecord(6)")]
1858 fn test_init_with_bits_none_option_missing_record_panics() {
1859 let executor = deterministic::Runner::default();
1861 executor.start(|context| async move {
1862 let cfg = Config {
1863 partition: "test-ordinal".into(),
1864 items_per_blob: NZU64!(5),
1865 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1866 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1867 };
1868
1869 {
1871 let mut store =
1872 Ordinal::<_, FixedBytes<32>>::init(context.child("first"), cfg.clone())
1873 .await
1874 .expect("Failed to initialize store");
1875
1876 store.put(5, FixedBytes::new([5u8; 32])).await.unwrap();
1878 store.put(7, FixedBytes::new([7u8; 32])).await.unwrap();
1880 store.put(8, FixedBytes::new([8u8; 32])).await.unwrap();
1881 store.put(9, FixedBytes::new([9u8; 32])).await.unwrap();
1882
1883 store.sync().await.unwrap();
1884 }
1885
1886 {
1889 let mut bits_map: BTreeMap<u64, &Option<BitMap>> = BTreeMap::new();
1890 let none_option: Option<BitMap> = None;
1891 bits_map.insert(1, &none_option);
1892
1893 let _store = Ordinal::<_, FixedBytes<32>>::init_with_bits(
1894 context.child("second"),
1895 cfg.clone(),
1896 Some(bits_map),
1897 )
1898 .await
1899 .expect("Failed to initialize store with bits");
1900 }
1901 });
1902 }
1903
1904 #[test_traced]
1905 fn test_init_with_bits_mixed_sections() {
1906 let executor = deterministic::Runner::default();
1908 executor.start(|context| async move {
1909 let cfg = Config {
1910 partition: "test-ordinal".into(),
1911 items_per_blob: NZU64!(5),
1912 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1913 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1914 };
1915
1916 {
1918 let mut store =
1919 Ordinal::<_, FixedBytes<32>>::init(context.child("first"), cfg.clone())
1920 .await
1921 .expect("Failed to initialize store");
1922
1923 for i in 0..5 {
1925 store.put(i, FixedBytes::new([i as u8; 32])).await.unwrap();
1926 }
1927
1928 store.put(5, FixedBytes::new([5u8; 32])).await.unwrap();
1930 store.put(7, FixedBytes::new([7u8; 32])).await.unwrap();
1931 store.put(9, FixedBytes::new([9u8; 32])).await.unwrap();
1932
1933 for i in 10..15 {
1935 store.put(i, FixedBytes::new([i as u8; 32])).await.unwrap();
1936 }
1937
1938 store.sync().await.unwrap();
1939 }
1940
1941 {
1943 let mut bits_map: BTreeMap<u64, &Option<BitMap>> = BTreeMap::new();
1944
1945 let none_option: Option<BitMap> = None;
1947 bits_map.insert(0, &none_option);
1948
1949 let mut bitmap1 = BitMap::zeroes(5);
1951 bitmap1.set(0, true); bitmap1.set(2, true); let bitmap1_option = Some(bitmap1);
1955 bits_map.insert(1, &bitmap1_option);
1956
1957 let store = Ordinal::<_, FixedBytes<32>>::init_with_bits(
1960 context.child("second"),
1961 cfg.clone(),
1962 Some(bits_map),
1963 )
1964 .await
1965 .expect("Failed to initialize store with bits");
1966
1967 for i in 0..5 {
1969 assert!(store.has(i));
1970 assert_eq!(
1971 store.get(i).await.unwrap().unwrap(),
1972 FixedBytes::new([i as u8; 32])
1973 );
1974 }
1975
1976 assert!(store.has(5));
1978 assert!(store.has(7));
1979 assert!(!store.has(6));
1980 assert!(!store.has(8));
1981 assert!(!store.has(9)); for i in 10..15 {
1985 assert!(!store.has(i));
1986 }
1987 }
1988 });
1989 }
1990
1991 #[test_traced]
1992 #[should_panic(expected = "Failed to initialize store with bits: MissingRecord(2)")]
1993 fn test_init_with_bits_corrupted_records() {
1994 let executor = deterministic::Runner::default();
1996 executor.start(|context| async move {
1997 let cfg = Config {
1998 partition: "test-ordinal".into(),
1999 items_per_blob: NZU64!(5),
2000 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
2001 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
2002 };
2003
2004 {
2006 let mut store =
2007 Ordinal::<_, FixedBytes<32>>::init(context.child("first"), cfg.clone())
2008 .await
2009 .expect("Failed to initialize store");
2010
2011 for i in 0..5 {
2013 store.put(i, FixedBytes::new([i as u8; 32])).await.unwrap();
2014 }
2015
2016 store.sync().await.unwrap();
2017 }
2018
2019 {
2021 let (blob, _) = context
2022 .open("test-ordinal", &0u64.to_be_bytes())
2023 .await
2024 .unwrap();
2025 let offset = 2 * 36 + 32; blob.write_at_sync(offset, vec![0xFF]).await.unwrap();
2028 }
2029
2030 {
2032 let mut bits_map: BTreeMap<u64, &Option<BitMap>> = BTreeMap::new();
2033
2034 let mut bitmap = BitMap::zeroes(5);
2036 bitmap.set(0, true); bitmap.set(2, true); bitmap.set(4, true); let bitmap_option = Some(bitmap);
2040 bits_map.insert(0, &bitmap_option);
2041
2042 let _store = Ordinal::<_, FixedBytes<32>>::init_with_bits(
2043 context.child("second"),
2044 cfg.clone(),
2045 Some(bits_map),
2046 )
2047 .await
2048 .expect("Failed to initialize store with bits");
2049 }
2050 });
2051 }
2052
2053 #[derive(Debug, PartialEq, Eq)]
2055 pub struct DummyValue {
2056 pub value: u64,
2057 }
2058
2059 impl Write for DummyValue {
2060 fn write(&self, buf: &mut impl BufMut) {
2061 self.value.write(buf);
2062 }
2063 }
2064
2065 impl Read for DummyValue {
2066 type Cfg = ();
2067
2068 fn read_cfg(buf: &mut impl Buf, _: &Self::Cfg) -> Result<Self, commonware_codec::Error> {
2069 let value = u64::read(buf)?;
2070 if value == 0 {
2071 return Err(commonware_codec::Error::Invalid(
2072 "DummyValue",
2073 "value must be non-zero",
2074 ));
2075 }
2076 Ok(Self { value })
2077 }
2078 }
2079
2080 impl FixedSize for DummyValue {
2081 const SIZE: usize = u64::SIZE;
2082 }
2083
2084 #[test_traced]
2085 fn test_init_skip_unparseable_record() {
2086 let executor = deterministic::Runner::default();
2088 executor.start(|context| async move {
2089 let cfg = Config {
2090 partition: "test-ordinal".into(),
2091 items_per_blob: NZU64!(1),
2092 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
2093 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
2094 };
2095
2096 {
2098 let mut store = Ordinal::<_, DummyValue>::init(context.child("first"), cfg.clone())
2099 .await
2100 .expect("Failed to initialize store");
2101
2102 store.put(1, DummyValue { value: 1 }).await.unwrap();
2104 store.put(2, DummyValue { value: 0 }).await.unwrap(); store.put(4, DummyValue { value: 4 }).await.unwrap();
2106
2107 store.sync().await.unwrap();
2108 }
2109
2110 {
2112 let store = Ordinal::<_, DummyValue>::init(context.child("second"), cfg.clone())
2113 .await
2114 .expect("Failed to initialize store");
2115
2116 assert!(store.has(1), "Record 1 should be available");
2118 assert_eq!(
2119 store.get(1).await.unwrap().unwrap(),
2120 DummyValue { value: 1 },
2121 "Record 0 should have correct value"
2122 );
2123
2124 assert!(
2126 !store.has(2),
2127 "Record 2 should not be available (unparseable)"
2128 );
2129
2130 assert!(
2132 store.has(4),
2133 "Record 4 should be available - we should not exit early on unparseable record"
2134 );
2135 assert_eq!(
2136 store.get(4).await.unwrap().unwrap(),
2137 DummyValue { value: 4 },
2138 "Record 4 should have correct value"
2139 );
2140 }
2141 });
2142 }
2143}