1mod storage;
67pub use storage::Metadata;
68use thiserror::Error;
69
/// Errors surfaced by this module.
#[derive(Debug, Error)]
pub enum Error {
    /// An error reported by `commonware_runtime`; converted automatically via `#[from]`.
    #[error("runtime error: {0}")]
    Runtime(#[from] commonware_runtime::Error),
    /// A blob was too large to be handled.
    ///
    /// NOTE(review): the `u64` payload is presumably the offending blob length in bytes;
    /// confirm against the construction site in `storage`.
    #[error("blob too large: {0}")]
    BlobTooLarge(u64),
}
78
/// Configuration handed to `Metadata::init`.
#[derive(Clone)]
pub struct Config<C> {
    /// Name of the storage partition that holds the store's blobs.
    pub partition: String,

    /// Codec configuration of type `C`.
    ///
    /// NOTE(review): presumably used when decoding stored values; confirm in `storage`.
    pub codec_config: C,
}
88
89#[cfg(test)]
90mod tests {
91 use super::*;
92 use commonware_macros::{test_group, test_traced};
93 use commonware_runtime::{deterministic, Blob, Metrics, Runner, Storage};
94 use commonware_utils::{hex, sequence::U64};
95 use rand::{Rng, RngCore};
96
97 #[test_traced]
98 fn test_put_get_clear() {
99 let executor = deterministic::Runner::default();
101 executor.start(|context| async move {
102 let cfg = Config {
104 partition: "test".to_string(),
105 codec_config: ((0..).into(), ()),
106 };
107 let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.clone(), cfg)
108 .await
109 .unwrap();
110
111 let key = U64::new(42);
113 let value = metadata.get(&key);
114 assert!(value.is_none());
115
116 let buffer = context.encode();
118 assert!(buffer.contains("sync_rewrites_total 0"));
119 assert!(buffer.contains("sync_overwrites_total 0"));
120 assert!(buffer.contains("keys 0"));
121
122 let hello = b"hello".to_vec();
124 metadata.put(key.clone(), hello.clone());
125
126 let value = metadata.get(&key).unwrap();
128 assert_eq!(value, &hello);
129
130 let buffer = context.encode();
132 assert!(buffer.contains("sync_rewrites_total 0"));
133 assert!(buffer.contains("sync_overwrites_total 0"));
134 assert!(buffer.contains("keys 1"));
135
136 metadata.sync().await.unwrap();
138
139 let buffer = context.encode();
141 assert!(buffer.contains("sync_rewrites_total 1"));
142 assert!(buffer.contains("sync_overwrites_total 0"));
143 assert!(buffer.contains("keys 1"));
144
145 drop(metadata);
147 let cfg = Config {
148 partition: "test".to_string(),
149 codec_config: ((0..).into(), ()),
150 };
151 let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.clone(), cfg)
152 .await
153 .unwrap();
154
155 let buffer = context.encode();
157 assert!(buffer.contains("sync_rewrites_total 0"));
158 assert!(buffer.contains("sync_overwrites_total 0"));
159 assert!(buffer.contains("keys 1"));
160
161 let value = metadata.get(&key).unwrap();
163 assert_eq!(value, &hello);
164
165 metadata.clear();
167 let value = metadata.get(&key);
168 assert!(value.is_none());
169
170 let buffer = context.encode();
172 assert!(buffer.contains("sync_rewrites_total 1"));
173 assert!(buffer.contains("sync_overwrites_total 0"));
174 assert!(buffer.contains("keys 0"));
175
176 metadata.destroy().await.unwrap();
177 });
178 }
179
180 #[test_traced]
181 fn test_put_returns_previous_value() {
182 let executor = deterministic::Runner::default();
183 executor.start(|context| async move {
184 let cfg = Config {
185 partition: "test".to_string(),
186 codec_config: ((0..).into(), ()),
187 };
188 let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.clone(), cfg)
189 .await
190 .unwrap();
191
192 let key = U64::new(42);
193
194 let previous = metadata.put(key.clone(), b"first".to_vec());
196 assert!(previous.is_none());
197
198 let previous = metadata.put(key.clone(), b"second".to_vec());
200 assert_eq!(previous, Some(b"first".to_vec()));
201
202 let previous = metadata.put(key.clone(), b"third".to_vec());
204 assert_eq!(previous, Some(b"second".to_vec()));
205
206 assert_eq!(metadata.get(&key), Some(&b"third".to_vec()));
208
209 let other_key = U64::new(99);
211 let previous = metadata.put(other_key.clone(), b"other".to_vec());
212 assert!(previous.is_none());
213
214 metadata.sync().await.unwrap();
216 drop(metadata);
217
218 let cfg = Config {
219 partition: "test".to_string(),
220 codec_config: ((0..).into(), ()),
221 };
222 let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.clone(), cfg)
223 .await
224 .unwrap();
225
226 let previous = metadata.put(key.clone(), b"fourth".to_vec());
228 assert_eq!(previous, Some(b"third".to_vec()));
229
230 metadata.destroy().await.unwrap();
231 });
232 }
233
234 #[test_traced]
235 fn test_multi_sync() {
236 let executor = deterministic::Runner::default();
238 executor.start(|context| async move {
239 let cfg = Config {
241 partition: "test".to_string(),
242 codec_config: ((0..).into(), ()),
243 };
244 let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.clone(), cfg)
245 .await
246 .unwrap();
247
248 let key = U64::new(42);
250 let hello = b"hello".to_vec();
251 metadata.put(key.clone(), hello.clone());
252
253 metadata.sync().await.unwrap();
255
256 let buffer = context.encode();
258 assert!(buffer.contains("sync_rewrites_total 1"));
259 assert!(buffer.contains("sync_overwrites_total 0"));
260 assert!(buffer.contains("keys 1"));
261
262 let world = b"world".to_vec();
264 metadata.put(key.clone(), world.clone());
265 let key2 = U64::new(43);
266 let foo = b"foo".to_vec();
267 metadata.put(key2.clone(), foo.clone());
268
269 metadata.sync().await.unwrap();
271
272 let buffer = context.encode();
274 assert!(buffer.contains("sync_rewrites_total 2"));
275 assert!(buffer.contains("sync_overwrites_total 0"));
276 assert!(buffer.contains("keys 2"));
277
278 drop(metadata);
280 let cfg = Config {
281 partition: "test".to_string(),
282 codec_config: ((0..).into(), ()),
283 };
284 let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.clone(), cfg)
285 .await
286 .unwrap();
287
288 let buffer = context.encode();
290 assert!(buffer.contains("sync_rewrites_total 0"));
291 assert!(buffer.contains("sync_overwrites_total 0"));
292 assert!(buffer.contains("keys 2"));
293
294 let value = metadata.get(&key).unwrap();
296 assert_eq!(value, &world);
297 let value = metadata.get(&key2).unwrap();
298 assert_eq!(value, &foo);
299
300 metadata.remove(&key);
302
303 metadata.sync().await.unwrap();
305
306 let buffer = context.encode();
308 assert!(buffer.contains("sync_rewrites_total 1"));
309 assert!(buffer.contains("sync_overwrites_total 0"));
310 assert!(buffer.contains("keys 1"));
311
312 drop(metadata);
314 let cfg = Config {
315 partition: "test".to_string(),
316 codec_config: ((0..).into(), ()),
317 };
318 let metadata = Metadata::<_, U64, Vec<u8>>::init(context.clone(), cfg)
319 .await
320 .unwrap();
321
322 let buffer = context.encode();
324 assert!(buffer.contains("sync_rewrites_total 0"));
325 assert!(buffer.contains("sync_overwrites_total 0"));
326 assert!(buffer.contains("keys 1"));
327
328 let value = metadata.get(&key);
330 assert!(value.is_none());
331 let value = metadata.get(&key2).unwrap();
332 assert_eq!(value, &foo);
333
334 metadata.destroy().await.unwrap();
335 });
336 }
337
338 #[test_traced]
339 fn test_recover_corrupted_one() {
340 let executor = deterministic::Runner::default();
342 executor.start(|context| async move {
343 let cfg = Config {
345 partition: "test".to_string(),
346 codec_config: ((0..).into(), ()),
347 };
348 let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.clone(), cfg)
349 .await
350 .unwrap();
351
352 let key = U64::new(42);
354 let hello = b"hello".to_vec();
355 metadata.put(key.clone(), hello.clone());
356
357 metadata.sync().await.unwrap();
359
360 let world = b"world".to_vec();
362 metadata.put(key.clone(), world.clone());
363 let key2 = U64::new(43);
364 let foo = b"foo".to_vec();
365 metadata.put(key2, foo.clone());
366
367 metadata.sync().await.unwrap();
369 drop(metadata);
370
371 let (blob, _) = context.open("test", b"left").await.unwrap();
373 blob.write_at(b"corrupted".to_vec(), 0).await.unwrap();
374 blob.sync().await.unwrap();
375
376 let cfg = Config {
378 partition: "test".to_string(),
379 codec_config: ((0..).into(), ()),
380 };
381 let metadata = Metadata::<_, U64, Vec<u8>>::init(context.clone(), cfg)
382 .await
383 .unwrap();
384
385 let value = metadata.get(&key).unwrap();
387 assert_eq!(value, &hello);
388
389 metadata.destroy().await.unwrap();
390 });
391 }
392
393 #[test_traced]
394 fn test_recover_corrupted_both() {
395 let executor = deterministic::Runner::default();
397 executor.start(|context| async move {
398 let cfg = Config {
400 partition: "test".to_string(),
401 codec_config: ((0..).into(), ()),
402 };
403 let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.clone(), cfg)
404 .await
405 .unwrap();
406
407 let key = U64::new(42);
409 let hello = b"hello".to_vec();
410 metadata.put(key.clone(), hello.clone());
411
412 metadata.sync().await.unwrap();
414
415 let world = b"world".to_vec();
417 metadata.put(key.clone(), world.clone());
418 let key2 = U64::new(43);
419 let foo = b"foo".to_vec();
420 metadata.put(key2, foo.clone());
421
422 metadata.sync().await.unwrap();
424 drop(metadata);
425
426 let (blob, _) = context.open("test", b"left").await.unwrap();
428 blob.write_at(b"corrupted".to_vec(), 0).await.unwrap();
429 blob.sync().await.unwrap();
430 let (blob, _) = context.open("test", b"right").await.unwrap();
431 blob.write_at(b"corrupted".to_vec(), 0).await.unwrap();
432 blob.sync().await.unwrap();
433
434 let cfg = Config {
436 partition: "test".to_string(),
437 codec_config: ((0..).into(), ()),
438 };
439 let metadata = Metadata::<_, U64, Vec<u8>>::init(context.clone(), cfg)
440 .await
441 .unwrap();
442
443 let value = metadata.get(&key);
445 assert!(value.is_none());
446
447 let buffer = context.encode();
449 assert!(buffer.contains("sync_rewrites_total 0"));
450 assert!(buffer.contains("sync_overwrites_total 0"));
451 assert!(buffer.contains("keys 0"));
452
453 metadata.destroy().await.unwrap();
454 });
455 }
456
457 #[test_traced]
458 fn test_recover_corrupted_truncate() {
459 let executor = deterministic::Runner::default();
461 executor.start(|context| async move {
462 let cfg = Config {
464 partition: "test".to_string(),
465 codec_config: ((0..).into(), ()),
466 };
467 let mut metadata = Metadata::init(context.clone(), cfg).await.unwrap();
468
469 let key = U64::new(42);
471 let hello = b"hello".to_vec();
472 metadata.put(key.clone(), hello.clone());
473
474 metadata.sync().await.unwrap();
476
477 let world = b"world".to_vec();
479 metadata.put(key.clone(), world.clone());
480 let key2 = U64::new(43);
481 let foo = b"foo".to_vec();
482 metadata.put(key2, foo.clone());
483
484 metadata.sync().await.unwrap();
486 drop(metadata);
487
488 let (blob, len) = context.open("test", b"left").await.unwrap();
490 blob.resize(len - 8).await.unwrap();
491 blob.sync().await.unwrap();
492
493 let cfg = Config {
495 partition: "test".to_string(),
496 codec_config: ((0..).into(), ()),
497 };
498 let metadata = Metadata::<_, U64, Vec<u8>>::init(context.clone(), cfg)
499 .await
500 .unwrap();
501
502 let value = metadata.get(&key).unwrap();
504 assert_eq!(value, &hello);
505
506 metadata.destroy().await.unwrap();
507 });
508 }
509
510 #[test_traced]
511 fn test_recover_corrupted_short() {
512 let executor = deterministic::Runner::default();
514 executor.start(|context| async move {
515 let cfg = Config {
517 partition: "test".to_string(),
518 codec_config: ((0..).into(), ()),
519 };
520 let mut metadata = Metadata::init(context.clone(), cfg).await.unwrap();
521
522 let key = U64::new(42);
524 let hello = b"hello".to_vec();
525 metadata.put(key.clone(), hello.clone());
526
527 metadata.sync().await.unwrap();
529
530 let world = b"world".to_vec();
532 metadata.put(key.clone(), world.clone());
533 let key2 = U64::new(43);
534 let foo = b"foo".to_vec();
535 metadata.put(key2, foo.clone());
536
537 metadata.sync().await.unwrap();
539 drop(metadata);
540
541 let (blob, _) = context.open("test", b"left").await.unwrap();
543 blob.resize(5).await.unwrap();
544 blob.sync().await.unwrap();
545
546 let cfg = Config {
548 partition: "test".to_string(),
549 codec_config: ((0..).into(), ()),
550 };
551 let metadata = Metadata::<_, U64, Vec<u8>>::init(context.clone(), cfg)
552 .await
553 .unwrap();
554
555 let value = metadata.get(&key).unwrap();
557 assert_eq!(value, &hello);
558
559 metadata.destroy().await.unwrap();
560 });
561 }
562
563 #[test_traced]
564 fn test_unclean_shutdown() {
565 let executor = deterministic::Runner::default();
567 executor.start(|context| async move {
568 let key = U64::new(42);
569 let hello = b"hello".to_vec();
570 {
571 let cfg = Config {
573 partition: "test".to_string(),
574 codec_config: ((0..).into(), ()),
575 };
576 let mut metadata = Metadata::init(context.clone(), cfg).await.unwrap();
577
578 metadata.put(key.clone(), hello.clone());
580
581 }
583
584 let cfg = Config {
586 partition: "test".to_string(),
587 codec_config: ((0..).into(), ()),
588 };
589 let metadata = Metadata::<_, U64, Vec<u8>>::init(context.clone(), cfg)
590 .await
591 .unwrap();
592
593 let value = metadata.get(&key);
595 assert!(value.is_none());
596
597 let buffer = context.encode();
599 assert!(buffer.contains("sync_rewrites_total 0"));
600 assert!(buffer.contains("sync_overwrites_total 0"));
601 assert!(buffer.contains("keys 0"));
602
603 metadata.destroy().await.unwrap();
604 });
605 }
606
607 #[test_traced]
608 #[should_panic(expected = "usize value is larger than u32")]
609 fn test_value_too_big_error() {
610 let executor = deterministic::Runner::default();
612 executor.start(|context| async move {
613 let cfg = Config {
615 partition: "test".to_string(),
616 codec_config: ((0..).into(), ()),
617 };
618 let mut metadata = Metadata::init(context.clone(), cfg).await.unwrap();
619
620 let value = vec![0u8; (u32::MAX as usize) + 1];
622 metadata.put(U64::new(1), value);
623
624 metadata.sync().await.unwrap();
626 });
627 }
628
    // Tracks `runtime_storage_write_bytes_total` across a sequence of syncs to pin down how
    // many bytes each kind of sync (rewrite vs. overwrite) writes.
    #[test_traced]
    fn test_delta_writes() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "test".to_string(),
                codec_config: ((0..).into(), ()),
            };
            let mut metadata = Metadata::init(context.clone(), cfg).await.unwrap();

            // Populate 100 keys, each with a 100-byte value
            for i in 0..100 {
                metadata.put(U64::new(i), vec![i as u8; 100]);
            }

            // First sync: a rewrite of 10912 bytes.
            // NOTE(review): 10912 = 100 * 109 + 12; presumably 8-byte key + 1-byte length +
            // 100-byte value per entry plus 12 bytes of framing. Confirm against `storage`.
            metadata.sync().await.unwrap();
            let buffer = context.encode();
            assert!(buffer.contains("sync_rewrites_total 1"), "{buffer}");
            assert!(buffer.contains("sync_overwrites_total 0"), "{buffer}");
            assert!(
                buffer.contains("runtime_storage_write_bytes_total 10912"),
                "{buffer}",
            );

            // Replace one value with another of the same length
            metadata.put(U64::new(51), vec![0xff; 100]);

            // Second sync: counted as another rewrite, again 10912 bytes (total 21824)
            metadata.sync().await.unwrap();
            let buffer = context.encode();
            assert!(buffer.contains("sync_rewrites_total 2"), "{buffer}");
            assert!(buffer.contains("sync_overwrites_total 0"), "{buffer}");
            assert!(
                buffer.contains("runtime_storage_write_bytes_total 21824"),
                "{buffer}",
            );

            // Third sync (no new changes): counted as an overwrite that adds only 113 bytes
            metadata.sync().await.unwrap();
            let buffer = context.encode();
            assert!(buffer.contains("sync_rewrites_total 2"), "{buffer}");
            assert!(buffer.contains("sync_overwrites_total 1"), "{buffer}");
            assert!(
                buffer.contains("runtime_storage_write_bytes_total 21937"),
                "{buffer}",
            );

            // Fourth sync (no new changes): another overwrite, adding only 12 bytes
            metadata.sync().await.unwrap();
            let buffer = context.encode();
            assert!(buffer.contains("sync_rewrites_total 2"), "{buffer}");
            assert!(buffer.contains("sync_overwrites_total 2"), "{buffer}");
            assert!(
                buffer.contains("runtime_storage_write_bytes_total 21949"),
                "{buffer}",
            );

            // Removing a key makes the next sync a rewrite of 10803 bytes (99 * 109 + 12)
            metadata.remove(&U64::new(51));
            metadata.sync().await.unwrap();
            let buffer = context.encode();
            assert!(buffer.contains("sync_rewrites_total 3"), "{buffer}");
            assert!(buffer.contains("sync_overwrites_total 2"), "{buffer}");
            assert!(
                buffer.contains("runtime_storage_write_bytes_total 32752"),
                "{buffer}"
            );

            // The sync after the removal is also counted as a rewrite (another 10803 bytes)
            metadata.sync().await.unwrap();
            let buffer = context.encode();
            assert!(buffer.contains("sync_rewrites_total 4"), "{buffer}");
            assert!(buffer.contains("sync_overwrites_total 2"), "{buffer}");
            assert!(
                buffer.contains("runtime_storage_write_bytes_total 43555"),
                "{buffer}"
            );

            // A same-length value change is now an overwrite adding 113 bytes
            metadata.put(U64::new(50), vec![0xff; 100]);
            metadata.sync().await.unwrap();
            let buffer = context.encode();
            assert!(buffer.contains("sync_rewrites_total 4"), "{buffer}");
            assert!(buffer.contains("sync_overwrites_total 3"), "{buffer}");
            assert!(
                buffer.contains("runtime_storage_write_bytes_total 43668"),
                "{buffer}"
            );

            // Clean up
            metadata.destroy().await.unwrap();
        });
    }
