1mod storage;
97
98use std::num::{NonZeroU64, NonZeroUsize};
99pub use storage::Ordinal;
100use thiserror::Error;
101
102#[derive(Debug, Error)]
104pub enum Error {
105 #[error("runtime error: {0}")]
106 Runtime(#[from] commonware_runtime::Error),
107 #[error("codec error: {0}")]
108 Codec(#[from] commonware_codec::Error),
109 #[error("invalid blob name: {0}")]
110 InvalidBlobName(String),
111 #[error("invalid record: {0}")]
112 InvalidRecord(u64),
113 #[error("missing record at {0}")]
114 MissingRecord(u64),
115}
116
/// Settings used to initialize an [`Ordinal`] store.
///
/// `Debug` is derived alongside `Clone` so a configuration can be logged or
/// included in assertion output.
#[derive(Clone, Debug)]
pub struct Config {
    /// Name of the storage partition that holds the store's blobs.
    pub partition: String,

    /// How many consecutive indices are grouped into a single blob.
    pub items_per_blob: NonZeroU64,

    /// Size of the write buffer.
    ///
    /// NOTE(review): presumably expressed in bytes — confirm in `storage`.
    pub write_buffer: NonZeroUsize,

    /// Size of the buffer used when replaying existing blobs at startup.
    ///
    /// NOTE(review): presumably expressed in bytes — confirm in `storage`.
    pub replay_buffer: NonZeroUsize,
}
132
133#[cfg(test)]
134mod tests {
135 use super::*;
136 use commonware_codec::{FixedSize, Read, ReadExt, Write};
137 use commonware_cryptography::Crc32;
138 use commonware_macros::{test_group, test_traced};
139 use commonware_runtime::{deterministic, Blob, Buf, BufMut, Metrics, Runner, Storage};
140 use commonware_utils::{bitmap::BitMap, hex, sequence::FixedBytes, NZUsize, NZU64};
141 use rand::RngCore;
142 use std::collections::BTreeMap;
143
144 const DEFAULT_ITEMS_PER_BLOB: u64 = 1000;
145 const DEFAULT_WRITE_BUFFER: usize = 4096;
146 const DEFAULT_REPLAY_BUFFER: usize = 1024 * 1024;
147
148 #[test_traced]
149 fn test_put_get() {
150 let executor = deterministic::Runner::default();
152 executor.start(|context| async move {
153 let cfg = Config {
155 partition: "test-ordinal".into(),
156 items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
157 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
158 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
159 };
160 let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
161 .await
162 .expect("Failed to initialize store");
163
164 let value = FixedBytes::new([42u8; 32]);
165
166 assert!(!store.has(0));
168
169 store
171 .put(0, value.clone())
172 .await
173 .expect("Failed to put data");
174
175 assert!(store.has(0));
177
178 let retrieved = store
180 .get(0)
181 .await
182 .expect("Failed to get data")
183 .expect("Data not found");
184 assert_eq!(retrieved, value);
185
186 store.sync().await.expect("Failed to sync data");
188
189 let buffer = context.encode();
191 assert!(buffer.contains("gets_total 1"), "{}", buffer);
192 assert!(buffer.contains("puts_total 1"), "{}", buffer);
193 assert!(buffer.contains("has_total 2"), "{}", buffer);
194 assert!(buffer.contains("syncs_total 1"), "{}", buffer);
195 assert!(buffer.contains("pruned_total 0"), "{}", buffer);
196
197 let retrieved = store
199 .get(0)
200 .await
201 .expect("Failed to get data")
202 .expect("Data not found");
203 assert_eq!(retrieved, value);
204 });
205 }
206
207 #[test_traced]
208 fn test_concurrent_sync_does_not_report_success_while_flush_fails() {
209 let executor = deterministic::Runner::default();
210 executor.start(|context| async move {
211 let cfg = Config {
212 partition: "test-ordinal".into(),
213 items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
214 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
215 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
216 };
217 let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
218 .await
219 .expect("Failed to initialize store");
220
221 store
222 .put(0, FixedBytes::new([42u8; 32]))
223 .await
224 .expect("Failed to put data");
225
226 let section = 0u64.to_be_bytes();
228 context
229 .remove(&cfg.partition, Some(§ion))
230 .await
231 .expect("Failed to remove blob");
232
233 let (first, second) = futures::future::join(store.sync(), store.sync()).await;
235 assert!(first.is_err(), "first sync unexpectedly succeeded");
236 assert!(second.is_err(), "second sync unexpectedly succeeded");
237 });
238 }
239
240 #[test_traced]
241 fn test_multiple_indices() {
242 let executor = deterministic::Runner::default();
244 executor.start(|context| async move {
245 let cfg = Config {
247 partition: "test-ordinal".into(),
248 items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
249 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
250 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
251 };
252 let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
253 .await
254 .expect("Failed to initialize store");
255
256 let indices = vec![
258 (0u64, FixedBytes::new([0u8; 32])),
259 (5u64, FixedBytes::new([5u8; 32])),
260 (10u64, FixedBytes::new([10u8; 32])),
261 (100u64, FixedBytes::new([100u8; 32])),
262 (1000u64, FixedBytes::new([200u8; 32])), ];
264
265 for (index, value) in &indices {
266 store
267 .put(*index, value.clone())
268 .await
269 .expect("Failed to put data");
270 }
271
272 store.sync().await.expect("Failed to sync");
274
275 for (index, value) in &indices {
277 let retrieved = store
278 .get(*index)
279 .await
280 .expect("Failed to get data")
281 .expect("Data not found");
282 assert_eq!(&retrieved, value);
283 }
284 });
285 }
286
287 #[test_traced]
288 fn test_sparse_indices() {
289 let executor = deterministic::Runner::default();
291 executor.start(|context| async move {
292 let cfg = Config {
294 partition: "test-ordinal".into(),
295 items_per_blob: NZU64!(100), write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
297 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
298 };
299 let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
300 .await
301 .expect("Failed to initialize store");
302
303 let indices = vec![
305 (0u64, FixedBytes::new([0u8; 32])),
306 (99u64, FixedBytes::new([99u8; 32])), (100u64, FixedBytes::new([100u8; 32])), (500u64, FixedBytes::new([200u8; 32])), ];
310
311 for (index, value) in &indices {
312 store
313 .put(*index, value.clone())
314 .await
315 .expect("Failed to put data");
316 }
317
318 assert!(!store.has(1));
320 assert!(!store.has(50));
321 assert!(!store.has(101));
322 assert!(!store.has(499));
323
324 store.sync().await.expect("Failed to sync");
326
327 for (index, value) in &indices {
328 let retrieved = store
329 .get(*index)
330 .await
331 .expect("Failed to get data")
332 .expect("Data not found");
333 assert_eq!(&retrieved, value);
334 }
335 });
336 }
337
338 #[test_traced]
339 fn test_next_gap() {
340 let executor = deterministic::Runner::default();
342 executor.start(|context| async move {
343 let cfg = Config {
345 partition: "test-ordinal".into(),
346 items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
347 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
348 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
349 };
350 let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
351 .await
352 .expect("Failed to initialize store");
353
354 store.put(1, FixedBytes::new([1u8; 32])).await.unwrap();
356 store.put(10, FixedBytes::new([10u8; 32])).await.unwrap();
357 store.put(11, FixedBytes::new([11u8; 32])).await.unwrap();
358 store.put(14, FixedBytes::new([14u8; 32])).await.unwrap();
359
360 let (current_end, start_next) = store.next_gap(0);
362 assert!(current_end.is_none());
363 assert_eq!(start_next, Some(1));
364
365 let (current_end, start_next) = store.next_gap(1);
366 assert_eq!(current_end, Some(1));
367 assert_eq!(start_next, Some(10));
368
369 let (current_end, start_next) = store.next_gap(10);
370 assert_eq!(current_end, Some(11));
371 assert_eq!(start_next, Some(14));
372
373 let (current_end, start_next) = store.next_gap(11);
374 assert_eq!(current_end, Some(11));
375 assert_eq!(start_next, Some(14));
376
377 let (current_end, start_next) = store.next_gap(12);
378 assert!(current_end.is_none());
379 assert_eq!(start_next, Some(14));
380
381 let (current_end, start_next) = store.next_gap(14);
382 assert_eq!(current_end, Some(14));
383 assert!(start_next.is_none());
384 });
385 }
386
387 #[test_traced]
388 fn test_missing_items() {
389 let executor = deterministic::Runner::default();
391 executor.start(|context| async move {
392 let cfg = Config {
394 partition: "test-ordinal".into(),
395 items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
396 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
397 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
398 };
399 let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
400 .await
401 .expect("Failed to initialize store");
402
403 assert_eq!(store.missing_items(0, 5), Vec::<u64>::new());
405 assert_eq!(store.missing_items(100, 10), Vec::<u64>::new());
406
407 store.put(1, FixedBytes::new([1u8; 32])).await.unwrap();
409 store.put(2, FixedBytes::new([2u8; 32])).await.unwrap();
410 store.put(5, FixedBytes::new([5u8; 32])).await.unwrap();
411 store.put(6, FixedBytes::new([6u8; 32])).await.unwrap();
412 store.put(10, FixedBytes::new([10u8; 32])).await.unwrap();
413
414 assert_eq!(store.missing_items(0, 5), vec![0, 3, 4, 7, 8]);
416 assert_eq!(store.missing_items(0, 6), vec![0, 3, 4, 7, 8, 9]);
417 assert_eq!(store.missing_items(0, 7), vec![0, 3, 4, 7, 8, 9]);
418
419 assert_eq!(store.missing_items(3, 3), vec![3, 4, 7]);
421 assert_eq!(store.missing_items(4, 2), vec![4, 7]);
422
423 assert_eq!(store.missing_items(1, 3), vec![3, 4, 7]);
425 assert_eq!(store.missing_items(2, 4), vec![3, 4, 7, 8]);
426 assert_eq!(store.missing_items(5, 2), vec![7, 8]);
427
428 assert_eq!(store.missing_items(11, 5), Vec::<u64>::new());
430 assert_eq!(store.missing_items(100, 10), Vec::<u64>::new());
431
432 store.put(1000, FixedBytes::new([100u8; 32])).await.unwrap();
434
435 let items = store.missing_items(11, 10);
437 assert_eq!(items, vec![11, 12, 13, 14, 15, 16, 17, 18, 19, 20]);
438
439 let items = store.missing_items(990, 15);
441 assert_eq!(
442 items,
443 vec![990, 991, 992, 993, 994, 995, 996, 997, 998, 999]
444 );
445
446 store.sync().await.unwrap();
448 assert_eq!(store.missing_items(0, 5), vec![0, 3, 4, 7, 8]);
449 assert_eq!(store.missing_items(3, 3), vec![3, 4, 7]);
450
451 store.put(9999, FixedBytes::new([99u8; 32])).await.unwrap();
453 store
454 .put(10001, FixedBytes::new([101u8; 32]))
455 .await
456 .unwrap();
457
458 let items = store.missing_items(9998, 5);
460 assert_eq!(items, vec![9998, 10000]);
461 });
462 }
463
464 #[test_traced]
465 fn test_restart() {
466 let executor = deterministic::Runner::default();
468 executor.start(|context| async move {
469 let cfg = Config {
470 partition: "test-ordinal".into(),
471 items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
472 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
473 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
474 };
475
476 {
478 let mut store =
479 Ordinal::<_, FixedBytes<32>>::init(context.with_label("first"), cfg.clone())
480 .await
481 .expect("Failed to initialize store");
482
483 let values = vec![
484 (0u64, FixedBytes::new([0u8; 32])),
485 (100u64, FixedBytes::new([100u8; 32])),
486 (1000u64, FixedBytes::new([200u8; 32])),
487 ];
488
489 for (index, value) in &values {
490 store
491 .put(*index, value.clone())
492 .await
493 .expect("Failed to put data");
494 }
495
496 store.sync().await.expect("Failed to sync store");
497 }
498
499 {
501 let store =
502 Ordinal::<_, FixedBytes<32>>::init(context.with_label("second"), cfg.clone())
503 .await
504 .expect("Failed to initialize store");
505
506 let values = vec![
507 (0u64, FixedBytes::new([0u8; 32])),
508 (100u64, FixedBytes::new([100u8; 32])),
509 (1000u64, FixedBytes::new([200u8; 32])),
510 ];
511
512 for (index, value) in &values {
513 let retrieved = store
514 .get(*index)
515 .await
516 .expect("Failed to get data")
517 .expect("Data not found");
518 assert_eq!(&retrieved, value);
519 }
520
521 let (current_end, start_next) = store.next_gap(0);
523 assert_eq!(current_end, Some(0));
524 assert_eq!(start_next, Some(100));
525 }
526 });
527 }
528
529 #[test_traced]
530 fn test_invalid_record() {
531 let executor = deterministic::Runner::default();
533 executor.start(|context| async move {
534 let cfg = Config {
535 partition: "test-ordinal".into(),
536 items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
537 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
538 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
539 };
540
541 {
543 let mut store =
544 Ordinal::<_, FixedBytes<32>>::init(context.with_label("first"), cfg.clone())
545 .await
546 .expect("Failed to initialize store");
547
548 store
549 .put(0, FixedBytes::new([42u8; 32]))
550 .await
551 .expect("Failed to put data");
552 store.sync().await.expect("Failed to sync store");
553 }
554
555 {
557 let (blob, _) = context
558 .open("test-ordinal", &0u64.to_be_bytes())
559 .await
560 .unwrap();
561 blob.write_at(32, vec![0xFF]).await.unwrap();
563 blob.sync().await.unwrap();
564 }
565
566 {
568 let store =
569 Ordinal::<_, FixedBytes<32>>::init(context.with_label("second"), cfg.clone())
570 .await
571 .expect("Failed to initialize store");
572
573 let result = store.get(0).await.unwrap();
575 assert!(result.is_none());
576
577 assert!(!store.has(0));
579 }
580 });
581 }
582
583 #[test_traced]
584 fn test_get_nonexistent() {
585 let executor = deterministic::Runner::default();
587 executor.start(|context| async move {
588 let cfg = Config {
590 partition: "test-ordinal".into(),
591 items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
592 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
593 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
594 };
595 let store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
596 .await
597 .expect("Failed to initialize store");
598
599 let retrieved = store.get(999).await.expect("Failed to get data");
601 assert!(retrieved.is_none());
602
603 assert!(!store.has(999));
605 });
606 }
607
608 #[test_traced]
609 fn test_destroy() {
610 let executor = deterministic::Runner::default();
612 executor.start(|context| async move {
613 let cfg = Config {
614 partition: "test-ordinal".into(),
615 items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
616 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
617 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
618 };
619
620 {
622 let mut store =
623 Ordinal::<_, FixedBytes<32>>::init(context.with_label("first"), cfg.clone())
624 .await
625 .expect("Failed to initialize store");
626
627 store
628 .put(0, FixedBytes::new([0u8; 32]))
629 .await
630 .expect("Failed to put data");
631 store
632 .put(1000, FixedBytes::new([100u8; 32]))
633 .await
634 .expect("Failed to put data");
635
636 store.destroy().await.expect("Failed to destroy store");
638 }
639
640 {
642 let store =
643 Ordinal::<_, FixedBytes<32>>::init(context.with_label("second"), cfg.clone())
644 .await
645 .expect("Failed to initialize store");
646
647 assert!(store.get(0).await.unwrap().is_none());
649 assert!(store.get(1000).await.unwrap().is_none());
650 assert!(!store.has(0));
651 assert!(!store.has(1000));
652 }
653 });
654 }
655
656 #[test_traced]
657 fn test_partial_record_write() {
658 let executor = deterministic::Runner::default();
660 executor.start(|context| async move {
661 let cfg = Config {
662 partition: "test-ordinal".into(),
663 items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
664 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
665 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
666 };
667
668 {
670 let mut store =
671 Ordinal::<_, FixedBytes<32>>::init(context.with_label("first"), cfg.clone())
672 .await
673 .expect("Failed to initialize store");
674
675 store
676 .put(0, FixedBytes::new([42u8; 32]))
677 .await
678 .expect("Failed to put data");
679 store
680 .put(1, FixedBytes::new([43u8; 32]))
681 .await
682 .expect("Failed to put data");
683 store.sync().await.expect("Failed to sync store");
684 }
685
686 {
688 let (blob, _) = context
689 .open("test-ordinal", &0u64.to_be_bytes())
690 .await
691 .unwrap();
692 blob.write_at(36, vec![0xFF; 32]).await.unwrap();
694 blob.sync().await.unwrap();
695 }
696
697 {
699 let store =
700 Ordinal::<_, FixedBytes<32>>::init(context.with_label("second"), cfg.clone())
701 .await
702 .expect("Failed to initialize store");
703
704 assert_eq!(
706 store.get(0).await.unwrap().unwrap(),
707 FixedBytes::new([42u8; 32])
708 );
709
710 assert!(!store.has(1));
712 assert!(store.get(1).await.unwrap().is_none());
713
714 let mut store_mut = store;
716 store_mut.put(1, FixedBytes::new([44u8; 32])).await.unwrap();
717 assert_eq!(
718 store_mut.get(1).await.unwrap().unwrap(),
719 FixedBytes::new([44u8; 32])
720 );
721 }
722 });
723 }
724
725 #[test_traced]
726 fn test_corrupted_value() {
727 let executor = deterministic::Runner::default();
729 executor.start(|context| async move {
730 let cfg = Config {
731 partition: "test-ordinal".into(),
732 items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
733 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
734 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
735 };
736
737 {
739 let mut store =
740 Ordinal::<_, FixedBytes<32>>::init(context.with_label("first"), cfg.clone())
741 .await
742 .expect("Failed to initialize store");
743
744 store
745 .put(0, FixedBytes::new([42u8; 32]))
746 .await
747 .expect("Failed to put data");
748 store
749 .put(1, FixedBytes::new([43u8; 32]))
750 .await
751 .expect("Failed to put data");
752 store.sync().await.expect("Failed to sync store");
753 }
754
755 {
757 let (blob, _) = context
758 .open("test-ordinal", &0u64.to_be_bytes())
759 .await
760 .unwrap();
761 blob.write_at(10, hex!("0xFFFFFFFF").to_vec())
763 .await
764 .unwrap();
765 blob.sync().await.unwrap();
766 }
767
768 {
770 let store =
771 Ordinal::<_, FixedBytes<32>>::init(context.with_label("second"), cfg.clone())
772 .await
773 .expect("Failed to initialize store");
774
775 assert!(!store.has(0));
777
778 assert!(store.has(1));
780 assert_eq!(
781 store.get(1).await.unwrap().unwrap(),
782 FixedBytes::new([43u8; 32])
783 );
784 }
785 });
786 }
787
788 #[test_traced]
789 fn test_crc_corruptions() {
790 let executor = deterministic::Runner::default();
792 executor.start(|context| async move {
793 let cfg = Config {
794 partition: "test-ordinal".into(),
795 items_per_blob: NZU64!(10), write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
797 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
798 };
799
800 {
802 let mut store =
803 Ordinal::<_, FixedBytes<32>>::init(context.with_label("first"), cfg.clone())
804 .await
805 .expect("Failed to initialize store");
806
807 store.put(0, FixedBytes::new([0u8; 32])).await.unwrap();
809 store.put(5, FixedBytes::new([5u8; 32])).await.unwrap();
810 store.put(10, FixedBytes::new([10u8; 32])).await.unwrap();
811 store.put(15, FixedBytes::new([15u8; 32])).await.unwrap();
812 store.sync().await.expect("Failed to sync store");
813 }
814
815 {
817 let (blob, _) = context
819 .open("test-ordinal", &0u64.to_be_bytes())
820 .await
821 .unwrap();
822 blob.write_at(32, vec![0xFF]).await.unwrap(); blob.sync().await.unwrap();
824
825 let (blob, _) = context
827 .open("test-ordinal", &1u64.to_be_bytes())
828 .await
829 .unwrap();
830 blob.write_at(5, vec![0xFF; 4]).await.unwrap(); blob.sync().await.unwrap();
832 }
833
834 {
836 let store =
837 Ordinal::<_, FixedBytes<32>>::init(context.with_label("second"), cfg.clone())
838 .await
839 .expect("Failed to initialize store");
840
841 assert!(!store.has(0)); assert!(!store.has(10)); assert!(store.has(5));
847 assert!(store.has(15));
848 assert_eq!(
849 store.get(5).await.unwrap().unwrap(),
850 FixedBytes::new([5u8; 32])
851 );
852 assert_eq!(
853 store.get(15).await.unwrap().unwrap(),
854 FixedBytes::new([15u8; 32])
855 );
856 }
857 });
858 }
859
860 #[test_traced]
861 fn test_extra_bytes_in_blob() {
862 let executor = deterministic::Runner::default();
864 executor.start(|context| async move {
865 let cfg = Config {
866 partition: "test-ordinal".into(),
867 items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
868 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
869 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
870 };
871
872 {
874 let mut store =
875 Ordinal::<_, FixedBytes<32>>::init(context.with_label("first"), cfg.clone())
876 .await
877 .expect("Failed to initialize store");
878
879 store
880 .put(0, FixedBytes::new([42u8; 32]))
881 .await
882 .expect("Failed to put data");
883 store
884 .put(1, FixedBytes::new([43u8; 32]))
885 .await
886 .expect("Failed to put data");
887 store.sync().await.expect("Failed to sync store");
888 }
889
890 {
892 let (blob, size) = context
893 .open("test-ordinal", &0u64.to_be_bytes())
894 .await
895 .unwrap();
896 let mut garbage = vec![0xFF; 32]; let invalid_crc = 0xDEADBEEFu32;
900 garbage.extend_from_slice(&invalid_crc.to_be_bytes());
901 assert_eq!(garbage.len(), 36); blob.write_at(size, garbage).await.unwrap();
903 blob.sync().await.unwrap();
904 }
905
906 {
908 let store =
909 Ordinal::<_, FixedBytes<32>>::init(context.with_label("second"), cfg.clone())
910 .await
911 .expect("Failed to initialize store");
912
913 assert!(store.has(0));
915 assert!(store.has(1));
916 assert_eq!(
917 store.get(0).await.unwrap().unwrap(),
918 FixedBytes::new([42u8; 32])
919 );
920 assert_eq!(
921 store.get(1).await.unwrap().unwrap(),
922 FixedBytes::new([43u8; 32])
923 );
924
925 let mut store_mut = store;
927 store_mut.put(2, FixedBytes::new([44u8; 32])).await.unwrap();
928 assert_eq!(
929 store_mut.get(2).await.unwrap().unwrap(),
930 FixedBytes::new([44u8; 32])
931 );
932 }
933 });
934 }
935
936 #[test_traced]
937 fn test_zero_filled_records() {
938 let executor = deterministic::Runner::default();
940 executor.start(|context| async move {
941 let cfg = Config {
942 partition: "test-ordinal".into(),
943 items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
944 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
945 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
946 };
947
948 {
950 let (blob, _) = context
951 .open("test-ordinal", &0u64.to_be_bytes())
952 .await
953 .unwrap();
954
955 let zeros = vec![0u8; 36 * 5]; blob.write_at(0, zeros).await.unwrap();
958
959 let mut valid_record = vec![44u8; 32];
961 let crc = Crc32::checksum(&valid_record);
962 valid_record.extend_from_slice(&crc.to_be_bytes());
963 blob.write_at(36 * 5, valid_record).await.unwrap();
964
965 blob.sync().await.unwrap();
966 }
967
968 {
970 let store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
971 .await
972 .expect("Failed to initialize store");
973
974 for i in 0..5 {
976 assert!(!store.has(i));
977 }
978
979 assert!(store.has(5));
981 assert_eq!(
982 store.get(5).await.unwrap().unwrap(),
983 FixedBytes::new([44u8; 32])
984 );
985 }
986 });
987 }
988
989 fn test_operations_and_restart(num_values: usize) -> String {
990 let executor = deterministic::Runner::default();
992 executor.start(|mut context| async move {
993 let cfg = Config {
994 partition: "test-ordinal".into(),
995 items_per_blob: NZU64!(100), write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
997 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
998 };
999
1000 let mut store =
1002 Ordinal::<_, FixedBytes<128>>::init(context.with_label("first"), cfg.clone())
1003 .await
1004 .expect("Failed to initialize store");
1005
1006 let mut values = Vec::new();
1008 let mut rng_index = 0u64;
1009
1010 for _ in 0..num_values {
1011 let mut index_bytes = [0u8; 8];
1013 context.fill_bytes(&mut index_bytes);
1014 let index_offset = u64::from_be_bytes(index_bytes) % 1000;
1015 let index = rng_index + index_offset;
1016 rng_index = index + 1;
1017
1018 let mut value = [0u8; 128];
1020 context.fill_bytes(&mut value);
1021 let value = FixedBytes::<128>::new(value);
1022
1023 store
1024 .put(index, value.clone())
1025 .await
1026 .expect("Failed to put data");
1027 values.push((index, value));
1028 }
1029
1030 store.sync().await.expect("Failed to sync");
1032
1033 for (index, value) in &values {
1035 let retrieved = store
1036 .get(*index)
1037 .await
1038 .expect("Failed to get data")
1039 .expect("Data not found");
1040 assert_eq!(&retrieved, value);
1041 }
1042
1043 for i in 0..10 {
1045 let _ = store.next_gap(i * 100);
1046 }
1047
1048 store.sync().await.expect("Failed to sync store");
1050 drop(store);
1051
1052 let mut store = Ordinal::<_, FixedBytes<128>>::init(context.with_label("second"), cfg)
1054 .await
1055 .expect("Failed to initialize store");
1056
1057 for (index, value) in &values {
1059 let retrieved = store
1060 .get(*index)
1061 .await
1062 .expect("Failed to get data")
1063 .expect("Data not found");
1064 assert_eq!(&retrieved, value);
1065 }
1066
1067 for _ in 0..10 {
1069 let mut index_bytes = [0u8; 8];
1070 context.fill_bytes(&mut index_bytes);
1071 let index = u64::from_be_bytes(index_bytes) % 10000;
1072
1073 let mut value = [0u8; 128];
1074 context.fill_bytes(&mut value);
1075 let value = FixedBytes::<128>::new(value);
1076
1077 store.put(index, value).await.expect("Failed to put data");
1078 }
1079
1080 store.sync().await.expect("Failed to sync");
1082
1083 context.auditor().state()
1085 })
1086 }
1087
1088 #[test_group("slow")]
1089 #[test_traced]
1090 fn test_determinism() {
1091 let state1 = test_operations_and_restart(100);
1092 let state2 = test_operations_and_restart(100);
1093 assert_eq!(state1, state2);
1094 }
1095
1096 #[test_traced]
1097 fn test_prune_basic() {
1098 let executor = deterministic::Runner::default();
1100 executor.start(|context| async move {
1101 let cfg = Config {
1102 partition: "test-ordinal".into(),
1103 items_per_blob: NZU64!(100), write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1105 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1106 };
1107
1108 let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1109 .await
1110 .expect("Failed to initialize store");
1111
1112 let values = vec![
1114 (0u64, FixedBytes::new([0u8; 32])), (50u64, FixedBytes::new([50u8; 32])), (100u64, FixedBytes::new([100u8; 32])), (150u64, FixedBytes::new([150u8; 32])), (200u64, FixedBytes::new([200u8; 32])), (300u64, FixedBytes::new([44u8; 32])), ];
1121
1122 for (index, value) in &values {
1123 store
1124 .put(*index, value.clone())
1125 .await
1126 .expect("Failed to put data");
1127 }
1128 store.sync().await.unwrap();
1129
1130 for (index, value) in &values {
1132 assert_eq!(store.get(*index).await.unwrap().unwrap(), *value);
1133 }
1134
1135 store.prune(150).await.unwrap();
1137 let buffer = context.encode();
1138 assert!(buffer.contains("pruned_total 1"));
1139
1140 assert!(!store.has(0));
1142 assert!(!store.has(50));
1143 assert!(store.get(0).await.unwrap().is_none());
1144 assert!(store.get(50).await.unwrap().is_none());
1145
1146 assert!(store.has(100));
1148 assert!(store.has(150));
1149 assert!(store.has(200));
1150 assert!(store.has(300));
1151 assert_eq!(store.get(100).await.unwrap().unwrap(), values[2].1);
1152 assert_eq!(store.get(150).await.unwrap().unwrap(), values[3].1);
1153 assert_eq!(store.get(200).await.unwrap().unwrap(), values[4].1);
1154 assert_eq!(store.get(300).await.unwrap().unwrap(), values[5].1);
1155
1156 store.prune(250).await.unwrap();
1158 let buffer = context.encode();
1159 assert!(buffer.contains("pruned_total 2"));
1160
1161 assert!(!store.has(100));
1163 assert!(!store.has(150));
1164 assert!(store.get(100).await.unwrap().is_none());
1165 assert!(store.get(150).await.unwrap().is_none());
1166
1167 assert!(store.has(200));
1169 assert!(store.has(300));
1170 assert_eq!(store.get(200).await.unwrap().unwrap(), values[4].1);
1171 assert_eq!(store.get(300).await.unwrap().unwrap(), values[5].1);
1172 });
1173 }
1174
1175 #[test_traced]
1176 fn test_prune_with_gaps() {
1177 let executor = deterministic::Runner::default();
1179 executor.start(|context| async move {
1180 let cfg = Config {
1181 partition: "test-ordinal".into(),
1182 items_per_blob: NZU64!(100),
1183 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1184 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1185 };
1186
1187 let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1188 .await
1189 .expect("Failed to initialize store");
1190
1191 store.put(5, FixedBytes::new([5u8; 32])).await.unwrap();
1193 store.put(105, FixedBytes::new([105u8; 32])).await.unwrap();
1194 store.put(305, FixedBytes::new([49u8; 32])).await.unwrap();
1195 store.sync().await.unwrap();
1196
1197 let (current_end, next_start) = store.next_gap(0);
1199 assert!(current_end.is_none());
1200 assert_eq!(next_start, Some(5));
1201
1202 let (current_end, next_start) = store.next_gap(5);
1203 assert_eq!(current_end, Some(5));
1204 assert_eq!(next_start, Some(105));
1205
1206 store.prune(150).await.unwrap();
1208
1209 assert!(!store.has(5));
1211 assert!(store.get(5).await.unwrap().is_none());
1212
1213 assert!(store.has(105));
1215 assert!(store.has(305));
1216
1217 let (current_end, next_start) = store.next_gap(0);
1218 assert!(current_end.is_none());
1219 assert_eq!(next_start, Some(105));
1220
1221 let (current_end, next_start) = store.next_gap(105);
1222 assert_eq!(current_end, Some(105));
1223 assert_eq!(next_start, Some(305));
1224 });
1225 }
1226
1227 #[test_traced]
1228 fn test_prune_no_op() {
1229 let executor = deterministic::Runner::default();
1231 executor.start(|context| async move {
1232 let cfg = Config {
1233 partition: "test-ordinal".into(),
1234 items_per_blob: NZU64!(100),
1235 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1236 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1237 };
1238
1239 let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1240 .await
1241 .expect("Failed to initialize store");
1242
1243 store.put(100, FixedBytes::new([100u8; 32])).await.unwrap();
1245 store.put(200, FixedBytes::new([200u8; 32])).await.unwrap();
1246 store.sync().await.unwrap();
1247
1248 store.prune(50).await.unwrap();
1250
1251 assert!(store.has(100));
1253 assert!(store.has(200));
1254 let buffer = context.encode();
1255 assert!(buffer.contains("pruned_total 0"));
1256
1257 store.prune(100).await.unwrap();
1259
1260 assert!(store.has(100));
1262 assert!(store.has(200));
1263 let buffer = context.encode();
1264 assert!(buffer.contains("pruned_total 0"));
1265 });
1266 }
1267
1268 #[test_traced]
1269 fn test_prune_empty_store() {
1270 let executor = deterministic::Runner::default();
1272 executor.start(|context| async move {
1273 let cfg = Config {
1274 partition: "test-ordinal".into(),
1275 items_per_blob: NZU64!(100),
1276 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1277 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1278 };
1279
1280 let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1281 .await
1282 .expect("Failed to initialize store");
1283
1284 store.prune(1000).await.unwrap();
1286
1287 store.put(0, FixedBytes::new([0u8; 32])).await.unwrap();
1289 assert!(store.has(0));
1290 });
1291 }
1292
1293 #[test_traced]
1294 fn test_prune_after_restart() {
1295 let executor = deterministic::Runner::default();
1297 executor.start(|context| async move {
1298 let cfg = Config {
1299 partition: "test-ordinal".into(),
1300 items_per_blob: NZU64!(100),
1301 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1302 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1303 };
1304
1305 {
1307 let mut store =
1308 Ordinal::<_, FixedBytes<32>>::init(context.with_label("first"), cfg.clone())
1309 .await
1310 .expect("Failed to initialize store");
1311
1312 store.put(0, FixedBytes::new([0u8; 32])).await.unwrap();
1313 store.put(100, FixedBytes::new([100u8; 32])).await.unwrap();
1314 store.put(200, FixedBytes::new([200u8; 32])).await.unwrap();
1315 store.sync().await.unwrap();
1316 }
1317
1318 {
1320 let mut store =
1321 Ordinal::<_, FixedBytes<32>>::init(context.with_label("second"), cfg.clone())
1322 .await
1323 .expect("Failed to initialize store");
1324
1325 assert!(store.has(0));
1327 assert!(store.has(100));
1328 assert!(store.has(200));
1329
1330 store.prune(150).await.unwrap();
1332
1333 assert!(!store.has(0));
1335 assert!(store.has(100));
1336 assert!(store.has(200));
1337
1338 store.sync().await.unwrap();
1339 }
1340
1341 {
1343 let store =
1344 Ordinal::<_, FixedBytes<32>>::init(context.with_label("third"), cfg.clone())
1345 .await
1346 .expect("Failed to initialize store");
1347
1348 assert!(!store.has(0));
1349 assert!(store.has(100));
1350 assert!(store.has(200));
1351
1352 let (current_end, next_start) = store.next_gap(0);
1354 assert!(current_end.is_none());
1355 assert_eq!(next_start, Some(100));
1356 }
1357 });
1358 }
1359
1360 #[test_traced]
1361 fn test_prune_multiple_operations() {
1362 let executor = deterministic::Runner::default();
1364 executor.start(|context| async move {
1365 let cfg = Config {
1366 partition: "test-ordinal".into(),
1367 items_per_blob: NZU64!(50), write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1369 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1370 };
1371
1372 let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1373 .await
1374 .expect("Failed to initialize store");
1375
1376 let mut values = Vec::new();
1378 for i in 0..10 {
1379 let index = i * 50 + 25; let value = FixedBytes::new([i as u8; 32]);
1381 store.put(index, value.clone()).await.unwrap();
1382 values.push((index, value));
1383 }
1384 store.sync().await.unwrap();
1385
1386 for i in 1..5 {
1388 let prune_index = i * 50 + 10;
1389 store.prune(prune_index).await.unwrap();
1390
1391 for (index, _) in &values {
1393 if *index < prune_index {
1394 assert!(!store.has(*index), "Index {index} should be pruned");
1395 } else {
1396 assert!(store.has(*index), "Index {index} should not be pruned");
1397 }
1398 }
1399 }
1400
1401 let buffer = context.encode();
1403 assert!(buffer.contains("pruned_total 4"));
1404
1405 for i in 4..10 {
1407 let index = i * 50 + 25;
1408 assert!(store.has(index));
1409 assert_eq!(
1410 store.get(index).await.unwrap().unwrap(),
1411 values[i as usize].1
1412 );
1413 }
1414 });
1415 }
1416
1417 #[test_traced]
1418 fn test_prune_blob_boundaries() {
1419 let executor = deterministic::Runner::default();
1421 executor.start(|context| async move {
1422 let cfg = Config {
1423 partition: "test-ordinal".into(),
1424 items_per_blob: NZU64!(100),
1425 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1426 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1427 };
1428
1429 let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1430 .await
1431 .expect("Failed to initialize store");
1432
1433 store.put(0, FixedBytes::new([0u8; 32])).await.unwrap(); store.put(99, FixedBytes::new([99u8; 32])).await.unwrap(); store.put(100, FixedBytes::new([100u8; 32])).await.unwrap(); store.put(199, FixedBytes::new([199u8; 32])).await.unwrap(); store.put(200, FixedBytes::new([200u8; 32])).await.unwrap(); store.sync().await.unwrap();
1440
1441 store.prune(100).await.unwrap();
1445 assert!(!store.has(0));
1446 assert!(!store.has(99));
1447 assert!(store.has(100));
1448 assert!(store.has(199));
1449 assert!(store.has(200));
1450
1451 store.prune(199).await.unwrap();
1453 assert!(store.has(100));
1454 assert!(store.has(199));
1455 assert!(store.has(200));
1456
1457 store.prune(200).await.unwrap();
1459 assert!(!store.has(100));
1460 assert!(!store.has(199));
1461 assert!(store.has(200));
1462
1463 let buffer = context.encode();
1464 assert!(buffer.contains("pruned_total 2"));
1465 });
1466 }
1467
1468 #[test_traced]
1469 fn test_prune_non_contiguous_sections() {
1470 let executor = deterministic::Runner::default();
1472 executor.start(|context| async move {
1473 let cfg = Config {
1474 partition: "test-ordinal".into(),
1475 items_per_blob: NZU64!(100),
1476 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1477 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1478 };
1479
1480 let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1481 .await
1482 .expect("Failed to initialize store");
1483
1484 store.put(0, FixedBytes::new([0u8; 32])).await.unwrap(); store.put(250, FixedBytes::new([50u8; 32])).await.unwrap(); store.put(500, FixedBytes::new([44u8; 32])).await.unwrap(); store.put(750, FixedBytes::new([45u8; 32])).await.unwrap(); store.sync().await.unwrap();
1490
1491 assert!(store.has(0));
1493 assert!(store.has(250));
1494 assert!(store.has(500));
1495 assert!(store.has(750));
1496
1497 store.prune(300).await.unwrap();
1499
1500 assert!(!store.has(0)); assert!(!store.has(250)); assert!(store.has(500)); assert!(store.has(750)); let buffer = context.encode();
1507 assert!(buffer.contains("pruned_total 2"));
1508
1509 store.prune(600).await.unwrap();
1511
1512 assert!(!store.has(500)); assert!(store.has(750)); let buffer = context.encode();
1517 assert!(buffer.contains("pruned_total 3"));
1518
1519 store.prune(1000).await.unwrap();
1521
1522 assert!(!store.has(750)); let buffer = context.encode();
1526 assert!(buffer.contains("pruned_total 4"));
1527 });
1528 }
1529
1530 #[test_traced]
1531 fn test_prune_removes_correct_pending() {
1532 let executor = deterministic::Runner::default();
1534 executor.start(|context| async move {
1535 let cfg = Config {
1536 partition: "test-ordinal".into(),
1537 items_per_blob: NZU64!(100),
1538 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1539 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1540 };
1541 let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1542 .await
1543 .expect("Failed to initialize store");
1544
1545 store.put(5, FixedBytes::new([5u8; 32])).await.unwrap();
1547 store.sync().await.unwrap();
1548
1549 store.put(10, FixedBytes::new([10u8; 32])).await.unwrap(); store.put(110, FixedBytes::new([110u8; 32])).await.unwrap(); assert!(store.has(5));
1555 assert!(store.has(10));
1556 assert!(store.has(110));
1557
1558 store.prune(150).await.unwrap();
1560
1561 assert!(!store.has(5));
1563 assert!(!store.has(10));
1564
1565 assert!(store.has(110));
1567 assert_eq!(
1568 store.get(110).await.unwrap().unwrap(),
1569 FixedBytes::new([110u8; 32])
1570 );
1571
1572 store.sync().await.unwrap();
1574 assert!(store.has(110));
1575 assert_eq!(
1576 store.get(110).await.unwrap().unwrap(),
1577 FixedBytes::new([110u8; 32])
1578 );
1579 });
1580 }
1581
1582 #[test_traced]
1583 fn test_init_with_bits_none() {
1584 let executor = deterministic::Runner::default();
1586 executor.start(|context| async move {
1587 let cfg = Config {
1588 partition: "test-ordinal".into(),
1589 items_per_blob: NZU64!(10), write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1591 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1592 };
1593
1594 {
1596 let mut store =
1597 Ordinal::<_, FixedBytes<32>>::init(context.with_label("first"), cfg.clone())
1598 .await
1599 .expect("Failed to initialize store");
1600
1601 store.put(0, FixedBytes::new([0u8; 32])).await.unwrap();
1603 store.put(5, FixedBytes::new([5u8; 32])).await.unwrap();
1604 store.put(9, FixedBytes::new([9u8; 32])).await.unwrap();
1605
1606 store.put(10, FixedBytes::new([10u8; 32])).await.unwrap();
1608 store.put(15, FixedBytes::new([15u8; 32])).await.unwrap();
1609
1610 store.put(25, FixedBytes::new([25u8; 32])).await.unwrap();
1612
1613 store.sync().await.unwrap();
1614 }
1615
1616 {
1618 let store = Ordinal::<_, FixedBytes<32>>::init_with_bits(
1619 context.with_label("second"),
1620 cfg.clone(),
1621 None,
1622 )
1623 .await
1624 .expect("Failed to initialize store with bits");
1625
1626 assert!(store.has(0));
1628 assert!(store.has(5));
1629 assert!(store.has(9));
1630 assert!(store.has(10));
1631 assert!(store.has(15));
1632 assert!(store.has(25));
1633
1634 assert!(!store.has(1));
1636 assert!(!store.has(11));
1637 assert!(!store.has(20));
1638
1639 assert_eq!(
1641 store.get(0).await.unwrap().unwrap(),
1642 FixedBytes::new([0u8; 32])
1643 );
1644 assert_eq!(
1645 store.get(15).await.unwrap().unwrap(),
1646 FixedBytes::new([15u8; 32])
1647 );
1648 }
1649 });
1650 }
1651
1652 #[test_traced]
1653 fn test_init_with_bits_empty_hashmap() {
1654 let executor = deterministic::Runner::default();
1656 executor.start(|context| async move {
1657 let cfg = Config {
1658 partition: "test-ordinal".into(),
1659 items_per_blob: NZU64!(10),
1660 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1661 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1662 };
1663
1664 {
1666 let mut store =
1667 Ordinal::<_, FixedBytes<32>>::init(context.with_label("first"), cfg.clone())
1668 .await
1669 .expect("Failed to initialize store");
1670
1671 store.put(0, FixedBytes::new([0u8; 32])).await.unwrap();
1672 store.put(10, FixedBytes::new([10u8; 32])).await.unwrap();
1673 store.put(20, FixedBytes::new([20u8; 32])).await.unwrap();
1674
1675 store.sync().await.unwrap();
1676 }
1677
1678 {
1680 let bits: BTreeMap<u64, &Option<BitMap>> = BTreeMap::new();
1681 let store = Ordinal::<_, FixedBytes<32>>::init_with_bits(
1682 context.with_label("second"),
1683 cfg.clone(),
1684 Some(bits),
1685 )
1686 .await
1687 .expect("Failed to initialize store with bits");
1688
1689 assert!(!store.has(0));
1691 assert!(!store.has(10));
1692 assert!(!store.has(20));
1693 }
1694 });
1695 }
1696
1697 #[test_traced]
1698 fn test_init_with_bits_selective_sections() {
1699 let executor = deterministic::Runner::default();
1701 executor.start(|context| async move {
1702 let cfg = Config {
1703 partition: "test-ordinal".into(),
1704 items_per_blob: NZU64!(10),
1705 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1706 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1707 };
1708
1709 {
1711 let mut store =
1712 Ordinal::<_, FixedBytes<32>>::init(context.with_label("first"), cfg.clone())
1713 .await
1714 .expect("Failed to initialize store");
1715
1716 for i in 0..10 {
1718 store.put(i, FixedBytes::new([i as u8; 32])).await.unwrap();
1719 }
1720
1721 for i in 10..20 {
1723 store.put(i, FixedBytes::new([i as u8; 32])).await.unwrap();
1724 }
1725
1726 for i in 20..30 {
1728 store.put(i, FixedBytes::new([i as u8; 32])).await.unwrap();
1729 }
1730
1731 store.sync().await.unwrap();
1732 }
1733
1734 {
1736 let mut bits_map: BTreeMap<u64, &Option<BitMap>> = BTreeMap::new();
1737
1738 let mut bitmap = BitMap::zeroes(10);
1740 bitmap.set(2, true); bitmap.set(5, true); bitmap.set(8, true); let bitmap_option = Some(bitmap);
1744
1745 bits_map.insert(1, &bitmap_option);
1746
1747 let store = Ordinal::<_, FixedBytes<32>>::init_with_bits(
1748 context.with_label("second"),
1749 cfg.clone(),
1750 Some(bits_map),
1751 )
1752 .await
1753 .expect("Failed to initialize store with bits");
1754
1755 assert!(store.has(12));
1757 assert!(store.has(15));
1758 assert!(store.has(18));
1759
1760 assert!(!store.has(10));
1762 assert!(!store.has(11));
1763 assert!(!store.has(13));
1764 assert!(!store.has(14));
1765 assert!(!store.has(16));
1766 assert!(!store.has(17));
1767 assert!(!store.has(19));
1768
1769 for i in 0..10 {
1771 assert!(!store.has(i));
1772 }
1773 for i in 20..30 {
1774 assert!(!store.has(i));
1775 }
1776
1777 assert_eq!(
1779 store.get(12).await.unwrap().unwrap(),
1780 FixedBytes::new([12u8; 32])
1781 );
1782 assert_eq!(
1783 store.get(15).await.unwrap().unwrap(),
1784 FixedBytes::new([15u8; 32])
1785 );
1786 assert_eq!(
1787 store.get(18).await.unwrap().unwrap(),
1788 FixedBytes::new([18u8; 32])
1789 );
1790 }
1791 });
1792 }
1793
1794 #[test_traced]
1795 fn test_init_with_bits_none_option_all_records_exist() {
1796 let executor = deterministic::Runner::default();
1798 executor.start(|context| async move {
1799 let cfg = Config {
1800 partition: "test-ordinal".into(),
1801 items_per_blob: NZU64!(5),
1802 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1803 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1804 };
1805
1806 {
1808 let mut store =
1809 Ordinal::<_, FixedBytes<32>>::init(context.with_label("first"), cfg.clone())
1810 .await
1811 .expect("Failed to initialize store");
1812
1813 for i in 5..10 {
1815 store.put(i, FixedBytes::new([i as u8; 32])).await.unwrap();
1816 }
1817
1818 store.sync().await.unwrap();
1819 }
1820
1821 {
1823 let mut bits_map: BTreeMap<u64, &Option<BitMap>> = BTreeMap::new();
1824 let none_option: Option<BitMap> = None;
1825 bits_map.insert(1, &none_option);
1826
1827 let store = Ordinal::<_, FixedBytes<32>>::init_with_bits(
1828 context.with_label("second"),
1829 cfg.clone(),
1830 Some(bits_map),
1831 )
1832 .await
1833 .expect("Failed to initialize store with bits");
1834
1835 for i in 5..10 {
1837 assert!(store.has(i));
1838 assert_eq!(
1839 store.get(i).await.unwrap().unwrap(),
1840 FixedBytes::new([i as u8; 32])
1841 );
1842 }
1843 }
1844 });
1845 }
1846
1847 #[test_traced]
1848 #[should_panic(expected = "Failed to initialize store with bits: MissingRecord(6)")]
1849 fn test_init_with_bits_none_option_missing_record_panics() {
1850 let executor = deterministic::Runner::default();
1852 executor.start(|context| async move {
1853 let cfg = Config {
1854 partition: "test-ordinal".into(),
1855 items_per_blob: NZU64!(5),
1856 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1857 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1858 };
1859
1860 {
1862 let mut store =
1863 Ordinal::<_, FixedBytes<32>>::init(context.with_label("first"), cfg.clone())
1864 .await
1865 .expect("Failed to initialize store");
1866
1867 store.put(5, FixedBytes::new([5u8; 32])).await.unwrap();
1869 store.put(7, FixedBytes::new([7u8; 32])).await.unwrap();
1871 store.put(8, FixedBytes::new([8u8; 32])).await.unwrap();
1872 store.put(9, FixedBytes::new([9u8; 32])).await.unwrap();
1873
1874 store.sync().await.unwrap();
1875 }
1876
1877 {
1880 let mut bits_map: BTreeMap<u64, &Option<BitMap>> = BTreeMap::new();
1881 let none_option: Option<BitMap> = None;
1882 bits_map.insert(1, &none_option);
1883
1884 let _store = Ordinal::<_, FixedBytes<32>>::init_with_bits(
1885 context.with_label("second"),
1886 cfg.clone(),
1887 Some(bits_map),
1888 )
1889 .await
1890 .expect("Failed to initialize store with bits");
1891 }
1892 });
1893 }
1894
1895 #[test_traced]
1896 fn test_init_with_bits_mixed_sections() {
1897 let executor = deterministic::Runner::default();
1899 executor.start(|context| async move {
1900 let cfg = Config {
1901 partition: "test-ordinal".into(),
1902 items_per_blob: NZU64!(5),
1903 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1904 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1905 };
1906
1907 {
1909 let mut store =
1910 Ordinal::<_, FixedBytes<32>>::init(context.with_label("first"), cfg.clone())
1911 .await
1912 .expect("Failed to initialize store");
1913
1914 for i in 0..5 {
1916 store.put(i, FixedBytes::new([i as u8; 32])).await.unwrap();
1917 }
1918
1919 store.put(5, FixedBytes::new([5u8; 32])).await.unwrap();
1921 store.put(7, FixedBytes::new([7u8; 32])).await.unwrap();
1922 store.put(9, FixedBytes::new([9u8; 32])).await.unwrap();
1923
1924 for i in 10..15 {
1926 store.put(i, FixedBytes::new([i as u8; 32])).await.unwrap();
1927 }
1928
1929 store.sync().await.unwrap();
1930 }
1931
1932 {
1934 let mut bits_map: BTreeMap<u64, &Option<BitMap>> = BTreeMap::new();
1935
1936 let none_option: Option<BitMap> = None;
1938 bits_map.insert(0, &none_option);
1939
1940 let mut bitmap1 = BitMap::zeroes(5);
1942 bitmap1.set(0, true); bitmap1.set(2, true); let bitmap1_option = Some(bitmap1);
1946 bits_map.insert(1, &bitmap1_option);
1947
1948 let store = Ordinal::<_, FixedBytes<32>>::init_with_bits(
1951 context.with_label("second"),
1952 cfg.clone(),
1953 Some(bits_map),
1954 )
1955 .await
1956 .expect("Failed to initialize store with bits");
1957
1958 for i in 0..5 {
1960 assert!(store.has(i));
1961 assert_eq!(
1962 store.get(i).await.unwrap().unwrap(),
1963 FixedBytes::new([i as u8; 32])
1964 );
1965 }
1966
1967 assert!(store.has(5));
1969 assert!(store.has(7));
1970 assert!(!store.has(6));
1971 assert!(!store.has(8));
1972 assert!(!store.has(9)); for i in 10..15 {
1976 assert!(!store.has(i));
1977 }
1978 }
1979 });
1980 }
1981
1982 #[test_traced]
1983 #[should_panic(expected = "Failed to initialize store with bits: MissingRecord(2)")]
1984 fn test_init_with_bits_corrupted_records() {
1985 let executor = deterministic::Runner::default();
1987 executor.start(|context| async move {
1988 let cfg = Config {
1989 partition: "test-ordinal".into(),
1990 items_per_blob: NZU64!(5),
1991 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1992 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1993 };
1994
1995 {
1997 let mut store =
1998 Ordinal::<_, FixedBytes<32>>::init(context.with_label("first"), cfg.clone())
1999 .await
2000 .expect("Failed to initialize store");
2001
2002 for i in 0..5 {
2004 store.put(i, FixedBytes::new([i as u8; 32])).await.unwrap();
2005 }
2006
2007 store.sync().await.unwrap();
2008 }
2009
2010 {
2012 let (blob, _) = context
2013 .open("test-ordinal", &0u64.to_be_bytes())
2014 .await
2015 .unwrap();
2016 let offset = 2 * 36 + 32; blob.write_at(offset, vec![0xFF]).await.unwrap();
2019 blob.sync().await.unwrap();
2020 }
2021
2022 {
2024 let mut bits_map: BTreeMap<u64, &Option<BitMap>> = BTreeMap::new();
2025
2026 let mut bitmap = BitMap::zeroes(5);
2028 bitmap.set(0, true); bitmap.set(2, true); bitmap.set(4, true); let bitmap_option = Some(bitmap);
2032 bits_map.insert(0, &bitmap_option);
2033
2034 let _store = Ordinal::<_, FixedBytes<32>>::init_with_bits(
2035 context.with_label("second"),
2036 cfg.clone(),
2037 Some(bits_map),
2038 )
2039 .await
2040 .expect("Failed to initialize store with bits");
2041 }
2042 });
2043 }
2044
2045 #[derive(Debug, PartialEq, Eq)]
2047 pub struct DummyValue {
2048 pub value: u64,
2049 }
2050
2051 impl Write for DummyValue {
2052 fn write(&self, buf: &mut impl BufMut) {
2053 self.value.write(buf);
2054 }
2055 }
2056
2057 impl Read for DummyValue {
2058 type Cfg = ();
2059
2060 fn read_cfg(buf: &mut impl Buf, _: &Self::Cfg) -> Result<Self, commonware_codec::Error> {
2061 let value = u64::read(buf)?;
2062 if value == 0 {
2063 return Err(commonware_codec::Error::Invalid(
2064 "DummyValue",
2065 "value must be non-zero",
2066 ));
2067 }
2068 Ok(Self { value })
2069 }
2070 }
2071
2072 impl FixedSize for DummyValue {
2073 const SIZE: usize = u64::SIZE;
2074 }
2075
2076 #[test_traced]
2077 fn test_init_skip_unparseable_record() {
2078 let executor = deterministic::Runner::default();
2080 executor.start(|context| async move {
2081 let cfg = Config {
2082 partition: "test-ordinal".into(),
2083 items_per_blob: NZU64!(1),
2084 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
2085 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
2086 };
2087
2088 {
2090 let mut store =
2091 Ordinal::<_, DummyValue>::init(context.with_label("first"), cfg.clone())
2092 .await
2093 .expect("Failed to initialize store");
2094
2095 store.put(1, DummyValue { value: 1 }).await.unwrap();
2097 store.put(2, DummyValue { value: 0 }).await.unwrap(); store.put(4, DummyValue { value: 4 }).await.unwrap();
2099
2100 store.sync().await.unwrap();
2101 }
2102
2103 {
2105 let store =
2106 Ordinal::<_, DummyValue>::init(context.with_label("second"), cfg.clone())
2107 .await
2108 .expect("Failed to initialize store");
2109
2110 assert!(store.has(1), "Record 1 should be available");
2112 assert_eq!(
2113 store.get(1).await.unwrap().unwrap(),
2114 DummyValue { value: 1 },
2115 "Record 0 should have correct value"
2116 );
2117
2118 assert!(
2120 !store.has(2),
2121 "Record 2 should not be available (unparseable)"
2122 );
2123
2124 assert!(
2126 store.has(4),
2127 "Record 4 should be available - we should not exit early on unparseable record"
2128 );
2129 assert_eq!(
2130 store.get(4).await.unwrap().unwrap(),
2131 DummyValue { value: 4 },
2132 "Record 4 should have correct value"
2133 );
2134 }
2135 });
2136 }
2137}