1mod storage;
97
98use std::num::{NonZeroU64, NonZeroUsize};
99pub use storage::Ordinal;
100use thiserror::Error;
101
102#[derive(Debug, Error)]
104pub enum Error {
105 #[error("runtime error: {0}")]
106 Runtime(#[from] commonware_runtime::Error),
107 #[error("codec error: {0}")]
108 Codec(#[from] commonware_codec::Error),
109 #[error("invalid blob name: {0}")]
110 InvalidBlobName(String),
111 #[error("invalid record: {0}")]
112 InvalidRecord(u64),
113 #[error("missing record at {0}")]
114 MissingRecord(u64),
115}
116
/// Configuration handed to [`Ordinal`] when it is initialized.
///
/// `Debug` is derived (in addition to `Clone`) so that a configuration can be
/// logged or shown in assertion failures.
#[derive(Clone, Debug)]
pub struct Config {
    /// Name of the storage partition that holds the store's blobs.
    pub partition: String,

    /// Number of items kept in each blob (never zero).
    pub items_per_blob: NonZeroU64,

    /// Size of the buffer used when writing (never zero).
    // NOTE(review): presumably measured in bytes — confirm in `storage`.
    pub write_buffer: NonZeroUsize,

    /// Size of the buffer used when replaying existing data (never zero).
    // NOTE(review): presumably measured in bytes and used on restart — confirm in `storage`.
    pub replay_buffer: NonZeroUsize,
}
132
133#[cfg(test)]
134mod tests {
135 use super::*;
136 use bytes::{Buf, BufMut};
137 use commonware_codec::{FixedSize, Read, ReadExt, Write};
138 use commonware_macros::test_traced;
139 use commonware_runtime::{deterministic, Blob, Metrics, Runner, Storage};
140 use commonware_utils::{sequence::FixedBytes, BitVec, NZUsize, NZU64};
141 use rand::RngCore;
142 use std::collections::BTreeMap;
143
144 const DEFAULT_ITEMS_PER_BLOB: u64 = 1000;
145 const DEFAULT_WRITE_BUFFER: usize = 4096;
146 const DEFAULT_REPLAY_BUFFER: usize = 1024 * 1024;
147
148 #[test_traced]
149 fn test_put_get() {
150 let executor = deterministic::Runner::default();
152 executor.start(|context| async move {
153 let cfg = Config {
155 partition: "test_ordinal".into(),
156 items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
157 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
158 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
159 };
160 let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
161 .await
162 .expect("Failed to initialize store");
163
164 let value = FixedBytes::new([42u8; 32]);
165
166 assert!(!store.has(0));
168
169 store
171 .put(0, value.clone())
172 .await
173 .expect("Failed to put data");
174
175 assert!(store.has(0));
177
178 let retrieved = store
180 .get(0)
181 .await
182 .expect("Failed to get data")
183 .expect("Data not found");
184 assert_eq!(retrieved, value);
185
186 store.sync().await.expect("Failed to sync data");
188
189 let buffer = context.encode();
191 assert!(buffer.contains("gets_total 1"), "{}", buffer);
192 assert!(buffer.contains("puts_total 1"), "{}", buffer);
193 assert!(buffer.contains("has_total 2"), "{}", buffer);
194 assert!(buffer.contains("syncs_total 1"), "{}", buffer);
195 assert!(buffer.contains("pruned_total 0"), "{}", buffer);
196
197 let retrieved = store
199 .get(0)
200 .await
201 .expect("Failed to get data")
202 .expect("Data not found");
203 assert_eq!(retrieved, value);
204 });
205 }
206
207 #[test_traced]
208 fn test_multiple_indices() {
209 let executor = deterministic::Runner::default();
211 executor.start(|context| async move {
212 let cfg = Config {
214 partition: "test_ordinal".into(),
215 items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
216 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
217 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
218 };
219 let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
220 .await
221 .expect("Failed to initialize store");
222
223 let indices = vec![
225 (0u64, FixedBytes::new([0u8; 32])),
226 (5u64, FixedBytes::new([5u8; 32])),
227 (10u64, FixedBytes::new([10u8; 32])),
228 (100u64, FixedBytes::new([100u8; 32])),
229 (1000u64, FixedBytes::new([200u8; 32])), ];
231
232 for (index, value) in &indices {
233 store
234 .put(*index, value.clone())
235 .await
236 .expect("Failed to put data");
237 }
238
239 store.sync().await.expect("Failed to sync");
241
242 for (index, value) in &indices {
244 let retrieved = store
245 .get(*index)
246 .await
247 .expect("Failed to get data")
248 .expect("Data not found");
249 assert_eq!(&retrieved, value);
250 }
251 });
252 }
253
254 #[test_traced]
255 fn test_sparse_indices() {
256 let executor = deterministic::Runner::default();
258 executor.start(|context| async move {
259 let cfg = Config {
261 partition: "test_ordinal".into(),
262 items_per_blob: NZU64!(100), write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
264 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
265 };
266 let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
267 .await
268 .expect("Failed to initialize store");
269
270 let indices = vec![
272 (0u64, FixedBytes::new([0u8; 32])),
273 (99u64, FixedBytes::new([99u8; 32])), (100u64, FixedBytes::new([100u8; 32])), (500u64, FixedBytes::new([200u8; 32])), ];
277
278 for (index, value) in &indices {
279 store
280 .put(*index, value.clone())
281 .await
282 .expect("Failed to put data");
283 }
284
285 assert!(!store.has(1));
287 assert!(!store.has(50));
288 assert!(!store.has(101));
289 assert!(!store.has(499));
290
291 store.sync().await.expect("Failed to sync");
293
294 for (index, value) in &indices {
295 let retrieved = store
296 .get(*index)
297 .await
298 .expect("Failed to get data")
299 .expect("Data not found");
300 assert_eq!(&retrieved, value);
301 }
302 });
303 }
304
305 #[test_traced]
306 fn test_next_gap() {
307 let executor = deterministic::Runner::default();
309 executor.start(|context| async move {
310 let cfg = Config {
312 partition: "test_ordinal".into(),
313 items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
314 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
315 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
316 };
317 let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
318 .await
319 .expect("Failed to initialize store");
320
321 store.put(1, FixedBytes::new([1u8; 32])).await.unwrap();
323 store.put(10, FixedBytes::new([10u8; 32])).await.unwrap();
324 store.put(11, FixedBytes::new([11u8; 32])).await.unwrap();
325 store.put(14, FixedBytes::new([14u8; 32])).await.unwrap();
326
327 let (current_end, start_next) = store.next_gap(0);
329 assert!(current_end.is_none());
330 assert_eq!(start_next, Some(1));
331
332 let (current_end, start_next) = store.next_gap(1);
333 assert_eq!(current_end, Some(1));
334 assert_eq!(start_next, Some(10));
335
336 let (current_end, start_next) = store.next_gap(10);
337 assert_eq!(current_end, Some(11));
338 assert_eq!(start_next, Some(14));
339
340 let (current_end, start_next) = store.next_gap(11);
341 assert_eq!(current_end, Some(11));
342 assert_eq!(start_next, Some(14));
343
344 let (current_end, start_next) = store.next_gap(12);
345 assert!(current_end.is_none());
346 assert_eq!(start_next, Some(14));
347
348 let (current_end, start_next) = store.next_gap(14);
349 assert_eq!(current_end, Some(14));
350 assert!(start_next.is_none());
351 });
352 }
353
354 #[test_traced]
355 fn test_missing_items() {
356 let executor = deterministic::Runner::default();
358 executor.start(|context| async move {
359 let cfg = Config {
361 partition: "test_ordinal".into(),
362 items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
363 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
364 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
365 };
366 let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
367 .await
368 .expect("Failed to initialize store");
369
370 assert_eq!(store.missing_items(0, 5), Vec::<u64>::new());
372 assert_eq!(store.missing_items(100, 10), Vec::<u64>::new());
373
374 store.put(1, FixedBytes::new([1u8; 32])).await.unwrap();
376 store.put(2, FixedBytes::new([2u8; 32])).await.unwrap();
377 store.put(5, FixedBytes::new([5u8; 32])).await.unwrap();
378 store.put(6, FixedBytes::new([6u8; 32])).await.unwrap();
379 store.put(10, FixedBytes::new([10u8; 32])).await.unwrap();
380
381 assert_eq!(store.missing_items(0, 5), vec![0, 3, 4, 7, 8]);
383 assert_eq!(store.missing_items(0, 6), vec![0, 3, 4, 7, 8, 9]);
384 assert_eq!(store.missing_items(0, 7), vec![0, 3, 4, 7, 8, 9]);
385
386 assert_eq!(store.missing_items(3, 3), vec![3, 4, 7]);
388 assert_eq!(store.missing_items(4, 2), vec![4, 7]);
389
390 assert_eq!(store.missing_items(1, 3), vec![3, 4, 7]);
392 assert_eq!(store.missing_items(2, 4), vec![3, 4, 7, 8]);
393 assert_eq!(store.missing_items(5, 2), vec![7, 8]);
394
395 assert_eq!(store.missing_items(11, 5), Vec::<u64>::new());
397 assert_eq!(store.missing_items(100, 10), Vec::<u64>::new());
398
399 store.put(1000, FixedBytes::new([100u8; 32])).await.unwrap();
401
402 let items = store.missing_items(11, 10);
404 assert_eq!(items, vec![11, 12, 13, 14, 15, 16, 17, 18, 19, 20]);
405
406 let items = store.missing_items(990, 15);
408 assert_eq!(
409 items,
410 vec![990, 991, 992, 993, 994, 995, 996, 997, 998, 999]
411 );
412
413 store.sync().await.unwrap();
415 assert_eq!(store.missing_items(0, 5), vec![0, 3, 4, 7, 8]);
416 assert_eq!(store.missing_items(3, 3), vec![3, 4, 7]);
417
418 store.put(9999, FixedBytes::new([99u8; 32])).await.unwrap();
420 store
421 .put(10001, FixedBytes::new([101u8; 32]))
422 .await
423 .unwrap();
424
425 let items = store.missing_items(9998, 5);
427 assert_eq!(items, vec![9998, 10000]);
428
429 store.close().await.expect("Failed to close store");
430 });
431 }
432
433 #[test_traced]
434 fn test_restart() {
435 let executor = deterministic::Runner::default();
437 executor.start(|context| async move {
438 let cfg = Config {
439 partition: "test_ordinal".into(),
440 items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
441 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
442 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
443 };
444
445 {
447 let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
448 .await
449 .expect("Failed to initialize store");
450
451 let values = vec![
452 (0u64, FixedBytes::new([0u8; 32])),
453 (100u64, FixedBytes::new([100u8; 32])),
454 (1000u64, FixedBytes::new([200u8; 32])),
455 ];
456
457 for (index, value) in &values {
458 store
459 .put(*index, value.clone())
460 .await
461 .expect("Failed to put data");
462 }
463
464 store.close().await.expect("Failed to close store");
465 }
466
467 {
469 let store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
470 .await
471 .expect("Failed to initialize store");
472
473 let values = vec![
474 (0u64, FixedBytes::new([0u8; 32])),
475 (100u64, FixedBytes::new([100u8; 32])),
476 (1000u64, FixedBytes::new([200u8; 32])),
477 ];
478
479 for (index, value) in &values {
480 let retrieved = store
481 .get(*index)
482 .await
483 .expect("Failed to get data")
484 .expect("Data not found");
485 assert_eq!(&retrieved, value);
486 }
487
488 let (current_end, start_next) = store.next_gap(0);
490 assert_eq!(current_end, Some(0));
491 assert_eq!(start_next, Some(100));
492 }
493 });
494 }
495
496 #[test_traced]
497 fn test_invalid_record() {
498 let executor = deterministic::Runner::default();
500 executor.start(|context| async move {
501 let cfg = Config {
502 partition: "test_ordinal".into(),
503 items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
504 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
505 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
506 };
507
508 {
510 let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
511 .await
512 .expect("Failed to initialize store");
513
514 store
515 .put(0, FixedBytes::new([42u8; 32]))
516 .await
517 .expect("Failed to put data");
518 store.close().await.expect("Failed to close store");
519 }
520
521 {
523 let (blob, _) = context
524 .open("test_ordinal", &0u64.to_be_bytes())
525 .await
526 .unwrap();
527 blob.write_at(vec![0xFF], 32).await.unwrap();
529 blob.sync().await.unwrap();
530 }
531
532 {
534 let store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
535 .await
536 .expect("Failed to initialize store");
537
538 let result = store.get(0).await.unwrap();
540 assert!(result.is_none());
541
542 assert!(!store.has(0));
544 }
545 });
546 }
547
548 #[test_traced]
549 fn test_get_nonexistent() {
550 let executor = deterministic::Runner::default();
552 executor.start(|context| async move {
553 let cfg = Config {
555 partition: "test_ordinal".into(),
556 items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
557 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
558 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
559 };
560 let store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
561 .await
562 .expect("Failed to initialize store");
563
564 let retrieved = store.get(999).await.expect("Failed to get data");
566 assert!(retrieved.is_none());
567
568 assert!(!store.has(999));
570 });
571 }
572
573 #[test_traced]
574 fn test_destroy() {
575 let executor = deterministic::Runner::default();
577 executor.start(|context| async move {
578 let cfg = Config {
579 partition: "test_ordinal".into(),
580 items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
581 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
582 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
583 };
584
585 {
587 let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
588 .await
589 .expect("Failed to initialize store");
590
591 store
592 .put(0, FixedBytes::new([0u8; 32]))
593 .await
594 .expect("Failed to put data");
595 store
596 .put(1000, FixedBytes::new([100u8; 32]))
597 .await
598 .expect("Failed to put data");
599
600 store.destroy().await.expect("Failed to destroy store");
602 }
603
604 {
606 let store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
607 .await
608 .expect("Failed to initialize store");
609
610 assert!(store.get(0).await.unwrap().is_none());
612 assert!(store.get(1000).await.unwrap().is_none());
613 assert!(!store.has(0));
614 assert!(!store.has(1000));
615 }
616 });
617 }
618
619 #[test_traced]
620 fn test_partial_record_write() {
621 let executor = deterministic::Runner::default();
623 executor.start(|context| async move {
624 let cfg = Config {
625 partition: "test_ordinal".into(),
626 items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
627 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
628 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
629 };
630
631 {
633 let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
634 .await
635 .expect("Failed to initialize store");
636
637 store
638 .put(0, FixedBytes::new([42u8; 32]))
639 .await
640 .expect("Failed to put data");
641 store
642 .put(1, FixedBytes::new([43u8; 32]))
643 .await
644 .expect("Failed to put data");
645 store.close().await.expect("Failed to close store");
646 }
647
648 {
650 let (blob, _) = context
651 .open("test_ordinal", &0u64.to_be_bytes())
652 .await
653 .unwrap();
654 blob.write_at(vec![0xFF; 32], 36).await.unwrap();
656 blob.sync().await.unwrap();
657 }
658
659 {
661 let store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
662 .await
663 .expect("Failed to initialize store");
664
665 assert_eq!(
667 store.get(0).await.unwrap().unwrap(),
668 FixedBytes::new([42u8; 32])
669 );
670
671 assert!(!store.has(1));
673 assert!(store.get(1).await.unwrap().is_none());
674
675 let mut store_mut = store;
677 store_mut.put(1, FixedBytes::new([44u8; 32])).await.unwrap();
678 assert_eq!(
679 store_mut.get(1).await.unwrap().unwrap(),
680 FixedBytes::new([44u8; 32])
681 );
682 }
683 });
684 }
685
686 #[test_traced]
687 fn test_corrupted_value() {
688 let executor = deterministic::Runner::default();
690 executor.start(|context| async move {
691 let cfg = Config {
692 partition: "test_ordinal".into(),
693 items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
694 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
695 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
696 };
697
698 {
700 let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
701 .await
702 .expect("Failed to initialize store");
703
704 store
705 .put(0, FixedBytes::new([42u8; 32]))
706 .await
707 .expect("Failed to put data");
708 store
709 .put(1, FixedBytes::new([43u8; 32]))
710 .await
711 .expect("Failed to put data");
712 store.close().await.expect("Failed to close store");
713 }
714
715 {
717 let (blob, _) = context
718 .open("test_ordinal", &0u64.to_be_bytes())
719 .await
720 .unwrap();
721 blob.write_at(vec![0xFF, 0xFF, 0xFF, 0xFF], 10)
723 .await
724 .unwrap();
725 blob.sync().await.unwrap();
726 }
727
728 {
730 let store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
731 .await
732 .expect("Failed to initialize store");
733
734 assert!(!store.has(0));
736
737 assert!(store.has(1));
739 assert_eq!(
740 store.get(1).await.unwrap().unwrap(),
741 FixedBytes::new([43u8; 32])
742 );
743 }
744 });
745 }
746
747 #[test_traced]
748 fn test_crc_corruptions() {
749 let executor = deterministic::Runner::default();
751 executor.start(|context| async move {
752 let cfg = Config {
753 partition: "test_ordinal".into(),
754 items_per_blob: NZU64!(10), write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
756 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
757 };
758
759 {
761 let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
762 .await
763 .expect("Failed to initialize store");
764
765 store.put(0, FixedBytes::new([0u8; 32])).await.unwrap();
767 store.put(5, FixedBytes::new([5u8; 32])).await.unwrap();
768 store.put(10, FixedBytes::new([10u8; 32])).await.unwrap();
769 store.put(15, FixedBytes::new([15u8; 32])).await.unwrap();
770 store.close().await.expect("Failed to close store");
771 }
772
773 {
775 let (blob, _) = context
777 .open("test_ordinal", &0u64.to_be_bytes())
778 .await
779 .unwrap();
780 blob.write_at(vec![0xFF], 32).await.unwrap(); blob.sync().await.unwrap();
782
783 let (blob, _) = context
785 .open("test_ordinal", &1u64.to_be_bytes())
786 .await
787 .unwrap();
788 blob.write_at(vec![0xFF; 4], 5).await.unwrap(); blob.sync().await.unwrap();
790 }
791
792 {
794 let store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
795 .await
796 .expect("Failed to initialize store");
797
798 assert!(!store.has(0)); assert!(!store.has(10)); assert!(store.has(5));
804 assert!(store.has(15));
805 assert_eq!(
806 store.get(5).await.unwrap().unwrap(),
807 FixedBytes::new([5u8; 32])
808 );
809 assert_eq!(
810 store.get(15).await.unwrap().unwrap(),
811 FixedBytes::new([15u8; 32])
812 );
813 }
814 });
815 }
816
817 #[test_traced]
818 fn test_extra_bytes_in_blob() {
819 let executor = deterministic::Runner::default();
821 executor.start(|context| async move {
822 let cfg = Config {
823 partition: "test_ordinal".into(),
824 items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
825 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
826 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
827 };
828
829 {
831 let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
832 .await
833 .expect("Failed to initialize store");
834
835 store
836 .put(0, FixedBytes::new([42u8; 32]))
837 .await
838 .expect("Failed to put data");
839 store
840 .put(1, FixedBytes::new([43u8; 32]))
841 .await
842 .expect("Failed to put data");
843 store.close().await.expect("Failed to close store");
844 }
845
846 {
848 let (blob, size) = context
849 .open("test_ordinal", &0u64.to_be_bytes())
850 .await
851 .unwrap();
852 let mut garbage = vec![0xFF; 32]; let invalid_crc = 0xDEADBEEFu32;
856 garbage.extend_from_slice(&invalid_crc.to_be_bytes());
857 assert_eq!(garbage.len(), 36); blob.write_at(garbage, size).await.unwrap();
859 blob.sync().await.unwrap();
860 }
861
862 {
864 let store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
865 .await
866 .expect("Failed to initialize store");
867
868 assert!(store.has(0));
870 assert!(store.has(1));
871 assert_eq!(
872 store.get(0).await.unwrap().unwrap(),
873 FixedBytes::new([42u8; 32])
874 );
875 assert_eq!(
876 store.get(1).await.unwrap().unwrap(),
877 FixedBytes::new([43u8; 32])
878 );
879
880 let mut store_mut = store;
882 store_mut.put(2, FixedBytes::new([44u8; 32])).await.unwrap();
883 assert_eq!(
884 store_mut.get(2).await.unwrap().unwrap(),
885 FixedBytes::new([44u8; 32])
886 );
887 }
888 });
889 }
890
891 #[test_traced]
892 fn test_zero_filled_records() {
893 let executor = deterministic::Runner::default();
895 executor.start(|context| async move {
896 let cfg = Config {
897 partition: "test_ordinal".into(),
898 items_per_blob: NZU64!(DEFAULT_ITEMS_PER_BLOB),
899 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
900 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
901 };
902
903 {
905 let (blob, _) = context
906 .open("test_ordinal", &0u64.to_be_bytes())
907 .await
908 .unwrap();
909
910 let zeros = vec![0u8; 36 * 5]; blob.write_at(zeros, 0).await.unwrap();
913
914 let mut valid_record = vec![44u8; 32];
916 let crc = crc32fast::hash(&valid_record);
917 valid_record.extend_from_slice(&crc.to_be_bytes());
918 blob.write_at(valid_record, 36 * 5).await.unwrap();
919
920 blob.sync().await.unwrap();
921 }
922
923 {
925 let store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
926 .await
927 .expect("Failed to initialize store");
928
929 for i in 0..5 {
931 assert!(!store.has(i));
932 }
933
934 assert!(store.has(5));
936 assert_eq!(
937 store.get(5).await.unwrap().unwrap(),
938 FixedBytes::new([44u8; 32])
939 );
940 }
941 });
942 }
943
944 fn test_operations_and_restart(num_values: usize) -> String {
945 let executor = deterministic::Runner::default();
947 executor.start(|mut context| async move {
948 let cfg = Config {
949 partition: "test_ordinal".into(),
950 items_per_blob: NZU64!(100), write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
952 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
953 };
954
955 let mut store = Ordinal::<_, FixedBytes<128>>::init(context.clone(), cfg.clone())
957 .await
958 .expect("Failed to initialize store");
959
960 let mut values = Vec::new();
962 let mut rng_index = 0u64;
963
964 for _ in 0..num_values {
965 let mut index_bytes = [0u8; 8];
967 context.fill_bytes(&mut index_bytes);
968 let index_offset = u64::from_be_bytes(index_bytes) % 1000;
969 let index = rng_index + index_offset;
970 rng_index = index + 1;
971
972 let mut value = [0u8; 128];
974 context.fill_bytes(&mut value);
975 let value = FixedBytes::<128>::new(value);
976
977 store
978 .put(index, value.clone())
979 .await
980 .expect("Failed to put data");
981 values.push((index, value));
982 }
983
984 store.sync().await.expect("Failed to sync");
986
987 for (index, value) in &values {
989 let retrieved = store
990 .get(*index)
991 .await
992 .expect("Failed to get data")
993 .expect("Data not found");
994 assert_eq!(&retrieved, value);
995 }
996
997 for i in 0..10 {
999 let _ = store.next_gap(i * 100);
1000 }
1001
1002 store.close().await.expect("Failed to close store");
1004
1005 let mut store = Ordinal::<_, FixedBytes<128>>::init(context.clone(), cfg)
1007 .await
1008 .expect("Failed to initialize store");
1009
1010 for (index, value) in &values {
1012 let retrieved = store
1013 .get(*index)
1014 .await
1015 .expect("Failed to get data")
1016 .expect("Data not found");
1017 assert_eq!(&retrieved, value);
1018 }
1019
1020 for _ in 0..10 {
1022 let mut index_bytes = [0u8; 8];
1023 context.fill_bytes(&mut index_bytes);
1024 let index = u64::from_be_bytes(index_bytes) % 10000;
1025
1026 let mut value = [0u8; 128];
1027 context.fill_bytes(&mut value);
1028 let value = FixedBytes::<128>::new(value);
1029
1030 store.put(index, value).await.expect("Failed to put data");
1031 }
1032
1033 store.sync().await.expect("Failed to sync");
1035
1036 context.auditor().state()
1038 })
1039 }
1040
1041 #[test_traced]
1042 #[ignore]
1043 fn test_determinism() {
1044 let state1 = test_operations_and_restart(100);
1045 let state2 = test_operations_and_restart(100);
1046 assert_eq!(state1, state2);
1047 }
1048
1049 #[test_traced]
1050 fn test_prune_basic() {
1051 let executor = deterministic::Runner::default();
1053 executor.start(|context| async move {
1054 let cfg = Config {
1055 partition: "test_ordinal".into(),
1056 items_per_blob: NZU64!(100), write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1058 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1059 };
1060
1061 let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1062 .await
1063 .expect("Failed to initialize store");
1064
1065 let values = vec![
1067 (0u64, FixedBytes::new([0u8; 32])), (50u64, FixedBytes::new([50u8; 32])), (100u64, FixedBytes::new([100u8; 32])), (150u64, FixedBytes::new([150u8; 32])), (200u64, FixedBytes::new([200u8; 32])), (300u64, FixedBytes::new([44u8; 32])), ];
1074
1075 for (index, value) in &values {
1076 store
1077 .put(*index, value.clone())
1078 .await
1079 .expect("Failed to put data");
1080 }
1081 store.sync().await.unwrap();
1082
1083 for (index, value) in &values {
1085 assert_eq!(store.get(*index).await.unwrap().unwrap(), *value);
1086 }
1087
1088 store.prune(150).await.unwrap();
1090 let buffer = context.encode();
1091 assert!(buffer.contains("pruned_total 1"));
1092
1093 assert!(!store.has(0));
1095 assert!(!store.has(50));
1096 assert!(store.get(0).await.unwrap().is_none());
1097 assert!(store.get(50).await.unwrap().is_none());
1098
1099 assert!(store.has(100));
1101 assert!(store.has(150));
1102 assert!(store.has(200));
1103 assert!(store.has(300));
1104 assert_eq!(store.get(100).await.unwrap().unwrap(), values[2].1);
1105 assert_eq!(store.get(150).await.unwrap().unwrap(), values[3].1);
1106 assert_eq!(store.get(200).await.unwrap().unwrap(), values[4].1);
1107 assert_eq!(store.get(300).await.unwrap().unwrap(), values[5].1);
1108
1109 store.prune(250).await.unwrap();
1111 let buffer = context.encode();
1112 assert!(buffer.contains("pruned_total 2"));
1113
1114 assert!(!store.has(100));
1116 assert!(!store.has(150));
1117 assert!(store.get(100).await.unwrap().is_none());
1118 assert!(store.get(150).await.unwrap().is_none());
1119
1120 assert!(store.has(200));
1122 assert!(store.has(300));
1123 assert_eq!(store.get(200).await.unwrap().unwrap(), values[4].1);
1124 assert_eq!(store.get(300).await.unwrap().unwrap(), values[5].1);
1125 });
1126 }
1127
1128 #[test_traced]
1129 fn test_prune_with_gaps() {
1130 let executor = deterministic::Runner::default();
1132 executor.start(|context| async move {
1133 let cfg = Config {
1134 partition: "test_ordinal".into(),
1135 items_per_blob: NZU64!(100),
1136 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1137 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1138 };
1139
1140 let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1141 .await
1142 .expect("Failed to initialize store");
1143
1144 store.put(5, FixedBytes::new([5u8; 32])).await.unwrap();
1146 store.put(105, FixedBytes::new([105u8; 32])).await.unwrap();
1147 store.put(305, FixedBytes::new([49u8; 32])).await.unwrap();
1148 store.sync().await.unwrap();
1149
1150 let (current_end, next_start) = store.next_gap(0);
1152 assert!(current_end.is_none());
1153 assert_eq!(next_start, Some(5));
1154
1155 let (current_end, next_start) = store.next_gap(5);
1156 assert_eq!(current_end, Some(5));
1157 assert_eq!(next_start, Some(105));
1158
1159 store.prune(150).await.unwrap();
1161
1162 assert!(!store.has(5));
1164 assert!(store.get(5).await.unwrap().is_none());
1165
1166 assert!(store.has(105));
1168 assert!(store.has(305));
1169
1170 let (current_end, next_start) = store.next_gap(0);
1171 assert!(current_end.is_none());
1172 assert_eq!(next_start, Some(105));
1173
1174 let (current_end, next_start) = store.next_gap(105);
1175 assert_eq!(current_end, Some(105));
1176 assert_eq!(next_start, Some(305));
1177 });
1178 }
1179
1180 #[test_traced]
1181 fn test_prune_no_op() {
1182 let executor = deterministic::Runner::default();
1184 executor.start(|context| async move {
1185 let cfg = Config {
1186 partition: "test_ordinal".into(),
1187 items_per_blob: NZU64!(100),
1188 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1189 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1190 };
1191
1192 let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1193 .await
1194 .expect("Failed to initialize store");
1195
1196 store.put(100, FixedBytes::new([100u8; 32])).await.unwrap();
1198 store.put(200, FixedBytes::new([200u8; 32])).await.unwrap();
1199 store.sync().await.unwrap();
1200
1201 store.prune(50).await.unwrap();
1203
1204 assert!(store.has(100));
1206 assert!(store.has(200));
1207 let buffer = context.encode();
1208 assert!(buffer.contains("pruned_total 0"));
1209
1210 store.prune(100).await.unwrap();
1212
1213 assert!(store.has(100));
1215 assert!(store.has(200));
1216 let buffer = context.encode();
1217 assert!(buffer.contains("pruned_total 0"));
1218 });
1219 }
1220
1221 #[test_traced]
1222 fn test_prune_empty_store() {
1223 let executor = deterministic::Runner::default();
1225 executor.start(|context| async move {
1226 let cfg = Config {
1227 partition: "test_ordinal".into(),
1228 items_per_blob: NZU64!(100),
1229 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1230 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1231 };
1232
1233 let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1234 .await
1235 .expect("Failed to initialize store");
1236
1237 store.prune(1000).await.unwrap();
1239
1240 store.put(0, FixedBytes::new([0u8; 32])).await.unwrap();
1242 assert!(store.has(0));
1243 });
1244 }
1245
1246 #[test_traced]
1247 fn test_prune_after_restart() {
1248 let executor = deterministic::Runner::default();
1250 executor.start(|context| async move {
1251 let cfg = Config {
1252 partition: "test_ordinal".into(),
1253 items_per_blob: NZU64!(100),
1254 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1255 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1256 };
1257
1258 {
1260 let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1261 .await
1262 .expect("Failed to initialize store");
1263
1264 store.put(0, FixedBytes::new([0u8; 32])).await.unwrap();
1265 store.put(100, FixedBytes::new([100u8; 32])).await.unwrap();
1266 store.put(200, FixedBytes::new([200u8; 32])).await.unwrap();
1267 store.close().await.unwrap();
1268 }
1269
1270 {
1272 let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1273 .await
1274 .expect("Failed to initialize store");
1275
1276 assert!(store.has(0));
1278 assert!(store.has(100));
1279 assert!(store.has(200));
1280
1281 store.prune(150).await.unwrap();
1283
1284 assert!(!store.has(0));
1286 assert!(store.has(100));
1287 assert!(store.has(200));
1288
1289 store.close().await.unwrap();
1290 }
1291
1292 {
1294 let store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1295 .await
1296 .expect("Failed to initialize store");
1297
1298 assert!(!store.has(0));
1299 assert!(store.has(100));
1300 assert!(store.has(200));
1301
1302 let (current_end, next_start) = store.next_gap(0);
1304 assert!(current_end.is_none());
1305 assert_eq!(next_start, Some(100));
1306 }
1307 });
1308 }
1309
1310 #[test_traced]
1311 fn test_prune_multiple_operations() {
1312 let executor = deterministic::Runner::default();
1314 executor.start(|context| async move {
1315 let cfg = Config {
1316 partition: "test_ordinal".into(),
1317 items_per_blob: NZU64!(50), write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1319 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1320 };
1321
1322 let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1323 .await
1324 .expect("Failed to initialize store");
1325
1326 let mut values = Vec::new();
1328 for i in 0..10 {
1329 let index = i * 50 + 25; let value = FixedBytes::new([i as u8; 32]);
1331 store.put(index, value.clone()).await.unwrap();
1332 values.push((index, value));
1333 }
1334 store.sync().await.unwrap();
1335
1336 for i in 1..5 {
1338 let prune_index = i * 50 + 10;
1339 store.prune(prune_index).await.unwrap();
1340
1341 for (index, _) in &values {
1343 if *index < prune_index {
1344 assert!(!store.has(*index), "Index {index} should be pruned");
1345 } else {
1346 assert!(store.has(*index), "Index {index} should not be pruned");
1347 }
1348 }
1349 }
1350
1351 let buffer = context.encode();
1353 assert!(buffer.contains("pruned_total 4"));
1354
1355 for i in 4..10 {
1357 let index = i * 50 + 25;
1358 assert!(store.has(index));
1359 assert_eq!(
1360 store.get(index).await.unwrap().unwrap(),
1361 values[i as usize].1
1362 );
1363 }
1364 });
1365 }
1366
1367 #[test_traced]
1368 fn test_prune_blob_boundaries() {
1369 let executor = deterministic::Runner::default();
1371 executor.start(|context| async move {
1372 let cfg = Config {
1373 partition: "test_ordinal".into(),
1374 items_per_blob: NZU64!(100),
1375 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1376 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1377 };
1378
1379 let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1380 .await
1381 .expect("Failed to initialize store");
1382
1383 store.put(0, FixedBytes::new([0u8; 32])).await.unwrap(); store.put(99, FixedBytes::new([99u8; 32])).await.unwrap(); store.put(100, FixedBytes::new([100u8; 32])).await.unwrap(); store.put(199, FixedBytes::new([199u8; 32])).await.unwrap(); store.put(200, FixedBytes::new([200u8; 32])).await.unwrap(); store.sync().await.unwrap();
1390
1391 store.prune(100).await.unwrap();
1395 assert!(!store.has(0));
1396 assert!(!store.has(99));
1397 assert!(store.has(100));
1398 assert!(store.has(199));
1399 assert!(store.has(200));
1400
1401 store.prune(199).await.unwrap();
1403 assert!(store.has(100));
1404 assert!(store.has(199));
1405 assert!(store.has(200));
1406
1407 store.prune(200).await.unwrap();
1409 assert!(!store.has(100));
1410 assert!(!store.has(199));
1411 assert!(store.has(200));
1412
1413 let buffer = context.encode();
1414 assert!(buffer.contains("pruned_total 2"));
1415 });
1416 }
1417
1418 #[test_traced]
1419 fn test_prune_non_contiguous_sections() {
1420 let executor = deterministic::Runner::default();
1422 executor.start(|context| async move {
1423 let cfg = Config {
1424 partition: "test_ordinal".into(),
1425 items_per_blob: NZU64!(100),
1426 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1427 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1428 };
1429
1430 let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1431 .await
1432 .expect("Failed to initialize store");
1433
1434 store.put(0, FixedBytes::new([0u8; 32])).await.unwrap(); store.put(250, FixedBytes::new([50u8; 32])).await.unwrap(); store.put(500, FixedBytes::new([44u8; 32])).await.unwrap(); store.put(750, FixedBytes::new([45u8; 32])).await.unwrap(); store.sync().await.unwrap();
1440
1441 assert!(store.has(0));
1443 assert!(store.has(250));
1444 assert!(store.has(500));
1445 assert!(store.has(750));
1446
1447 store.prune(300).await.unwrap();
1449
1450 assert!(!store.has(0)); assert!(!store.has(250)); assert!(store.has(500)); assert!(store.has(750)); let buffer = context.encode();
1457 assert!(buffer.contains("pruned_total 2"));
1458
1459 store.prune(600).await.unwrap();
1461
1462 assert!(!store.has(500)); assert!(store.has(750)); let buffer = context.encode();
1467 assert!(buffer.contains("pruned_total 3"));
1468
1469 store.prune(1000).await.unwrap();
1471
1472 assert!(!store.has(750)); let buffer = context.encode();
1476 assert!(buffer.contains("pruned_total 4"));
1477 });
1478 }
1479
1480 #[test_traced]
1481 fn test_prune_removes_correct_pending() {
1482 let executor = deterministic::Runner::default();
1484 executor.start(|context| async move {
1485 let cfg = Config {
1486 partition: "test_ordinal".into(),
1487 items_per_blob: NZU64!(100),
1488 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1489 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1490 };
1491 let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1492 .await
1493 .expect("Failed to initialize store");
1494
1495 store.put(5, FixedBytes::new([5u8; 32])).await.unwrap();
1497 store.sync().await.unwrap();
1498
1499 store.put(10, FixedBytes::new([10u8; 32])).await.unwrap(); store.put(110, FixedBytes::new([110u8; 32])).await.unwrap(); assert!(store.has(5));
1505 assert!(store.has(10));
1506 assert!(store.has(110));
1507
1508 store.prune(150).await.unwrap();
1510
1511 assert!(!store.has(5));
1513 assert!(!store.has(10));
1514
1515 assert!(store.has(110));
1517 assert_eq!(
1518 store.get(110).await.unwrap().unwrap(),
1519 FixedBytes::new([110u8; 32])
1520 );
1521
1522 store.sync().await.unwrap();
1524 assert!(store.has(110));
1525 assert_eq!(
1526 store.get(110).await.unwrap().unwrap(),
1527 FixedBytes::new([110u8; 32])
1528 );
1529 });
1530 }
1531
1532 #[test_traced]
1533 fn test_init_with_bits_none() {
1534 let executor = deterministic::Runner::default();
1536 executor.start(|context| async move {
1537 let cfg = Config {
1538 partition: "test_ordinal".into(),
1539 items_per_blob: NZU64!(10), write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1541 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1542 };
1543
1544 {
1546 let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1547 .await
1548 .expect("Failed to initialize store");
1549
1550 store.put(0, FixedBytes::new([0u8; 32])).await.unwrap();
1552 store.put(5, FixedBytes::new([5u8; 32])).await.unwrap();
1553 store.put(9, FixedBytes::new([9u8; 32])).await.unwrap();
1554
1555 store.put(10, FixedBytes::new([10u8; 32])).await.unwrap();
1557 store.put(15, FixedBytes::new([15u8; 32])).await.unwrap();
1558
1559 store.put(25, FixedBytes::new([25u8; 32])).await.unwrap();
1561
1562 store.close().await.unwrap();
1563 }
1564
1565 {
1567 let store = Ordinal::<_, FixedBytes<32>>::init_with_bits(
1568 context.clone(),
1569 cfg.clone(),
1570 None,
1571 )
1572 .await
1573 .expect("Failed to initialize store with bits");
1574
1575 assert!(store.has(0));
1577 assert!(store.has(5));
1578 assert!(store.has(9));
1579 assert!(store.has(10));
1580 assert!(store.has(15));
1581 assert!(store.has(25));
1582
1583 assert!(!store.has(1));
1585 assert!(!store.has(11));
1586 assert!(!store.has(20));
1587
1588 assert_eq!(
1590 store.get(0).await.unwrap().unwrap(),
1591 FixedBytes::new([0u8; 32])
1592 );
1593 assert_eq!(
1594 store.get(15).await.unwrap().unwrap(),
1595 FixedBytes::new([15u8; 32])
1596 );
1597 }
1598 });
1599 }
1600
1601 #[test_traced]
1602 fn test_init_with_bits_empty_hashmap() {
1603 let executor = deterministic::Runner::default();
1605 executor.start(|context| async move {
1606 let cfg = Config {
1607 partition: "test_ordinal".into(),
1608 items_per_blob: NZU64!(10),
1609 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1610 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1611 };
1612
1613 {
1615 let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1616 .await
1617 .expect("Failed to initialize store");
1618
1619 store.put(0, FixedBytes::new([0u8; 32])).await.unwrap();
1620 store.put(10, FixedBytes::new([10u8; 32])).await.unwrap();
1621 store.put(20, FixedBytes::new([20u8; 32])).await.unwrap();
1622
1623 store.close().await.unwrap();
1624 }
1625
1626 {
1628 let bits: BTreeMap<u64, &Option<BitVec>> = BTreeMap::new();
1629 let store = Ordinal::<_, FixedBytes<32>>::init_with_bits(
1630 context.clone(),
1631 cfg.clone(),
1632 Some(bits),
1633 )
1634 .await
1635 .expect("Failed to initialize store with bits");
1636
1637 assert!(!store.has(0));
1639 assert!(!store.has(10));
1640 assert!(!store.has(20));
1641 }
1642 });
1643 }
1644
1645 #[test_traced]
1646 fn test_init_with_bits_selective_sections() {
1647 let executor = deterministic::Runner::default();
1649 executor.start(|context| async move {
1650 let cfg = Config {
1651 partition: "test_ordinal".into(),
1652 items_per_blob: NZU64!(10),
1653 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1654 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1655 };
1656
1657 {
1659 let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1660 .await
1661 .expect("Failed to initialize store");
1662
1663 for i in 0..10 {
1665 store.put(i, FixedBytes::new([i as u8; 32])).await.unwrap();
1666 }
1667
1668 for i in 10..20 {
1670 store.put(i, FixedBytes::new([i as u8; 32])).await.unwrap();
1671 }
1672
1673 for i in 20..30 {
1675 store.put(i, FixedBytes::new([i as u8; 32])).await.unwrap();
1676 }
1677
1678 store.close().await.unwrap();
1679 }
1680
1681 {
1683 let mut bits_map: BTreeMap<u64, &Option<BitVec>> = BTreeMap::new();
1684
1685 let mut bitvec = BitVec::zeroes(10);
1687 bitvec.set(2); bitvec.set(5); bitvec.set(8); let bitvec_option = Some(bitvec);
1691
1692 bits_map.insert(1, &bitvec_option);
1693
1694 let store = Ordinal::<_, FixedBytes<32>>::init_with_bits(
1695 context.clone(),
1696 cfg.clone(),
1697 Some(bits_map),
1698 )
1699 .await
1700 .expect("Failed to initialize store with bits");
1701
1702 assert!(store.has(12));
1704 assert!(store.has(15));
1705 assert!(store.has(18));
1706
1707 assert!(!store.has(10));
1709 assert!(!store.has(11));
1710 assert!(!store.has(13));
1711 assert!(!store.has(14));
1712 assert!(!store.has(16));
1713 assert!(!store.has(17));
1714 assert!(!store.has(19));
1715
1716 for i in 0..10 {
1718 assert!(!store.has(i));
1719 }
1720 for i in 20..30 {
1721 assert!(!store.has(i));
1722 }
1723
1724 assert_eq!(
1726 store.get(12).await.unwrap().unwrap(),
1727 FixedBytes::new([12u8; 32])
1728 );
1729 assert_eq!(
1730 store.get(15).await.unwrap().unwrap(),
1731 FixedBytes::new([15u8; 32])
1732 );
1733 assert_eq!(
1734 store.get(18).await.unwrap().unwrap(),
1735 FixedBytes::new([18u8; 32])
1736 );
1737 }
1738 });
1739 }
1740
1741 #[test_traced]
1742 fn test_init_with_bits_none_option_all_records_exist() {
1743 let executor = deterministic::Runner::default();
1745 executor.start(|context| async move {
1746 let cfg = Config {
1747 partition: "test_ordinal".into(),
1748 items_per_blob: NZU64!(5),
1749 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1750 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1751 };
1752
1753 {
1755 let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1756 .await
1757 .expect("Failed to initialize store");
1758
1759 for i in 5..10 {
1761 store.put(i, FixedBytes::new([i as u8; 32])).await.unwrap();
1762 }
1763
1764 store.close().await.unwrap();
1765 }
1766
1767 {
1769 let mut bits_map: BTreeMap<u64, &Option<BitVec>> = BTreeMap::new();
1770 let none_option: Option<BitVec> = None;
1771 bits_map.insert(1, &none_option);
1772
1773 let store = Ordinal::<_, FixedBytes<32>>::init_with_bits(
1774 context.clone(),
1775 cfg.clone(),
1776 Some(bits_map),
1777 )
1778 .await
1779 .expect("Failed to initialize store with bits");
1780
1781 for i in 5..10 {
1783 assert!(store.has(i));
1784 assert_eq!(
1785 store.get(i).await.unwrap().unwrap(),
1786 FixedBytes::new([i as u8; 32])
1787 );
1788 }
1789 }
1790 });
1791 }
1792
1793 #[test_traced]
1794 #[should_panic(expected = "Failed to initialize store with bits: MissingRecord(6)")]
1795 fn test_init_with_bits_none_option_missing_record_panics() {
1796 let executor = deterministic::Runner::default();
1798 executor.start(|context| async move {
1799 let cfg = Config {
1800 partition: "test_ordinal".into(),
1801 items_per_blob: NZU64!(5),
1802 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1803 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1804 };
1805
1806 {
1808 let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1809 .await
1810 .expect("Failed to initialize store");
1811
1812 store.put(5, FixedBytes::new([5u8; 32])).await.unwrap();
1814 store.put(7, FixedBytes::new([7u8; 32])).await.unwrap();
1816 store.put(8, FixedBytes::new([8u8; 32])).await.unwrap();
1817 store.put(9, FixedBytes::new([9u8; 32])).await.unwrap();
1818
1819 store.close().await.unwrap();
1820 }
1821
1822 {
1825 let mut bits_map: BTreeMap<u64, &Option<BitVec>> = BTreeMap::new();
1826 let none_option: Option<BitVec> = None;
1827 bits_map.insert(1, &none_option);
1828
1829 let _store = Ordinal::<_, FixedBytes<32>>::init_with_bits(
1830 context.clone(),
1831 cfg.clone(),
1832 Some(bits_map),
1833 )
1834 .await
1835 .expect("Failed to initialize store with bits");
1836 }
1837 });
1838 }
1839
1840 #[test_traced]
1841 fn test_init_with_bits_mixed_sections() {
1842 let executor = deterministic::Runner::default();
1844 executor.start(|context| async move {
1845 let cfg = Config {
1846 partition: "test_ordinal".into(),
1847 items_per_blob: NZU64!(5),
1848 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1849 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1850 };
1851
1852 {
1854 let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1855 .await
1856 .expect("Failed to initialize store");
1857
1858 for i in 0..5 {
1860 store.put(i, FixedBytes::new([i as u8; 32])).await.unwrap();
1861 }
1862
1863 store.put(5, FixedBytes::new([5u8; 32])).await.unwrap();
1865 store.put(7, FixedBytes::new([7u8; 32])).await.unwrap();
1866 store.put(9, FixedBytes::new([9u8; 32])).await.unwrap();
1867
1868 for i in 10..15 {
1870 store.put(i, FixedBytes::new([i as u8; 32])).await.unwrap();
1871 }
1872
1873 store.close().await.unwrap();
1874 }
1875
1876 {
1878 let mut bits_map: BTreeMap<u64, &Option<BitVec>> = BTreeMap::new();
1879
1880 let none_option: Option<BitVec> = None;
1882 bits_map.insert(0, &none_option);
1883
1884 let mut bitvec1 = BitVec::zeroes(5);
1886 bitvec1.set(0); bitvec1.set(2); let bitvec1_option = Some(bitvec1);
1890 bits_map.insert(1, &bitvec1_option);
1891
1892 let store = Ordinal::<_, FixedBytes<32>>::init_with_bits(
1895 context.clone(),
1896 cfg.clone(),
1897 Some(bits_map),
1898 )
1899 .await
1900 .expect("Failed to initialize store with bits");
1901
1902 for i in 0..5 {
1904 assert!(store.has(i));
1905 assert_eq!(
1906 store.get(i).await.unwrap().unwrap(),
1907 FixedBytes::new([i as u8; 32])
1908 );
1909 }
1910
1911 assert!(store.has(5));
1913 assert!(store.has(7));
1914 assert!(!store.has(6));
1915 assert!(!store.has(8));
1916 assert!(!store.has(9)); for i in 10..15 {
1920 assert!(!store.has(i));
1921 }
1922 }
1923 });
1924 }
1925
1926 #[test_traced]
1927 #[should_panic(expected = "Failed to initialize store with bits: MissingRecord(2)")]
1928 fn test_init_with_bits_corrupted_records() {
1929 let executor = deterministic::Runner::default();
1931 executor.start(|context| async move {
1932 let cfg = Config {
1933 partition: "test_ordinal".into(),
1934 items_per_blob: NZU64!(5),
1935 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1936 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
1937 };
1938
1939 {
1941 let mut store = Ordinal::<_, FixedBytes<32>>::init(context.clone(), cfg.clone())
1942 .await
1943 .expect("Failed to initialize store");
1944
1945 for i in 0..5 {
1947 store.put(i, FixedBytes::new([i as u8; 32])).await.unwrap();
1948 }
1949
1950 store.close().await.unwrap();
1951 }
1952
1953 {
1955 let (blob, _) = context
1956 .open("test_ordinal", &0u64.to_be_bytes())
1957 .await
1958 .unwrap();
1959 let offset = 2 * 36 + 32; blob.write_at(vec![0xFF], offset).await.unwrap();
1962 blob.sync().await.unwrap();
1963 }
1964
1965 {
1967 let mut bits_map: BTreeMap<u64, &Option<BitVec>> = BTreeMap::new();
1968
1969 let mut bitvec = BitVec::zeroes(5);
1971 bitvec.set(0); bitvec.set(2); bitvec.set(4); let bitvec_option = Some(bitvec);
1975 bits_map.insert(0, &bitvec_option);
1976
1977 let _store = Ordinal::<_, FixedBytes<32>>::init_with_bits(
1978 context.clone(),
1979 cfg.clone(),
1980 Some(bits_map),
1981 )
1982 .await
1983 .expect("Failed to initialize store with bits");
1984 }
1985 });
1986 }
1987
1988 #[derive(Debug, PartialEq, Eq)]
1990 pub struct DummyValue {
1991 pub value: u64,
1992 }
1993
1994 impl Write for DummyValue {
1995 fn write(&self, buf: &mut impl BufMut) {
1996 self.value.write(buf);
1997 }
1998 }
1999
2000 impl Read for DummyValue {
2001 type Cfg = ();
2002
2003 fn read_cfg(buf: &mut impl Buf, _: &Self::Cfg) -> Result<Self, commonware_codec::Error> {
2004 let value = u64::read(buf)?;
2005 if value == 0 {
2006 return Err(commonware_codec::Error::Invalid(
2007 "DummyValue",
2008 "value must be non-zero",
2009 ));
2010 }
2011 Ok(Self { value })
2012 }
2013 }
2014
2015 impl FixedSize for DummyValue {
2016 const SIZE: usize = u64::SIZE;
2017 }
2018
2019 #[test_traced]
2020 fn test_init_skip_unparseable_record() {
2021 let executor = deterministic::Runner::default();
2023 executor.start(|context| async move {
2024 let cfg = Config {
2025 partition: "test_ordinal".into(),
2026 items_per_blob: NZU64!(1),
2027 write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
2028 replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER),
2029 };
2030
2031 {
2033 let mut store = Ordinal::<_, DummyValue>::init(context.clone(), cfg.clone())
2034 .await
2035 .expect("Failed to initialize store");
2036
2037 store.put(1, DummyValue { value: 1 }).await.unwrap();
2039 store.put(2, DummyValue { value: 0 }).await.unwrap(); store.put(4, DummyValue { value: 4 }).await.unwrap();
2041
2042 store.close().await.unwrap();
2043 }
2044
2045 {
2047 let store = Ordinal::<_, DummyValue>::init(context.clone(), cfg.clone())
2048 .await
2049 .expect("Failed to initialize store");
2050
2051 assert!(store.has(1), "Record 1 should be available");
2053 assert_eq!(
2054 store.get(1).await.unwrap().unwrap(),
2055 DummyValue { value: 1 },
2056 "Record 0 should have correct value"
2057 );
2058
2059 assert!(
2061 !store.has(2),
2062 "Record 2 should not be available (unparseable)"
2063 );
2064
2065 assert!(
2067 store.has(4),
2068 "Record 4 should be available - we should not exit early on unparseable record"
2069 );
2070 assert_eq!(
2071 store.get(4).await.unwrap().unwrap(),
2072 DummyValue { value: 4 },
2073 "Record 4 should have correct value"
2074 );
2075 }
2076 });
2077 }
2078}