735
736 #[test_traced]
737 fn test_sync_with_no_changes() {
738 let executor = deterministic::Runner::default();
739 executor.start(|context| async move {
740 let cfg = Config {
741 partition: "test".to_string(),
742 codec_config: ((0..).into(), ()),
743 };
744 let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.clone(), cfg)
745 .await
746 .unwrap();
747
748 metadata.put(U64::new(1), b"hello".to_vec());
750 metadata.sync().await.unwrap();
751
752 metadata.sync().await.unwrap();
755 let buffer = context.encode();
756 assert!(buffer.contains("sync_rewrites_total 2"));
757 assert!(buffer.contains("sync_overwrites_total 0"));
758
759 metadata.sync().await.unwrap();
761 let buffer = context.encode();
762 assert!(buffer.contains("sync_rewrites_total 2"));
763 assert!(buffer.contains("sync_overwrites_total 1"));
764
765 metadata.sync().await.unwrap();
767 let buffer = context.encode();
768 assert!(buffer.contains("sync_rewrites_total 2"));
769 assert!(buffer.contains("sync_overwrites_total 2"));
770
771 metadata.destroy().await.unwrap();
772 });
773 }
774
775 #[test_traced]
776 fn test_get_mut_marks_modified() {
777 let executor = deterministic::Runner::default();
778 executor.start(|context| async move {
779 let cfg = Config {
780 partition: "test".to_string(),
781 codec_config: ((0..).into(), ()),
782 };
783 let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.clone(), cfg.clone())
784 .await
785 .unwrap();
786
787 metadata.put(U64::new(1), b"hello".to_vec());
789 metadata.sync().await.unwrap();
790
791 metadata.sync().await.unwrap();
793
794 let value = metadata.get_mut(&U64::new(1)).unwrap();
796 value[0] = b'H';
797
798 metadata.sync().await.unwrap();
800 let buffer = context.encode();
801 assert!(buffer.contains("sync_rewrites_total 2"));
802 assert!(buffer.contains("sync_overwrites_total 1"));
803
804 drop(metadata);
806 let metadata = Metadata::<_, U64, Vec<u8>>::init(context, cfg)
807 .await
808 .unwrap();
809
810 let value = metadata.get(&U64::new(1)).unwrap();
812 assert_eq!(value[0], b'H');
813
814 metadata.destroy().await.unwrap();
815 });
816 }
817
818 #[test_traced]
819 fn test_mixed_operation_sequences() {
820 let executor = deterministic::Runner::default();
821 executor.start(|context| async move {
822 let cfg = Config {
823 partition: "test".to_string(),
824 codec_config: ((0..).into(), ()),
825 };
826 let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.clone(), cfg.clone())
827 .await
828 .unwrap();
829
830 let key = U64::new(1);
831
832 metadata.put(key.clone(), b"first".to_vec());
834 metadata.remove(&key);
835 metadata.put(key.clone(), b"second".to_vec());
836 metadata.sync().await.unwrap();
837 let value = metadata.get(&key).unwrap();
838 assert_eq!(value, b"second");
839
840 metadata.put(key.clone(), b"third".to_vec());
842 let value = metadata.get_mut(&key).unwrap();
843 value[0] = b'T';
844 metadata.remove(&key);
845 metadata.put(key.clone(), b"fourth".to_vec());
846 metadata.sync().await.unwrap();
847 let value = metadata.get(&key).unwrap();
848 assert_eq!(value, b"fourth");
849
850 drop(metadata);
852 let metadata = Metadata::<_, U64, Vec<u8>>::init(context, cfg)
853 .await
854 .unwrap();
855
856 let value = metadata.get(&key).unwrap();
858 assert_eq!(value, b"fourth");
859
860 metadata.destroy().await.unwrap();
861 });
862 }
863
    // Walks through a sequence of changes and pins which syncs are counted as rewrites and
    // which as overwrites.
    #[test_traced]
    fn test_overwrite_vs_rewrite() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "test".to_string(),
                codec_config: ((0..).into(), ()),
            };
            let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.clone(), cfg)
                .await
                .unwrap();

            // Two 10-byte values, first sync
            metadata.put(U64::new(1), vec![1; 10]);
            metadata.put(U64::new(2), vec![2; 10]);
            metadata.sync().await.unwrap();

            // Same-length change to key 1: the second sync overall still counts as a rewrite
            metadata.put(U64::new(1), vec![0xFF; 10]);
            metadata.sync().await.unwrap();
            let buffer = context.encode();
            assert!(buffer.contains("sync_rewrites_total 2"));
            assert!(buffer.contains("sync_overwrites_total 0"));

            // No changes: counted as an overwrite
            metadata.sync().await.unwrap();
            let buffer = context.encode();
            assert!(buffer.contains("sync_rewrites_total 2"));
            assert!(buffer.contains("sync_overwrites_total 1"));

            // Same-length change to key 1: counted as an overwrite
            metadata.put(U64::new(1), vec![0xAA; 10]);
            metadata.sync().await.unwrap();
            let buffer = context.encode();
            assert!(buffer.contains("sync_rewrites_total 2"));
            assert!(buffer.contains("sync_overwrites_total 2"));

            // Value length changes from 10 to 20 bytes: counted as a rewrite
            metadata.put(U64::new(1), vec![0xFF; 20]);
            metadata.sync().await.unwrap();
            let buffer = context.encode();
            assert!(buffer.contains("sync_rewrites_total 3"));
            assert!(buffer.contains("sync_overwrites_total 2"));

            // A new key is added: counted as a rewrite
            metadata.put(U64::new(3), vec![3; 10]);
            metadata.sync().await.unwrap();
            let buffer = context.encode();
            assert!(buffer.contains("sync_rewrites_total 4"));
            assert!(buffer.contains("sync_overwrites_total 2"));

            // No changes, but the sync right after the key was added is again a rewrite
            metadata.sync().await.unwrap();
            let buffer = context.encode();
            assert!(buffer.contains("sync_rewrites_total 5"));
            assert!(buffer.contains("sync_overwrites_total 2"));

            // Same-length change to key 2: counted as an overwrite
            metadata.put(U64::new(2), vec![0xAA; 10]);
            metadata.sync().await.unwrap();
            let buffer = context.encode();
            assert!(buffer.contains("sync_rewrites_total 5"));
            assert!(buffer.contains("sync_overwrites_total 3"));

            // Clean up
            metadata.destroy().await.unwrap();
        });
    }
