1mod storage;
69pub use storage::Metadata;
70use thiserror::Error;
71
72#[derive(Debug, Error)]
74pub enum Error {
75 #[error("runtime error: {0}")]
76 Runtime(#[from] commonware_runtime::Error),
77 #[error("blob too large: {0}")]
78 BlobTooLarge(u64),
79}
80
/// Settings handed to `Metadata::init` when opening a store.
#[derive(Clone)]
pub struct Config<C> {
    /// Name of the storage partition the store lives in.
    pub partition: String,

    /// Codec configuration forwarded to the store.
    ///
    /// NOTE(review): presumably governs how stored values are decoded;
    /// confirm against the `storage` module.
    pub codec_config: C,
}
90
91#[cfg(test)]
92mod tests {
93 use super::*;
94 use commonware_macros::test_traced;
95 use commonware_runtime::{deterministic, Blob, Metrics, Runner, Storage};
96 use commonware_utils::sequence::U64;
97 use rand::{Rng, RngCore};
98
99 #[test_traced]
100 fn test_put_get_clear() {
101 let executor = deterministic::Runner::default();
103 executor.start(|context| async move {
104 let cfg = Config {
106 partition: "test".to_string(),
107 codec_config: ((0..).into(), ()),
108 };
109 let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.clone(), cfg)
110 .await
111 .unwrap();
112
113 let key = U64::new(42);
115 let value = metadata.get(&key);
116 assert!(value.is_none());
117
118 let buffer = context.encode();
120 assert!(buffer.contains("sync_rewrites_total 0"));
121 assert!(buffer.contains("sync_overwrites_total 0"));
122 assert!(buffer.contains("keys 0"));
123
124 let hello = b"hello".to_vec();
126 metadata.put(key.clone(), hello.clone());
127
128 let value = metadata.get(&key).unwrap();
130 assert_eq!(value, &hello);
131
132 let buffer = context.encode();
134 assert!(buffer.contains("sync_rewrites_total 0"));
135 assert!(buffer.contains("sync_overwrites_total 0"));
136 assert!(buffer.contains("keys 1"));
137
138 metadata.close().await.unwrap();
140
141 let buffer = context.encode();
143 assert!(buffer.contains("sync_rewrites_total 1"));
144 assert!(buffer.contains("sync_overwrites_total 0"));
145 assert!(buffer.contains("keys 1"));
146
147 let cfg = Config {
149 partition: "test".to_string(),
150 codec_config: ((0..).into(), ()),
151 };
152 let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.clone(), cfg)
153 .await
154 .unwrap();
155
156 let buffer = context.encode();
158 assert!(buffer.contains("sync_rewrites_total 0"));
159 assert!(buffer.contains("sync_overwrites_total 0"));
160 assert!(buffer.contains("keys 1"));
161
162 let value = metadata.get(&key).unwrap();
164 assert_eq!(value, &hello);
165
166 metadata.clear();
168 let value = metadata.get(&key);
169 assert!(value.is_none());
170
171 let buffer = context.encode();
173 assert!(buffer.contains("sync_rewrites_total 1"));
174 assert!(buffer.contains("sync_overwrites_total 0"));
175 assert!(buffer.contains("keys 0"));
176
177 metadata.destroy().await.unwrap();
178 });
179 }
180
181 #[test_traced]
182 fn test_multi_sync() {
183 let executor = deterministic::Runner::default();
185 executor.start(|context| async move {
186 let cfg = Config {
188 partition: "test".to_string(),
189 codec_config: ((0..).into(), ()),
190 };
191 let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.clone(), cfg)
192 .await
193 .unwrap();
194
195 let key = U64::new(42);
197 let hello = b"hello".to_vec();
198 metadata.put(key.clone(), hello.clone());
199
200 metadata.sync().await.unwrap();
202
203 let buffer = context.encode();
205 assert!(buffer.contains("sync_rewrites_total 1"));
206 assert!(buffer.contains("sync_overwrites_total 0"));
207 assert!(buffer.contains("keys 1"));
208
209 let world = b"world".to_vec();
211 metadata.put(key.clone(), world.clone());
212 let key2 = U64::new(43);
213 let foo = b"foo".to_vec();
214 metadata.put(key2.clone(), foo.clone());
215
216 metadata.close().await.unwrap();
218
219 let buffer = context.encode();
221 assert!(buffer.contains("sync_rewrites_total 2"));
222 assert!(buffer.contains("sync_overwrites_total 0"));
223 assert!(buffer.contains("keys 2"));
224
225 let cfg = Config {
227 partition: "test".to_string(),
228 codec_config: ((0..).into(), ()),
229 };
230 let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.clone(), cfg)
231 .await
232 .unwrap();
233
234 let buffer = context.encode();
236 assert!(buffer.contains("sync_rewrites_total 0"));
237 assert!(buffer.contains("sync_overwrites_total 0"));
238 assert!(buffer.contains("keys 2"));
239
240 let value = metadata.get(&key).unwrap();
242 assert_eq!(value, &world);
243 let value = metadata.get(&key2).unwrap();
244 assert_eq!(value, &foo);
245
246 metadata.remove(&key);
248
249 metadata.sync().await.unwrap();
251
252 let buffer = context.encode();
254 assert!(buffer.contains("sync_rewrites_total 1"));
255 assert!(buffer.contains("sync_overwrites_total 0"));
256 assert!(buffer.contains("keys 1"));
257
258 metadata.close().await.unwrap();
260
261 let cfg = Config {
263 partition: "test".to_string(),
264 codec_config: ((0..).into(), ()),
265 };
266 let metadata = Metadata::<_, U64, Vec<u8>>::init(context.clone(), cfg)
267 .await
268 .unwrap();
269
270 let buffer = context.encode();
272 assert!(buffer.contains("sync_rewrites_total 0"));
273 assert!(buffer.contains("sync_overwrites_total 0"));
274 assert!(buffer.contains("keys 1"));
275
276 let value = metadata.get(&key);
278 assert!(value.is_none());
279 let value = metadata.get(&key2).unwrap();
280 assert_eq!(value, &foo);
281
282 metadata.destroy().await.unwrap();
283 });
284 }
285
286 #[test_traced]
287 fn test_recover_corrupted_one() {
288 let executor = deterministic::Runner::default();
290 executor.start(|context| async move {
291 let cfg = Config {
293 partition: "test".to_string(),
294 codec_config: ((0..).into(), ()),
295 };
296 let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.clone(), cfg)
297 .await
298 .unwrap();
299
300 let key = U64::new(42);
302 let hello = b"hello".to_vec();
303 metadata.put(key.clone(), hello.clone());
304
305 metadata.sync().await.unwrap();
307
308 let world = b"world".to_vec();
310 metadata.put(key.clone(), world.clone());
311 let key2 = U64::new(43);
312 let foo = b"foo".to_vec();
313 metadata.put(key2, foo.clone());
314
315 metadata.close().await.unwrap();
317
318 let (blob, _) = context.open("test", b"left").await.unwrap();
320 blob.write_at(b"corrupted".to_vec(), 0).await.unwrap();
321 blob.sync().await.unwrap();
322
323 let cfg = Config {
325 partition: "test".to_string(),
326 codec_config: ((0..).into(), ()),
327 };
328 let metadata = Metadata::<_, U64, Vec<u8>>::init(context.clone(), cfg)
329 .await
330 .unwrap();
331
332 let value = metadata.get(&key).unwrap();
334 assert_eq!(value, &hello);
335
336 metadata.destroy().await.unwrap();
337 });
338 }
339
340 #[test_traced]
341 fn test_recover_corrupted_both() {
342 let executor = deterministic::Runner::default();
344 executor.start(|context| async move {
345 let cfg = Config {
347 partition: "test".to_string(),
348 codec_config: ((0..).into(), ()),
349 };
350 let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.clone(), cfg)
351 .await
352 .unwrap();
353
354 let key = U64::new(42);
356 let hello = b"hello".to_vec();
357 metadata.put(key.clone(), hello.clone());
358
359 metadata.sync().await.unwrap();
361
362 let world = b"world".to_vec();
364 metadata.put(key.clone(), world.clone());
365 let key2 = U64::new(43);
366 let foo = b"foo".to_vec();
367 metadata.put(key2, foo.clone());
368
369 metadata.close().await.unwrap();
371
372 let (blob, _) = context.open("test", b"left").await.unwrap();
374 blob.write_at(b"corrupted".to_vec(), 0).await.unwrap();
375 blob.sync().await.unwrap();
376 let (blob, _) = context.open("test", b"right").await.unwrap();
377 blob.write_at(b"corrupted".to_vec(), 0).await.unwrap();
378 blob.sync().await.unwrap();
379
380 let cfg = Config {
382 partition: "test".to_string(),
383 codec_config: ((0..).into(), ()),
384 };
385 let metadata = Metadata::<_, U64, Vec<u8>>::init(context.clone(), cfg)
386 .await
387 .unwrap();
388
389 let value = metadata.get(&key);
391 assert!(value.is_none());
392
393 let buffer = context.encode();
395 assert!(buffer.contains("sync_rewrites_total 0"));
396 assert!(buffer.contains("sync_overwrites_total 0"));
397 assert!(buffer.contains("keys 0"));
398
399 metadata.destroy().await.unwrap();
400 });
401 }
402
403 #[test_traced]
404 fn test_recover_corrupted_truncate() {
405 let executor = deterministic::Runner::default();
407 executor.start(|context| async move {
408 let cfg = Config {
410 partition: "test".to_string(),
411 codec_config: ((0..).into(), ()),
412 };
413 let mut metadata = Metadata::init(context.clone(), cfg).await.unwrap();
414
415 let key = U64::new(42);
417 let hello = b"hello".to_vec();
418 metadata.put(key.clone(), hello.clone());
419
420 metadata.sync().await.unwrap();
422
423 let world = b"world".to_vec();
425 metadata.put(key.clone(), world.clone());
426 let key2 = U64::new(43);
427 let foo = b"foo".to_vec();
428 metadata.put(key2, foo.clone());
429
430 metadata.close().await.unwrap();
432
433 let (blob, len) = context.open("test", b"left").await.unwrap();
435 blob.resize(len - 8).await.unwrap();
436 blob.sync().await.unwrap();
437
438 let cfg = Config {
440 partition: "test".to_string(),
441 codec_config: ((0..).into(), ()),
442 };
443 let metadata = Metadata::<_, U64, Vec<u8>>::init(context.clone(), cfg)
444 .await
445 .unwrap();
446
447 let value = metadata.get(&key).unwrap();
449 assert_eq!(value, &hello);
450
451 metadata.destroy().await.unwrap();
452 });
453 }
454
455 #[test_traced]
456 fn test_recover_corrupted_short() {
457 let executor = deterministic::Runner::default();
459 executor.start(|context| async move {
460 let cfg = Config {
462 partition: "test".to_string(),
463 codec_config: ((0..).into(), ()),
464 };
465 let mut metadata = Metadata::init(context.clone(), cfg).await.unwrap();
466
467 let key = U64::new(42);
469 let hello = b"hello".to_vec();
470 metadata.put(key.clone(), hello.clone());
471
472 metadata.sync().await.unwrap();
474
475 let world = b"world".to_vec();
477 metadata.put(key.clone(), world.clone());
478 let key2 = U64::new(43);
479 let foo = b"foo".to_vec();
480 metadata.put(key2, foo.clone());
481
482 metadata.close().await.unwrap();
484
485 let (blob, _) = context.open("test", b"left").await.unwrap();
487 blob.resize(5).await.unwrap();
488 blob.sync().await.unwrap();
489
490 let cfg = Config {
492 partition: "test".to_string(),
493 codec_config: ((0..).into(), ()),
494 };
495 let metadata = Metadata::<_, U64, Vec<u8>>::init(context.clone(), cfg)
496 .await
497 .unwrap();
498
499 let value = metadata.get(&key).unwrap();
501 assert_eq!(value, &hello);
502
503 metadata.destroy().await.unwrap();
504 });
505 }
506
507 #[test_traced]
508 fn test_unclean_shutdown() {
509 let executor = deterministic::Runner::default();
511 executor.start(|context| async move {
512 let key = U64::new(42);
513 let hello = b"hello".to_vec();
514 {
515 let cfg = Config {
517 partition: "test".to_string(),
518 codec_config: ((0..).into(), ()),
519 };
520 let mut metadata = Metadata::init(context.clone(), cfg).await.unwrap();
521
522 metadata.put(key.clone(), hello.clone());
524
525 }
527
528 let cfg = Config {
530 partition: "test".to_string(),
531 codec_config: ((0..).into(), ()),
532 };
533 let metadata = Metadata::<_, U64, Vec<u8>>::init(context.clone(), cfg)
534 .await
535 .unwrap();
536
537 let value = metadata.get(&key);
539 assert!(value.is_none());
540
541 let buffer = context.encode();
543 assert!(buffer.contains("sync_rewrites_total 0"));
544 assert!(buffer.contains("sync_overwrites_total 0"));
545 assert!(buffer.contains("keys 0"));
546
547 metadata.destroy().await.unwrap();
548 });
549 }
550
551 #[test_traced]
552 #[should_panic(expected = "usize value is larger than u32")]
553 fn test_value_too_big_error() {
554 let executor = deterministic::Runner::default();
556 executor.start(|context| async move {
557 let cfg = Config {
559 partition: "test".to_string(),
560 codec_config: ((0..).into(), ()),
561 };
562 let mut metadata = Metadata::init(context.clone(), cfg).await.unwrap();
563
564 let value = vec![0u8; (u32::MAX as usize) + 1];
566 metadata.put(U64::new(1), value);
567
568 metadata.sync().await.unwrap();
570 });
571 }
572
573 #[test_traced]
574 fn test_delta_writes() {
575 let executor = deterministic::Runner::default();
577 executor.start(|context| async move {
578 let cfg = Config {
580 partition: "test".to_string(),
581 codec_config: ((0..).into(), ()),
582 };
583 let mut metadata = Metadata::init(context.clone(), cfg).await.unwrap();
584
585 for i in 0..100 {
587 metadata.put(U64::new(i), vec![i as u8; 100]);
588 }
589
590 metadata.sync().await.unwrap();
594 let buffer = context.encode();
595 assert!(buffer.contains("sync_rewrites_total 1"), "{buffer}");
596 assert!(buffer.contains("sync_overwrites_total 0"), "{buffer}");
597 assert!(
598 buffer.contains("runtime_storage_write_bytes_total 10912"),
599 "{buffer}",
600 );
601
602 metadata.put(U64::new(51), vec![0xff; 100]);
604
605 metadata.sync().await.unwrap();
607 let buffer = context.encode();
608 assert!(buffer.contains("sync_rewrites_total 2"), "{buffer}");
609 assert!(buffer.contains("sync_overwrites_total 0"), "{buffer}");
610 assert!(
611 buffer.contains("runtime_storage_write_bytes_total 21824"),
612 "{buffer}",
613 );
614
615 metadata.sync().await.unwrap();
619 let buffer = context.encode();
620 assert!(buffer.contains("sync_rewrites_total 2"), "{buffer}");
621 assert!(buffer.contains("sync_overwrites_total 1"), "{buffer}");
622 assert!(
623 buffer.contains("runtime_storage_write_bytes_total 21937"),
624 "{buffer}",
625 );
626
627 metadata.sync().await.unwrap();
631 let buffer = context.encode();
632 assert!(buffer.contains("sync_rewrites_total 2"), "{buffer}");
633 assert!(buffer.contains("sync_overwrites_total 2"), "{buffer}");
634 assert!(
635 buffer.contains("runtime_storage_write_bytes_total 21949"),
636 "{buffer}",
637 );
638
639 metadata.remove(&U64::new(51));
643 metadata.sync().await.unwrap();
644 let buffer = context.encode();
645 assert!(buffer.contains("sync_rewrites_total 3"), "{buffer}");
646 assert!(buffer.contains("sync_overwrites_total 2"), "{buffer}");
647 assert!(
648 buffer.contains("runtime_storage_write_bytes_total 32752"),
649 "{buffer}"
650 );
651
652 metadata.sync().await.unwrap();
654 let buffer = context.encode();
655 assert!(buffer.contains("sync_rewrites_total 4"), "{buffer}");
656 assert!(buffer.contains("sync_overwrites_total 2"), "{buffer}");
657 assert!(
658 buffer.contains("runtime_storage_write_bytes_total 43555"),
659 "{buffer}"
660 );
661
662 metadata.put(U64::new(50), vec![0xff; 100]);
666 metadata.sync().await.unwrap();
667 let buffer = context.encode();
668 assert!(buffer.contains("sync_rewrites_total 4"), "{buffer}");
669 assert!(buffer.contains("sync_overwrites_total 3"), "{buffer}");
670 assert!(
671 buffer.contains("runtime_storage_write_bytes_total 43668"),
672 "{buffer}"
673 );
674
675 metadata.destroy().await.unwrap();
677 });
678 }
679
680 #[test_traced]
681 fn test_sync_with_no_changes() {
682 let executor = deterministic::Runner::default();
683 executor.start(|context| async move {
684 let cfg = Config {
685 partition: "test".to_string(),
686 codec_config: ((0..).into(), ()),
687 };
688 let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.clone(), cfg)
689 .await
690 .unwrap();
691
692 metadata.put(U64::new(1), b"hello".to_vec());
694 metadata.sync().await.unwrap();
695
696 metadata.sync().await.unwrap();
699 let buffer = context.encode();
700 assert!(buffer.contains("sync_rewrites_total 2"));
701 assert!(buffer.contains("sync_overwrites_total 0"));
702
703 metadata.sync().await.unwrap();
705 let buffer = context.encode();
706 assert!(buffer.contains("sync_rewrites_total 2"));
707 assert!(buffer.contains("sync_overwrites_total 1"));
708
709 metadata.sync().await.unwrap();
711 let buffer = context.encode();
712 assert!(buffer.contains("sync_rewrites_total 2"));
713 assert!(buffer.contains("sync_overwrites_total 2"));
714
715 metadata.destroy().await.unwrap();
716 });
717 }
718
719 #[test_traced]
720 fn test_get_mut_marks_modified() {
721 let executor = deterministic::Runner::default();
722 executor.start(|context| async move {
723 let cfg = Config {
724 partition: "test".to_string(),
725 codec_config: ((0..).into(), ()),
726 };
727 let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.clone(), cfg.clone())
728 .await
729 .unwrap();
730
731 metadata.put(U64::new(1), b"hello".to_vec());
733 metadata.sync().await.unwrap();
734
735 metadata.sync().await.unwrap();
737
738 let value = metadata.get_mut(&U64::new(1)).unwrap();
740 value[0] = b'H';
741
742 metadata.sync().await.unwrap();
744 let buffer = context.encode();
745 assert!(buffer.contains("sync_rewrites_total 2"));
746 assert!(buffer.contains("sync_overwrites_total 1"));
747
748 metadata.close().await.unwrap();
750 let metadata = Metadata::<_, U64, Vec<u8>>::init(context, cfg)
751 .await
752 .unwrap();
753
754 let value = metadata.get(&U64::new(1)).unwrap();
756 assert_eq!(value[0], b'H');
757
758 metadata.destroy().await.unwrap();
759 });
760 }
761
762 #[test_traced]
763 fn test_mixed_operation_sequences() {
764 let executor = deterministic::Runner::default();
765 executor.start(|context| async move {
766 let cfg = Config {
767 partition: "test".to_string(),
768 codec_config: ((0..).into(), ()),
769 };
770 let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.clone(), cfg.clone())
771 .await
772 .unwrap();
773
774 let key = U64::new(1);
775
776 metadata.put(key.clone(), b"first".to_vec());
778 metadata.remove(&key);
779 metadata.put(key.clone(), b"second".to_vec());
780 metadata.sync().await.unwrap();
781 let value = metadata.get(&key).unwrap();
782 assert_eq!(value, b"second");
783
784 metadata.put(key.clone(), b"third".to_vec());
786 let value = metadata.get_mut(&key).unwrap();
787 value[0] = b'T';
788 metadata.remove(&key);
789 metadata.put(key.clone(), b"fourth".to_vec());
790 metadata.sync().await.unwrap();
791 let value = metadata.get(&key).unwrap();
792 assert_eq!(value, b"fourth");
793
794 metadata.close().await.unwrap();
796 let metadata = Metadata::<_, U64, Vec<u8>>::init(context, cfg)
797 .await
798 .unwrap();
799
800 let value = metadata.get(&key).unwrap();
802 assert_eq!(value, b"fourth");
803
804 metadata.destroy().await.unwrap();
805 });
806 }
807
808 #[test_traced]
809 fn test_overwrite_vs_rewrite() {
810 let executor = deterministic::Runner::default();
811 executor.start(|context| async move {
812 let cfg = Config {
813 partition: "test".to_string(),
814 codec_config: ((0..).into(), ()),
815 };
816 let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.clone(), cfg)
817 .await
818 .unwrap();
819
820 metadata.put(U64::new(1), vec![1; 10]);
822 metadata.put(U64::new(2), vec![2; 10]);
823 metadata.sync().await.unwrap();
824
825 metadata.put(U64::new(1), vec![0xFF; 10]);
827 metadata.sync().await.unwrap();
828 let buffer = context.encode();
829 assert!(buffer.contains("sync_rewrites_total 2"));
830 assert!(buffer.contains("sync_overwrites_total 0"));
831
832 metadata.sync().await.unwrap();
834 let buffer = context.encode();
835 assert!(buffer.contains("sync_rewrites_total 2"));
836 assert!(buffer.contains("sync_overwrites_total 1"));
837
838 metadata.put(U64::new(1), vec![0xAA; 10]);
840 metadata.sync().await.unwrap();
841 let buffer = context.encode();
842 assert!(buffer.contains("sync_rewrites_total 2"));
843 assert!(buffer.contains("sync_overwrites_total 2"));
844
845 metadata.put(U64::new(1), vec![0xFF; 20]);
847 metadata.sync().await.unwrap();
848 let buffer = context.encode();
849 assert!(buffer.contains("sync_rewrites_total 3"));
850 assert!(buffer.contains("sync_overwrites_total 2"));
851
852 metadata.put(U64::new(3), vec![3; 10]);
854 metadata.sync().await.unwrap();
855 let buffer = context.encode();
856 assert!(buffer.contains("sync_rewrites_total 4"));
857 assert!(buffer.contains("sync_overwrites_total 2"));
858
859 metadata.sync().await.unwrap();
861 let buffer = context.encode();
862 assert!(buffer.contains("sync_rewrites_total 5"));
863 assert!(buffer.contains("sync_overwrites_total 2"));
864
865 metadata.put(U64::new(2), vec![0xAA; 10]);
867 metadata.sync().await.unwrap();
868 let buffer = context.encode();
869 assert!(buffer.contains("sync_rewrites_total 5"));
870 assert!(buffer.contains("sync_overwrites_total 3"));
871
872 metadata.destroy().await.unwrap();
873 });
874 }
875
876 #[test_traced]
877 fn test_blob_resize() {
878 let executor = deterministic::Runner::default();
879 executor.start(|context| async move {
880 let cfg = Config {
881 partition: "test".to_string(),
882 codec_config: ((0..).into(), ()),
883 };
884 let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.clone(), cfg.clone())
885 .await
886 .unwrap();
887
888 for i in 0..10 {
890 metadata.put(U64::new(i), vec![i as u8; 100]);
891 }
892 metadata.sync().await.unwrap();
893
894 metadata.sync().await.unwrap();
896 let buffer = context.encode();
897 assert!(buffer.contains("sync_rewrites_total 2"));
898 assert!(buffer.contains("sync_overwrites_total 0"));
899
900 for i in 1..10 {
902 metadata.remove(&U64::new(i));
903 }
904 metadata.sync().await.unwrap();
905
906 let value = metadata.get(&U64::new(0)).unwrap();
908 assert_eq!(value.len(), 100);
909 assert_eq!(value[0], 0);
910
911 let buffer = context.encode();
913 assert!(buffer.contains("sync_rewrites_total 3"));
914 assert!(buffer.contains("sync_overwrites_total 0"));
915
916 metadata.close().await.unwrap();
918 let metadata = Metadata::<_, U64, Vec<u8>>::init(context, cfg)
919 .await
920 .unwrap();
921
922 let value = metadata.get(&U64::new(0)).unwrap();
924 assert_eq!(value.len(), 100);
925 assert_eq!(value[0], 0);
926
927 for i in 1..10 {
929 assert!(metadata.get(&U64::new(i)).is_none());
930 }
931
932 metadata.destroy().await.unwrap();
933 });
934 }
935
936 #[test_traced]
937 fn test_clear_and_repopulate() {
938 let executor = deterministic::Runner::default();
939 executor.start(|context| async move {
940 let cfg = Config {
941 partition: "test".to_string(),
942 codec_config: ((0..).into(), ()),
943 };
944 let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.clone(), cfg.clone())
945 .await
946 .unwrap();
947
948 metadata.put(U64::new(1), b"first".to_vec());
950 metadata.put(U64::new(2), b"second".to_vec());
951 metadata.sync().await.unwrap();
952
953 metadata.clear();
955 metadata.sync().await.unwrap();
956
957 assert!(metadata.get(&U64::new(1)).is_none());
959 assert!(metadata.get(&U64::new(2)).is_none());
960
961 metadata.close().await.unwrap();
963 let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context, cfg)
964 .await
965 .unwrap();
966
967 assert!(metadata.get(&U64::new(1)).is_none());
969 assert!(metadata.get(&U64::new(2)).is_none());
970
971 metadata.put(U64::new(3), b"third".to_vec());
973 metadata.put(U64::new(4), b"fourth".to_vec());
974 metadata.sync().await.unwrap();
975
976 assert_eq!(metadata.get(&U64::new(3)).unwrap(), b"third");
978 assert_eq!(metadata.get(&U64::new(4)).unwrap(), b"fourth");
979 assert!(metadata.get(&U64::new(1)).is_none());
980 assert!(metadata.get(&U64::new(2)).is_none());
981
982 metadata.destroy().await.unwrap();
983 });
984 }
985
986 fn test_metadata_operations_and_restart(num_operations: usize) -> String {
987 let executor = deterministic::Runner::default();
988 executor.start(|mut context| async move {
989 let cfg = Config {
990 partition: "test_determinism".to_string(),
991 codec_config: ((0..).into(), ()),
992 };
993 let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.clone(), cfg.clone())
994 .await
995 .unwrap();
996
997 for i in 0..num_operations {
999 let key = U64::new(i as u64);
1000 let mut value = vec![0u8; 64];
1001 context.fill_bytes(&mut value);
1002 metadata.put(key, value);
1003
1004 if context.gen_bool(0.1) {
1006 metadata.sync().await.unwrap();
1007 }
1008
1009 if context.gen_bool(0.1) {
1011 let selected_index = context.gen_range(0..=i);
1012 let update_key = U64::new(selected_index as u64);
1013 let mut new_value = vec![0u8; 64];
1014 context.fill_bytes(&mut new_value);
1015 metadata.put(update_key, new_value);
1016 }
1017
1018 if context.gen_bool(0.1) {
1020 let selected_index = context.gen_range(0..=i);
1021 let remove_key = U64::new(selected_index as u64);
1022 metadata.remove(&remove_key);
1023 }
1024
1025 if context.gen_bool(0.1) {
1027 let selected_index = context.gen_range(0..=i);
1028 let mut_key = U64::new(selected_index as u64);
1029 if let Some(value) = metadata.get_mut(&mut_key) {
1030 if !value.is_empty() {
1031 value[0] = value[0].wrapping_add(1);
1032 }
1033 }
1034 }
1035 }
1036 metadata.sync().await.unwrap();
1037
1038 metadata.destroy().await.unwrap();
1040
1041 context.auditor().state()
1042 })
1043 }
1044
1045 #[test_traced]
1046 #[ignore]
1047 fn test_determinism() {
1048 let state1 = test_metadata_operations_and_restart(1_000);
1049 let state2 = test_metadata_operations_and_restart(1_000);
1050 assert_eq!(state1, state2);
1051 }
1052
1053 #[test_traced]
1054 fn test_keys_iterator() {
1055 let executor = deterministic::Runner::default();
1057 executor.start(|context| async move {
1058 let cfg = Config {
1060 partition: "test".to_string(),
1061 codec_config: ((0..).into(), ()),
1062 };
1063 let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.clone(), cfg)
1064 .await
1065 .unwrap();
1066
1067 metadata.put(U64::new(0x1000), b"value1".to_vec());
1069 metadata.put(U64::new(0x1001), b"value2".to_vec());
1070 metadata.put(U64::new(0x1002), b"value3".to_vec());
1071 metadata.put(U64::new(0x2000), b"value4".to_vec());
1072 metadata.put(U64::new(0x2001), b"value5".to_vec());
1073 metadata.put(U64::new(0x3000), b"value6".to_vec());
1074
1075 let all_keys: Vec<_> = metadata.keys(None).cloned().collect();
1077 assert_eq!(all_keys.len(), 6);
1078 assert!(all_keys.contains(&U64::new(0x1000)));
1079 assert!(all_keys.contains(&U64::new(0x3000)));
1080
1081 let prefix = vec![0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x10];
1083 let prefix_keys: Vec<_> = metadata.keys(Some(&prefix)).cloned().collect();
1084 assert_eq!(prefix_keys.len(), 3);
1085 assert!(prefix_keys.contains(&U64::new(0x1000)));
1086 assert!(prefix_keys.contains(&U64::new(0x1001)));
1087 assert!(prefix_keys.contains(&U64::new(0x1002)));
1088 assert!(!prefix_keys.contains(&U64::new(0x2000)));
1089
1090 let prefix = vec![0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x20];
1092 let prefix_keys: Vec<_> = metadata.keys(Some(&prefix)).cloned().collect();
1093 assert_eq!(prefix_keys.len(), 2);
1094 assert!(prefix_keys.contains(&U64::new(0x2000)));
1095 assert!(prefix_keys.contains(&U64::new(0x2001)));
1096
1097 let prefix = vec![0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x40];
1099 let prefix_keys: Vec<_> = metadata.keys(Some(&prefix)).cloned().collect();
1100 assert_eq!(prefix_keys.len(), 0);
1101
1102 metadata.destroy().await.unwrap();
1103 });
1104 }
1105
1106 #[test_traced]
1107 fn test_remove_prefix() {
1108 let executor = deterministic::Runner::default();
1110 executor.start(|context| async move {
1111 let cfg = Config {
1113 partition: "test".to_string(),
1114 codec_config: ((0..).into(), ()),
1115 };
1116 let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.clone(), cfg)
1117 .await
1118 .unwrap();
1119
1120 metadata.put(U64::new(0x1000), b"value1".to_vec());
1122 metadata.put(U64::new(0x1001), b"value2".to_vec());
1123 metadata.put(U64::new(0x1002), b"value3".to_vec());
1124 metadata.put(U64::new(0x2000), b"value4".to_vec());
1125 metadata.put(U64::new(0x2001), b"value5".to_vec());
1126 metadata.put(U64::new(0x3000), b"value6".to_vec());
1127
1128 let buffer = context.encode();
1130 assert!(buffer.contains("keys 6"));
1131
1132 let prefix = vec![0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x10];
1134 metadata.remove_prefix(&prefix);
1135
1136 let buffer = context.encode();
1138 assert!(buffer.contains("keys 3"));
1139
1140 assert!(metadata.get(&U64::new(0x1000)).is_none());
1142 assert!(metadata.get(&U64::new(0x1001)).is_none());
1143 assert!(metadata.get(&U64::new(0x1002)).is_none());
1144 assert!(metadata.get(&U64::new(0x2000)).is_some());
1145 assert!(metadata.get(&U64::new(0x2001)).is_some());
1146 assert!(metadata.get(&U64::new(0x3000)).is_some());
1147
1148 metadata.sync().await.unwrap();
1150 metadata.close().await.unwrap();
1151 let cfg = Config {
1152 partition: "test".to_string(),
1153 codec_config: ((0..).into(), ()),
1154 };
1155 let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.clone(), cfg)
1156 .await
1157 .unwrap();
1158
1159 assert!(metadata.get(&U64::new(0x1000)).is_none());
1161 assert!(metadata.get(&U64::new(0x2000)).is_some());
1162 assert_eq!(metadata.keys(None).count(), 3);
1163
1164 let prefix = vec![0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x40];
1166 metadata.remove_prefix(&prefix);
1167
1168 let prefix = vec![]; metadata.remove_prefix(&prefix);
1171 assert_eq!(metadata.keys(None).count(), 0);
1172
1173 metadata.destroy().await.unwrap();
1174 });
1175 }
1176}