931
932 #[test_traced]
933 fn test_blob_resize() {
934 let executor = deterministic::Runner::default();
935 executor.start(|context| async move {
936 let cfg = Config {
937 partition: "test".to_string(),
938 codec_config: ((0..).into(), ()),
939 };
940 let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.clone(), cfg.clone())
941 .await
942 .unwrap();
943
944 for i in 0..10 {
946 metadata.put(U64::new(i), vec![i as u8; 100]);
947 }
948 metadata.sync().await.unwrap();
949
950 metadata.sync().await.unwrap();
952 let buffer = context.encode();
953 assert!(buffer.contains("sync_rewrites_total 2"));
954 assert!(buffer.contains("sync_overwrites_total 0"));
955
956 for i in 1..10 {
958 metadata.remove(&U64::new(i));
959 }
960 metadata.sync().await.unwrap();
961
962 let value = metadata.get(&U64::new(0)).unwrap();
964 assert_eq!(value.len(), 100);
965 assert_eq!(value[0], 0);
966
967 let buffer = context.encode();
969 assert!(buffer.contains("sync_rewrites_total 3"));
970 assert!(buffer.contains("sync_overwrites_total 0"));
971
972 drop(metadata);
974 let metadata = Metadata::<_, U64, Vec<u8>>::init(context, cfg)
975 .await
976 .unwrap();
977
978 let value = metadata.get(&U64::new(0)).unwrap();
980 assert_eq!(value.len(), 100);
981 assert_eq!(value[0], 0);
982
983 for i in 1..10 {
985 assert!(metadata.get(&U64::new(i)).is_none());
986 }
987
988 metadata.destroy().await.unwrap();
989 });
990 }
991
992 #[test_traced]
993 fn test_clear_and_repopulate() {
994 let executor = deterministic::Runner::default();
995 executor.start(|context| async move {
996 let cfg = Config {
997 partition: "test".to_string(),
998 codec_config: ((0..).into(), ()),
999 };
1000 let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.clone(), cfg.clone())
1001 .await
1002 .unwrap();
1003
1004 metadata.put(U64::new(1), b"first".to_vec());
1006 metadata.put(U64::new(2), b"second".to_vec());
1007 metadata.sync().await.unwrap();
1008
1009 metadata.clear();
1011 metadata.sync().await.unwrap();
1012
1013 assert!(metadata.get(&U64::new(1)).is_none());
1015 assert!(metadata.get(&U64::new(2)).is_none());
1016
1017 drop(metadata);
1019 let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context, cfg)
1020 .await
1021 .unwrap();
1022
1023 assert!(metadata.get(&U64::new(1)).is_none());
1025 assert!(metadata.get(&U64::new(2)).is_none());
1026
1027 metadata.put(U64::new(3), b"third".to_vec());
1029 metadata.put(U64::new(4), b"fourth".to_vec());
1030 metadata.sync().await.unwrap();
1031
1032 assert_eq!(metadata.get(&U64::new(3)).unwrap(), b"third");
1034 assert_eq!(metadata.get(&U64::new(4)).unwrap(), b"fourth");
1035 assert!(metadata.get(&U64::new(1)).is_none());
1036 assert!(metadata.get(&U64::new(2)).is_none());
1037
1038 metadata.destroy().await.unwrap();
1039 });
1040 }
1041
    // Runs `num_operations` rounds of randomized store operations on a deterministic runner
    // and returns the runner's auditor state, so callers can compare two runs.
    //
    // NOTE(review): despite the name, no drop/reopen of the store happens in this function.
    fn test_metadata_operations_and_restart(num_operations: usize) -> String {
        let executor = deterministic::Runner::default();
        executor.start(|mut context| async move {
            let cfg = Config {
                partition: "test_determinism".to_string(),
                codec_config: ((0..).into(), ()),
            };
            let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.clone(), cfg.clone())
                .await
                .unwrap();

            // All randomness is drawn from `context`
            for i in 0..num_operations {
                // Always insert key `i` with 64 random bytes
                let key = U64::new(i as u64);
                let mut value = vec![0u8; 64];
                context.fill_bytes(&mut value);
                metadata.put(key, value);

                // With probability 0.1, sync
                if context.gen_bool(0.1) {
                    metadata.sync().await.unwrap();
                }

                // With probability 0.1, replace the value of a random key in 0..=i
                if context.gen_bool(0.1) {
                    let selected_index = context.gen_range(0..=i);
                    let update_key = U64::new(selected_index as u64);
                    let mut new_value = vec![0u8; 64];
                    context.fill_bytes(&mut new_value);
                    metadata.put(update_key, new_value);
                }

                // With probability 0.1, remove a random key in 0..=i
                if context.gen_bool(0.1) {
                    let selected_index = context.gen_range(0..=i);
                    let remove_key = U64::new(selected_index as u64);
                    metadata.remove(&remove_key);
                }

                // With probability 0.1, bump the first byte of a random key in 0..=i in place
                // (the key may be absent, hence the `if let`)
                if context.gen_bool(0.1) {
                    let selected_index = context.gen_range(0..=i);
                    let mut_key = U64::new(selected_index as u64);
                    if let Some(value) = metadata.get_mut(&mut_key) {
                        if !value.is_empty() {
                            value[0] = value[0].wrapping_add(1);
                        }
                    }
                }
            }
            // Final sync of whatever is still pending
            metadata.sync().await.unwrap();

            // Destroy the store
            metadata.destroy().await.unwrap();

            // Report the audited state of the whole run
            context.auditor().state()
        })
    }
1100
1101 #[test_group("slow")]
1102 #[test_traced]
1103 fn test_determinism() {
1104 let state1 = test_metadata_operations_and_restart(1_000);
1105 let state2 = test_metadata_operations_and_restart(1_000);
1106 assert_eq!(state1, state2);
1107 }
1108
1109 #[test_traced]
1110 fn test_keys_iterator() {
1111 let executor = deterministic::Runner::default();
1113 executor.start(|context| async move {
1114 let cfg = Config {
1116 partition: "test".to_string(),
1117 codec_config: ((0..).into(), ()),
1118 };
1119 let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.clone(), cfg)
1120 .await
1121 .unwrap();
1122
1123 metadata.put(U64::new(0x1000), b"value1".to_vec());
1125 metadata.put(U64::new(0x1001), b"value2".to_vec());
1126 metadata.put(U64::new(0x1002), b"value3".to_vec());
1127 metadata.put(U64::new(0x2000), b"value4".to_vec());
1128 metadata.put(U64::new(0x2001), b"value5".to_vec());
1129 metadata.put(U64::new(0x3000), b"value6".to_vec());
1130
1131 let all_keys: Vec<_> = metadata.keys().cloned().collect();
1133 assert_eq!(all_keys.len(), 6);
1134 assert!(all_keys.contains(&U64::new(0x1000)));
1135 assert!(all_keys.contains(&U64::new(0x3000)));
1136
1137 let prefix = hex!("0x00000000000010");
1139 let prefix_keys: Vec<_> = metadata
1140 .keys()
1141 .filter(|k| k.as_ref().starts_with(&prefix))
1142 .cloned()
1143 .collect();
1144 assert_eq!(prefix_keys.len(), 3);
1145 assert!(prefix_keys.contains(&U64::new(0x1000)));
1146 assert!(prefix_keys.contains(&U64::new(0x1001)));
1147 assert!(prefix_keys.contains(&U64::new(0x1002)));
1148 assert!(!prefix_keys.contains(&U64::new(0x2000)));
1149
1150 let prefix = hex!("0x00000000000020");
1152 let prefix_keys: Vec<_> = metadata
1153 .keys()
1154 .filter(|k| k.as_ref().starts_with(&prefix))
1155 .cloned()
1156 .collect();
1157 assert_eq!(prefix_keys.len(), 2);
1158 assert!(prefix_keys.contains(&U64::new(0x2000)));
1159 assert!(prefix_keys.contains(&U64::new(0x2001)));
1160
1161 let prefix = hex!("0x00000000000040");
1163 let prefix_keys: Vec<_> = metadata
1164 .keys()
1165 .filter(|k| k.as_ref().starts_with(&prefix))
1166 .cloned()
1167 .collect();
1168 assert_eq!(prefix_keys.len(), 0);
1169
1170 metadata.destroy().await.unwrap();
1171 });
1172 }
1173
1174 #[test_traced]
1175 fn test_retain() {
1176 let executor = deterministic::Runner::default();
1178 executor.start(|context| async move {
1179 let cfg = Config {
1181 partition: "test".to_string(),
1182 codec_config: ((0..).into(), ()),
1183 };
1184 let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.clone(), cfg)
1185 .await
1186 .unwrap();
1187
1188 metadata.put(U64::new(0x1000), b"value1".to_vec());
1190 metadata.put(U64::new(0x1001), b"value2".to_vec());
1191 metadata.put(U64::new(0x1002), b"value3".to_vec());
1192 metadata.put(U64::new(0x2000), b"value4".to_vec());
1193 metadata.put(U64::new(0x2001), b"value5".to_vec());
1194 metadata.put(U64::new(0x3000), b"value6".to_vec());
1195
1196 let buffer = context.encode();
1198 assert!(buffer.contains("keys 6"));
1199
1200 let prefix = hex!("0x00000000000010");
1202 metadata.retain(|k, _| !k.as_ref().starts_with(&prefix));
1203
1204 let buffer = context.encode();
1206 assert!(buffer.contains("keys 3"));
1207
1208 assert!(metadata.get(&U64::new(0x1000)).is_none());
1210 assert!(metadata.get(&U64::new(0x1001)).is_none());
1211 assert!(metadata.get(&U64::new(0x1002)).is_none());
1212 assert!(metadata.get(&U64::new(0x2000)).is_some());
1213 assert!(metadata.get(&U64::new(0x2001)).is_some());
1214 assert!(metadata.get(&U64::new(0x3000)).is_some());
1215
1216 metadata.sync().await.unwrap();
1218 drop(metadata);
1219 let cfg = Config {
1220 partition: "test".to_string(),
1221 codec_config: ((0..).into(), ()),
1222 };
1223 let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.clone(), cfg)
1224 .await
1225 .unwrap();
1226
1227 assert!(metadata.get(&U64::new(0x1000)).is_none());
1229 assert!(metadata.get(&U64::new(0x2000)).is_some());
1230 assert_eq!(metadata.keys().count(), 3);
1231
1232 let prefix = hex!("0x00000000000040");
1234 metadata.retain(|k, _| !k.as_ref().starts_with(&prefix));
1235
1236 metadata.retain(|_, _| false);
1238 assert_eq!(metadata.keys().count(), 0);
1239
1240 metadata.destroy().await.unwrap();
1241 });
1242 }
1243}