1mod storage;
67pub use storage::Metadata;
68use thiserror::Error;
69
70#[derive(Debug, Error)]
72pub enum Error {
73 #[error("runtime error: {0}")]
74 Runtime(#[from] commonware_runtime::Error),
75}
76
77#[derive(Clone)]
79pub struct Config<C> {
80 pub partition: String,
82
83 pub codec_config: C,
85}
86
87#[cfg(test)]
88mod tests {
89 use super::*;
90 use commonware_formatting::hex;
91 use commonware_macros::{test_group, test_traced};
92 use commonware_runtime::{deterministic, Blob, Metrics as _, Runner, Storage, Supervisor as _};
93 use commonware_utils::sequence::U64;
94 use rand::{Rng, RngCore};
95
96 #[test_traced]
97 fn test_put_get_clear() {
98 let executor = deterministic::Runner::default();
100 executor.start(|context| async move {
101 let cfg = Config {
103 partition: "test".into(),
104 codec_config: ((0..).into(), ()),
105 };
106 let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.child("first"), cfg)
107 .await
108 .unwrap();
109
110 let key = U64::new(42);
112 let value = metadata.get(&key);
113 assert!(value.is_none());
114
115 let buffer = context.encode();
117 assert!(buffer.contains("first_sync_rewrites_total 0"));
118 assert!(buffer.contains("first_sync_overwrites_total 0"));
119 assert!(buffer.contains("first_keys 0"));
120
121 let hello = b"hello".to_vec();
123 metadata.put(key.clone(), hello.clone());
124
125 let value = metadata.get(&key).unwrap();
127 assert_eq!(value, &hello);
128
129 let buffer = context.encode();
131 assert!(buffer.contains("first_sync_rewrites_total 0"));
132 assert!(buffer.contains("first_sync_overwrites_total 0"));
133 assert!(buffer.contains("first_keys 1"));
134
135 metadata.sync().await.unwrap();
137
138 let buffer = context.encode();
140 assert!(buffer.contains("first_sync_rewrites_total 1"));
141 assert!(buffer.contains("first_sync_overwrites_total 0"));
142 assert!(buffer.contains("first_keys 1"));
143
144 drop(metadata);
146 let cfg = Config {
147 partition: "test".into(),
148 codec_config: ((0..).into(), ()),
149 };
150 let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.child("second"), cfg)
151 .await
152 .unwrap();
153
154 let buffer = context.encode();
156 assert!(buffer.contains("second_sync_rewrites_total 0"));
157 assert!(buffer.contains("second_sync_overwrites_total 0"));
158 assert!(buffer.contains("second_keys 1"));
159
160 let value = metadata.get(&key).unwrap();
162 assert_eq!(value, &hello);
163
164 metadata.clear();
166 let value = metadata.get(&key);
167 assert!(value.is_none());
168
169 let buffer = context.encode();
171 assert!(buffer.contains("second_sync_rewrites_total 0"));
172 assert!(buffer.contains("second_sync_overwrites_total 0"));
173 assert!(buffer.contains("second_keys 0"));
174
175 metadata.destroy().await.unwrap();
176 });
177 }
178
179 #[test_traced]
180 fn test_put_returns_previous_value() {
181 let executor = deterministic::Runner::default();
182 executor.start(|context| async move {
183 let cfg = Config {
184 partition: "test".into(),
185 codec_config: ((0..).into(), ()),
186 };
187 let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.child("first"), cfg)
188 .await
189 .unwrap();
190
191 let key = U64::new(42);
192
193 let previous = metadata.put(key.clone(), b"first".to_vec());
195 assert!(previous.is_none());
196
197 let previous = metadata.put(key.clone(), b"second".to_vec());
199 assert_eq!(previous, Some(b"first".to_vec()));
200
201 let previous = metadata.put(key.clone(), b"third".to_vec());
203 assert_eq!(previous, Some(b"second".to_vec()));
204
205 assert_eq!(metadata.get(&key), Some(&b"third".to_vec()));
207
208 let other_key = U64::new(99);
210 let previous = metadata.put(other_key.clone(), b"other".to_vec());
211 assert!(previous.is_none());
212
213 metadata.sync().await.unwrap();
215 drop(metadata);
216
217 let cfg = Config {
218 partition: "test".into(),
219 codec_config: ((0..).into(), ()),
220 };
221 let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.child("second"), cfg)
222 .await
223 .unwrap();
224
225 let previous = metadata.put(key.clone(), b"fourth".to_vec());
227 assert_eq!(previous, Some(b"third".to_vec()));
228
229 metadata.destroy().await.unwrap();
230 });
231 }
232
233 #[test_traced]
234 fn test_multi_sync() {
235 let executor = deterministic::Runner::default();
237 executor.start(|context| async move {
238 let cfg = Config {
240 partition: "test".into(),
241 codec_config: ((0..).into(), ()),
242 };
243 let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.child("first"), cfg)
244 .await
245 .unwrap();
246
247 let key = U64::new(42);
249 let hello = b"hello".to_vec();
250 metadata.put(key.clone(), hello.clone());
251
252 metadata.sync().await.unwrap();
254
255 let buffer = context.encode();
257 assert!(buffer.contains("first_sync_rewrites_total 1"));
258 assert!(buffer.contains("first_sync_overwrites_total 0"));
259 assert!(buffer.contains("first_keys 1"));
260
261 let world = b"world".to_vec();
263 metadata.put(key.clone(), world.clone());
264 let key2 = U64::new(43);
265 let foo = b"foo".to_vec();
266 metadata.put(key2.clone(), foo.clone());
267
268 metadata.sync().await.unwrap();
270
271 let buffer = context.encode();
273 assert!(buffer.contains("first_sync_rewrites_total 2"));
274 assert!(buffer.contains("first_sync_overwrites_total 0"));
275 assert!(buffer.contains("first_keys 2"));
276
277 drop(metadata);
279 let cfg = Config {
280 partition: "test".into(),
281 codec_config: ((0..).into(), ()),
282 };
283 let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.child("second"), cfg)
284 .await
285 .unwrap();
286
287 let buffer = context.encode();
289 assert!(buffer.contains("second_sync_rewrites_total 0"));
290 assert!(buffer.contains("second_sync_overwrites_total 0"));
291 assert!(buffer.contains("second_keys 2"));
292
293 let value = metadata.get(&key).unwrap();
295 assert_eq!(value, &world);
296 let value = metadata.get(&key2).unwrap();
297 assert_eq!(value, &foo);
298
299 metadata.remove(&key);
301
302 metadata.sync().await.unwrap();
304
305 let buffer = context.encode();
307 assert!(buffer.contains("second_sync_rewrites_total 1"));
308 assert!(buffer.contains("second_sync_overwrites_total 0"));
309 assert!(buffer.contains("second_keys 1"));
310
311 drop(metadata);
313 let cfg = Config {
314 partition: "test".into(),
315 codec_config: ((0..).into(), ()),
316 };
317 let metadata = Metadata::<_, U64, Vec<u8>>::init(context.child("third"), cfg)
318 .await
319 .unwrap();
320
321 let buffer = context.encode();
323 assert!(buffer.contains("third_sync_rewrites_total 0"));
324 assert!(buffer.contains("third_sync_overwrites_total 0"));
325 assert!(buffer.contains("third_keys 1"));
326
327 let value = metadata.get(&key);
329 assert!(value.is_none());
330 let value = metadata.get(&key2).unwrap();
331 assert_eq!(value, &foo);
332
333 metadata.destroy().await.unwrap();
334 });
335 }
336
337 #[test_traced]
338 fn test_recover_corrupted_one() {
339 let executor = deterministic::Runner::default();
341 executor.start(|context| async move {
342 let cfg = Config {
344 partition: "test".into(),
345 codec_config: ((0..).into(), ()),
346 };
347 let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.child("first"), cfg)
348 .await
349 .unwrap();
350
351 let key = U64::new(42);
353 let hello = b"hello".to_vec();
354 metadata.put(key.clone(), hello.clone());
355
356 metadata.sync().await.unwrap();
358
359 let world = b"world".to_vec();
361 metadata.put(key.clone(), world.clone());
362 let key2 = U64::new(43);
363 let foo = b"foo".to_vec();
364 metadata.put(key2, foo.clone());
365
366 metadata.sync().await.unwrap();
368 drop(metadata);
369
370 let (blob, _) = context.open("test", b"left").await.unwrap();
372 blob.write_at_sync(0, b"corrupted".to_vec()).await.unwrap();
373
374 let cfg = Config {
376 partition: "test".into(),
377 codec_config: ((0..).into(), ()),
378 };
379 let metadata = Metadata::<_, U64, Vec<u8>>::init(context.child("second"), cfg)
380 .await
381 .unwrap();
382
383 let value = metadata.get(&key).unwrap();
385 assert_eq!(value, &hello);
386
387 metadata.destroy().await.unwrap();
388 });
389 }
390
391 #[test_traced]
392 fn test_recover_corrupted_both() {
393 let executor = deterministic::Runner::default();
395 executor.start(|context| async move {
396 let cfg = Config {
398 partition: "test".into(),
399 codec_config: ((0..).into(), ()),
400 };
401 let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.child("first"), cfg)
402 .await
403 .unwrap();
404
405 let key = U64::new(42);
407 let hello = b"hello".to_vec();
408 metadata.put(key.clone(), hello.clone());
409
410 metadata.sync().await.unwrap();
412
413 let world = b"world".to_vec();
415 metadata.put(key.clone(), world.clone());
416 let key2 = U64::new(43);
417 let foo = b"foo".to_vec();
418 metadata.put(key2, foo.clone());
419
420 metadata.sync().await.unwrap();
422 drop(metadata);
423
424 let (blob, _) = context.open("test", b"left").await.unwrap();
426 blob.write_at_sync(0, b"corrupted".to_vec()).await.unwrap();
427 let (blob, _) = context.open("test", b"right").await.unwrap();
428 blob.write_at_sync(0, b"corrupted".to_vec()).await.unwrap();
429
430 let cfg = Config {
432 partition: "test".into(),
433 codec_config: ((0..).into(), ()),
434 };
435 let metadata = Metadata::<_, U64, Vec<u8>>::init(context.child("second"), cfg)
436 .await
437 .unwrap();
438
439 let value = metadata.get(&key);
441 assert!(value.is_none());
442
443 let buffer = context.encode();
445 assert!(buffer.contains("second_sync_rewrites_total 0"));
446 assert!(buffer.contains("second_sync_overwrites_total 0"));
447 assert!(buffer.contains("second_keys 0"));
448
449 metadata.destroy().await.unwrap();
450 });
451 }
452
453 #[test_traced]
454 fn test_recover_corrupted_truncate() {
455 let executor = deterministic::Runner::default();
457 executor.start(|context| async move {
458 let cfg = Config {
460 partition: "test".into(),
461 codec_config: ((0..).into(), ()),
462 };
463 let mut metadata = Metadata::init(context.child("first"), cfg).await.unwrap();
464
465 let key = U64::new(42);
467 let hello = b"hello".to_vec();
468 metadata.put(key.clone(), hello.clone());
469
470 metadata.sync().await.unwrap();
472
473 let world = b"world".to_vec();
475 metadata.put(key.clone(), world.clone());
476 let key2 = U64::new(43);
477 let foo = b"foo".to_vec();
478 metadata.put(key2, foo.clone());
479
480 metadata.sync().await.unwrap();
482 drop(metadata);
483
484 let (blob, len) = context.open("test", b"left").await.unwrap();
486 blob.resize(len - 8).await.unwrap();
487 blob.sync().await.unwrap();
488
489 let cfg = Config {
491 partition: "test".into(),
492 codec_config: ((0..).into(), ()),
493 };
494 let metadata = Metadata::<_, U64, Vec<u8>>::init(context.child("second"), cfg)
495 .await
496 .unwrap();
497
498 let value = metadata.get(&key).unwrap();
500 assert_eq!(value, &hello);
501
502 metadata.destroy().await.unwrap();
503 });
504 }
505
506 #[test_traced]
507 fn test_recover_corrupted_short() {
508 let executor = deterministic::Runner::default();
510 executor.start(|context| async move {
511 let cfg = Config {
513 partition: "test".into(),
514 codec_config: ((0..).into(), ()),
515 };
516 let mut metadata = Metadata::init(context.child("first"), cfg).await.unwrap();
517
518 let key = U64::new(42);
520 let hello = b"hello".to_vec();
521 metadata.put(key.clone(), hello.clone());
522
523 metadata.sync().await.unwrap();
525
526 let world = b"world".to_vec();
528 metadata.put(key.clone(), world.clone());
529 let key2 = U64::new(43);
530 let foo = b"foo".to_vec();
531 metadata.put(key2, foo.clone());
532
533 metadata.sync().await.unwrap();
535 drop(metadata);
536
537 let (blob, _) = context.open("test", b"left").await.unwrap();
539 blob.resize(5).await.unwrap();
540 blob.sync().await.unwrap();
541
542 let cfg = Config {
544 partition: "test".into(),
545 codec_config: ((0..).into(), ()),
546 };
547 let metadata = Metadata::<_, U64, Vec<u8>>::init(context.child("second"), cfg)
548 .await
549 .unwrap();
550
551 let value = metadata.get(&key).unwrap();
553 assert_eq!(value, &hello);
554
555 metadata.destroy().await.unwrap();
556 });
557 }
558
559 #[test_traced]
560 fn test_unclean_shutdown() {
561 let executor = deterministic::Runner::default();
563 executor.start(|context| async move {
564 let key = U64::new(42);
565 let hello = b"hello".to_vec();
566 {
567 let cfg = Config {
569 partition: "test".into(),
570 codec_config: ((0..).into(), ()),
571 };
572 let mut metadata = Metadata::init(context.child("first"), cfg).await.unwrap();
573
574 metadata.put(key.clone(), hello.clone());
576
577 }
579
580 let cfg = Config {
582 partition: "test".into(),
583 codec_config: ((0..).into(), ()),
584 };
585 let metadata = Metadata::<_, U64, Vec<u8>>::init(context.child("second"), cfg)
586 .await
587 .unwrap();
588
589 let value = metadata.get(&key);
591 assert!(value.is_none());
592
593 let buffer = context.encode();
595 assert!(buffer.contains("second_sync_rewrites_total 0"));
596 assert!(buffer.contains("second_sync_overwrites_total 0"));
597 assert!(buffer.contains("second_keys 0"));
598
599 metadata.destroy().await.unwrap();
600 });
601 }
602
603 #[test_traced]
604 #[should_panic(expected = "usize value is larger than u32")]
605 fn test_value_too_big_error() {
606 let executor = deterministic::Runner::default();
608 executor.start(|context| async move {
609 let cfg = Config {
611 partition: "test".into(),
612 codec_config: ((0..).into(), ()),
613 };
614 let mut metadata = Metadata::init(context.child("storage"), cfg).await.unwrap();
615
616 let value = vec![0u8; (u32::MAX as usize) + 1];
618 metadata.put(U64::new(1), value);
619
620 metadata.sync().await.unwrap();
622 });
623 }
624
625 #[test_traced]
626 fn test_delta_writes() {
627 let executor = deterministic::Runner::default();
629 executor.start(|context| async move {
630 let cfg = Config {
632 partition: "test".into(),
633 codec_config: ((0..).into(), ()),
634 };
635 let mut metadata = Metadata::init(context.child("storage"), cfg).await.unwrap();
636
637 for i in 0..100 {
639 metadata.put(U64::new(i), vec![i as u8; 100]);
640 }
641
642 metadata.sync().await.unwrap();
646 let buffer = context.encode();
647 assert!(buffer.contains("sync_rewrites_total 1"), "{buffer}");
648 assert!(buffer.contains("sync_overwrites_total 0"), "{buffer}");
649 assert!(
650 buffer.contains("runtime_storage_write_bytes_total 10912"),
651 "{buffer}",
652 );
653
654 metadata.put(U64::new(51), vec![0xff; 100]);
656
657 metadata.sync().await.unwrap();
659 let buffer = context.encode();
660 assert!(buffer.contains("sync_rewrites_total 2"), "{buffer}");
661 assert!(buffer.contains("sync_overwrites_total 0"), "{buffer}");
662 assert!(
663 buffer.contains("runtime_storage_write_bytes_total 21824"),
664 "{buffer}",
665 );
666
667 metadata.sync().await.unwrap();
671 let buffer = context.encode();
672 assert!(buffer.contains("sync_rewrites_total 2"), "{buffer}");
673 assert!(buffer.contains("sync_overwrites_total 1"), "{buffer}");
674 assert!(
675 buffer.contains("runtime_storage_write_bytes_total 21937"),
676 "{buffer}",
677 );
678
679 metadata.sync().await.unwrap();
683 let buffer = context.encode();
684 assert!(buffer.contains("sync_rewrites_total 2"), "{buffer}");
685 assert!(buffer.contains("sync_overwrites_total 2"), "{buffer}");
686 assert!(
687 buffer.contains("runtime_storage_write_bytes_total 21949"),
688 "{buffer}",
689 );
690
691 metadata.remove(&U64::new(51));
695 metadata.sync().await.unwrap();
696 let buffer = context.encode();
697 assert!(buffer.contains("sync_rewrites_total 3"), "{buffer}");
698 assert!(buffer.contains("sync_overwrites_total 2"), "{buffer}");
699 assert!(
700 buffer.contains("runtime_storage_write_bytes_total 32752"),
701 "{buffer}"
702 );
703
704 metadata.sync().await.unwrap();
706 let buffer = context.encode();
707 assert!(buffer.contains("sync_rewrites_total 4"), "{buffer}");
708 assert!(buffer.contains("sync_overwrites_total 2"), "{buffer}");
709 assert!(
710 buffer.contains("runtime_storage_write_bytes_total 43555"),
711 "{buffer}"
712 );
713
714 metadata.put(U64::new(50), vec![0xff; 100]);
718 metadata.sync().await.unwrap();
719 let buffer = context.encode();
720 assert!(buffer.contains("sync_rewrites_total 4"), "{buffer}");
721 assert!(buffer.contains("sync_overwrites_total 3"), "{buffer}");
722 assert!(
723 buffer.contains("runtime_storage_write_bytes_total 43668"),
724 "{buffer}"
725 );
726
727 metadata.destroy().await.unwrap();
729 });
730 }
731
732 #[test_traced]
733 fn test_sync_with_no_changes() {
734 let executor = deterministic::Runner::default();
735 executor.start(|context| async move {
736 let cfg = Config {
737 partition: "test".into(),
738 codec_config: ((0..).into(), ()),
739 };
740 let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.child("storage"), cfg)
741 .await
742 .unwrap();
743
744 metadata
746 .put_sync(U64::new(1), b"hello".to_vec())
747 .await
748 .unwrap();
749
750 metadata.sync().await.unwrap();
753 let buffer = context.encode();
754 assert!(buffer.contains("sync_rewrites_total 2"));
755 assert!(buffer.contains("sync_overwrites_total 0"));
756
757 metadata.sync().await.unwrap();
759 let buffer = context.encode();
760 assert!(buffer.contains("sync_rewrites_total 2"));
761 assert!(buffer.contains("sync_overwrites_total 1"));
762
763 metadata.sync().await.unwrap();
765 let buffer = context.encode();
766 assert!(buffer.contains("sync_rewrites_total 2"));
767 assert!(buffer.contains("sync_overwrites_total 2"));
768
769 metadata.destroy().await.unwrap();
770 });
771 }
772
773 #[test_traced]
774 fn test_get_mut_marks_modified() {
775 let executor = deterministic::Runner::default();
776 executor.start(|context| async move {
777 let cfg = Config {
778 partition: "test".into(),
779 codec_config: ((0..).into(), ()),
780 };
781 let mut metadata =
782 Metadata::<_, U64, Vec<u8>>::init(context.child("first"), cfg.clone())
783 .await
784 .unwrap();
785
786 metadata
788 .put_sync(U64::new(1), b"hello".to_vec())
789 .await
790 .unwrap();
791
792 metadata.sync().await.unwrap();
794
795 let value = metadata.get_mut(&U64::new(1)).unwrap();
797 value[0] = b'H';
798
799 metadata.sync().await.unwrap();
801 let buffer = context.encode();
802 assert!(buffer.contains("first_sync_rewrites_total 2"));
803 assert!(buffer.contains("first_sync_overwrites_total 1"));
804
805 drop(metadata);
807 let metadata = Metadata::<_, U64, Vec<u8>>::init(context.child("second"), cfg)
808 .await
809 .unwrap();
810
811 let value = metadata.get(&U64::new(1)).unwrap();
813 assert_eq!(value[0], b'H');
814
815 metadata.destroy().await.unwrap();
816 });
817 }
818
819 #[test_traced]
820 fn test_mixed_operation_sequences() {
821 let executor = deterministic::Runner::default();
822 executor.start(|context| async move {
823 let cfg = Config {
824 partition: "test".into(),
825 codec_config: ((0..).into(), ()),
826 };
827 let mut metadata =
828 Metadata::<_, U64, Vec<u8>>::init(context.child("first"), cfg.clone())
829 .await
830 .unwrap();
831
832 let key = U64::new(1);
833
834 metadata.put(key.clone(), b"first".to_vec());
836 metadata.remove(&key);
837 metadata
838 .put_sync(key.clone(), b"second".to_vec())
839 .await
840 .unwrap();
841 let value = metadata.get(&key).unwrap();
842 assert_eq!(value, b"second");
843
844 metadata.put(key.clone(), b"third".to_vec());
846 let value = metadata.get_mut(&key).unwrap();
847 value[0] = b'T';
848 metadata.remove(&key);
849 metadata
850 .put_sync(key.clone(), b"fourth".to_vec())
851 .await
852 .unwrap();
853 let value = metadata.get(&key).unwrap();
854 assert_eq!(value, b"fourth");
855
856 drop(metadata);
858 let metadata = Metadata::<_, U64, Vec<u8>>::init(context.child("second"), cfg)
859 .await
860 .unwrap();
861
862 let value = metadata.get(&key).unwrap();
864 assert_eq!(value, b"fourth");
865
866 metadata.destroy().await.unwrap();
867 });
868 }
869
870 #[test_traced]
871 fn test_overwrite_vs_rewrite() {
872 let executor = deterministic::Runner::default();
873 executor.start(|context| async move {
874 let cfg = Config {
875 partition: "test".into(),
876 codec_config: ((0..).into(), ()),
877 };
878 let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.child("storage"), cfg)
879 .await
880 .unwrap();
881
882 metadata.put(U64::new(1), vec![1; 10]);
884 metadata.put(U64::new(2), vec![2; 10]);
885 metadata.sync().await.unwrap();
886
887 metadata.put(U64::new(1), vec![0xFF; 10]);
889 metadata.sync().await.unwrap();
890 let buffer = context.encode();
891 assert!(buffer.contains("sync_rewrites_total 2"));
892 assert!(buffer.contains("sync_overwrites_total 0"));
893
894 metadata.sync().await.unwrap();
896 let buffer = context.encode();
897 assert!(buffer.contains("sync_rewrites_total 2"));
898 assert!(buffer.contains("sync_overwrites_total 1"));
899
900 metadata.put(U64::new(1), vec![0xAA; 10]);
902 metadata.sync().await.unwrap();
903 let buffer = context.encode();
904 assert!(buffer.contains("sync_rewrites_total 2"));
905 assert!(buffer.contains("sync_overwrites_total 2"));
906
907 metadata.put(U64::new(1), vec![0xFF; 20]);
909 metadata.sync().await.unwrap();
910 let buffer = context.encode();
911 assert!(buffer.contains("sync_rewrites_total 3"));
912 assert!(buffer.contains("sync_overwrites_total 2"));
913
914 metadata.put(U64::new(3), vec![3; 10]);
916 metadata.sync().await.unwrap();
917 let buffer = context.encode();
918 assert!(buffer.contains("sync_rewrites_total 4"));
919 assert!(buffer.contains("sync_overwrites_total 2"));
920
921 metadata.sync().await.unwrap();
923 let buffer = context.encode();
924 assert!(buffer.contains("sync_rewrites_total 5"));
925 assert!(buffer.contains("sync_overwrites_total 2"));
926
927 metadata.put(U64::new(2), vec![0xAA; 10]);
929 metadata.sync().await.unwrap();
930 let buffer = context.encode();
931 assert!(buffer.contains("sync_rewrites_total 5"));
932 assert!(buffer.contains("sync_overwrites_total 3"));
933
934 metadata.destroy().await.unwrap();
935 });
936 }
937
938 #[test_traced]
939 fn test_blob_resize() {
940 let executor = deterministic::Runner::default();
941 executor.start(|context| async move {
942 let cfg = Config {
943 partition: "test".into(),
944 codec_config: ((0..).into(), ()),
945 };
946 let mut metadata =
947 Metadata::<_, U64, Vec<u8>>::init(context.child("first"), cfg.clone())
948 .await
949 .unwrap();
950
951 for i in 0..10 {
953 metadata.put(U64::new(i), vec![i as u8; 100]);
954 }
955 metadata.sync().await.unwrap();
956
957 metadata.sync().await.unwrap();
959 let buffer = context.encode();
960 assert!(buffer.contains("first_sync_rewrites_total 2"));
961 assert!(buffer.contains("first_sync_overwrites_total 0"));
962
963 for i in 1..10 {
965 metadata.remove(&U64::new(i));
966 }
967 metadata.sync().await.unwrap();
968
969 let value = metadata.get(&U64::new(0)).unwrap();
971 assert_eq!(value.len(), 100);
972 assert_eq!(value[0], 0);
973
974 let buffer = context.encode();
976 assert!(buffer.contains("first_sync_rewrites_total 3"));
977 assert!(buffer.contains("first_sync_overwrites_total 0"));
978
979 drop(metadata);
981 let metadata = Metadata::<_, U64, Vec<u8>>::init(context.child("second"), cfg)
982 .await
983 .unwrap();
984
985 let value = metadata.get(&U64::new(0)).unwrap();
987 assert_eq!(value.len(), 100);
988 assert_eq!(value[0], 0);
989
990 for i in 1..10 {
992 assert!(metadata.get(&U64::new(i)).is_none());
993 }
994
995 metadata.destroy().await.unwrap();
996 });
997 }
998
999 #[test_traced]
1000 fn test_clear_and_repopulate() {
1001 let executor = deterministic::Runner::default();
1002 executor.start(|context| async move {
1003 let cfg = Config {
1004 partition: "test".into(),
1005 codec_config: ((0..).into(), ()),
1006 };
1007 let mut metadata =
1008 Metadata::<_, U64, Vec<u8>>::init(context.child("first"), cfg.clone())
1009 .await
1010 .unwrap();
1011
1012 metadata.put(U64::new(1), b"first".to_vec());
1014 metadata
1015 .put_sync(U64::new(2), b"second".to_vec())
1016 .await
1017 .unwrap();
1018
1019 metadata.clear();
1021 metadata.sync().await.unwrap();
1022
1023 assert!(metadata.get(&U64::new(1)).is_none());
1025 assert!(metadata.get(&U64::new(2)).is_none());
1026
1027 drop(metadata);
1029 let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.child("second"), cfg)
1030 .await
1031 .unwrap();
1032
1033 assert!(metadata.get(&U64::new(1)).is_none());
1035 assert!(metadata.get(&U64::new(2)).is_none());
1036
1037 metadata.put(U64::new(3), b"third".to_vec());
1039 metadata
1040 .put_sync(U64::new(4), b"fourth".to_vec())
1041 .await
1042 .unwrap();
1043
1044 assert_eq!(metadata.get(&U64::new(3)).unwrap(), b"third");
1046 assert_eq!(metadata.get(&U64::new(4)).unwrap(), b"fourth");
1047 assert!(metadata.get(&U64::new(1)).is_none());
1048 assert!(metadata.get(&U64::new(2)).is_none());
1049
1050 metadata.destroy().await.unwrap();
1051 });
1052 }
1053
1054 fn test_metadata_operations_and_restart(num_operations: usize) -> String {
1055 let executor = deterministic::Runner::default();
1056 executor.start(|mut context| async move {
1057 let cfg = Config {
1058 partition: "test-determinism".into(),
1059 codec_config: ((0..).into(), ()),
1060 };
1061 let mut metadata =
1062 Metadata::<_, U64, Vec<u8>>::init(context.child("storage"), cfg.clone())
1063 .await
1064 .unwrap();
1065
1066 for i in 0..num_operations {
1068 let key = U64::new(i as u64);
1069 let mut value = vec![0u8; 64];
1070 context.fill_bytes(&mut value);
1071 metadata.put(key, value);
1072
1073 if context.gen_bool(0.1) {
1075 metadata.sync().await.unwrap();
1076 }
1077
1078 if context.gen_bool(0.1) {
1080 let selected_index = context.gen_range(0..=i);
1081 let update_key = U64::new(selected_index as u64);
1082 let mut new_value = vec![0u8; 64];
1083 context.fill_bytes(&mut new_value);
1084 metadata.put(update_key, new_value);
1085 }
1086
1087 if context.gen_bool(0.1) {
1089 let selected_index = context.gen_range(0..=i);
1090 let remove_key = U64::new(selected_index as u64);
1091 metadata.remove(&remove_key);
1092 }
1093
1094 if context.gen_bool(0.1) {
1096 let selected_index = context.gen_range(0..=i);
1097 let mut_key = U64::new(selected_index as u64);
1098 if let Some(value) = metadata.get_mut(&mut_key) {
1099 if !value.is_empty() {
1100 value[0] = value[0].wrapping_add(1);
1101 }
1102 }
1103 }
1104 }
1105 metadata.sync().await.unwrap();
1106
1107 metadata.destroy().await.unwrap();
1109
1110 context.auditor().state()
1111 })
1112 }
1113
1114 #[test_group("slow")]
1115 #[test_traced]
1116 fn test_determinism() {
1117 let state1 = test_metadata_operations_and_restart(1_000);
1118 let state2 = test_metadata_operations_and_restart(1_000);
1119 assert_eq!(state1, state2);
1120 }
1121
1122 #[test_traced]
1123 fn test_keys_iterator() {
1124 let executor = deterministic::Runner::default();
1126 executor.start(|context| async move {
1127 let cfg = Config {
1129 partition: "test".into(),
1130 codec_config: ((0..).into(), ()),
1131 };
1132 let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.child("storage"), cfg)
1133 .await
1134 .unwrap();
1135
1136 metadata.put(U64::new(0x1000), b"value1".to_vec());
1138 metadata.put(U64::new(0x1001), b"value2".to_vec());
1139 metadata.put(U64::new(0x1002), b"value3".to_vec());
1140 metadata.put(U64::new(0x2000), b"value4".to_vec());
1141 metadata.put(U64::new(0x2001), b"value5".to_vec());
1142 metadata.put(U64::new(0x3000), b"value6".to_vec());
1143
1144 let all_keys: Vec<_> = metadata.keys().cloned().collect();
1146 assert_eq!(all_keys.len(), 6);
1147 assert!(all_keys.contains(&U64::new(0x1000)));
1148 assert!(all_keys.contains(&U64::new(0x3000)));
1149
1150 let prefix = hex!("0x00000000000010");
1152 let prefix_keys: Vec<_> = metadata
1153 .keys()
1154 .filter(|k| k.as_ref().starts_with(&prefix))
1155 .cloned()
1156 .collect();
1157 assert_eq!(prefix_keys.len(), 3);
1158 assert!(prefix_keys.contains(&U64::new(0x1000)));
1159 assert!(prefix_keys.contains(&U64::new(0x1001)));
1160 assert!(prefix_keys.contains(&U64::new(0x1002)));
1161 assert!(!prefix_keys.contains(&U64::new(0x2000)));
1162
1163 let prefix = hex!("0x00000000000020");
1165 let prefix_keys: Vec<_> = metadata
1166 .keys()
1167 .filter(|k| k.as_ref().starts_with(&prefix))
1168 .cloned()
1169 .collect();
1170 assert_eq!(prefix_keys.len(), 2);
1171 assert!(prefix_keys.contains(&U64::new(0x2000)));
1172 assert!(prefix_keys.contains(&U64::new(0x2001)));
1173
1174 let prefix = hex!("0x00000000000040");
1176 let prefix_keys: Vec<_> = metadata
1177 .keys()
1178 .filter(|k| k.as_ref().starts_with(&prefix))
1179 .cloned()
1180 .collect();
1181 assert_eq!(prefix_keys.len(), 0);
1182
1183 metadata.destroy().await.unwrap();
1184 });
1185 }
1186
1187 #[test_traced]
1188 fn test_retain() {
1189 let executor = deterministic::Runner::default();
1191 executor.start(|context| async move {
1192 let cfg = Config {
1194 partition: "test".into(),
1195 codec_config: ((0..).into(), ()),
1196 };
1197 let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.child("first"), cfg)
1198 .await
1199 .unwrap();
1200
1201 metadata.put(U64::new(0x1000), b"value1".to_vec());
1203 metadata.put(U64::new(0x1001), b"value2".to_vec());
1204 metadata.put(U64::new(0x1002), b"value3".to_vec());
1205 metadata.put(U64::new(0x2000), b"value4".to_vec());
1206 metadata.put(U64::new(0x2001), b"value5".to_vec());
1207 metadata.put(U64::new(0x3000), b"value6".to_vec());
1208
1209 let buffer = context.encode();
1211 assert!(buffer.contains("first_keys 6"));
1212
1213 let prefix = hex!("0x00000000000010");
1215 metadata.retain(|k, _| !k.as_ref().starts_with(&prefix));
1216
1217 let buffer = context.encode();
1219 assert!(buffer.contains("first_keys 3"));
1220
1221 assert!(metadata.get(&U64::new(0x1000)).is_none());
1223 assert!(metadata.get(&U64::new(0x1001)).is_none());
1224 assert!(metadata.get(&U64::new(0x1002)).is_none());
1225 assert!(metadata.get(&U64::new(0x2000)).is_some());
1226 assert!(metadata.get(&U64::new(0x2001)).is_some());
1227 assert!(metadata.get(&U64::new(0x3000)).is_some());
1228
1229 metadata.sync().await.unwrap();
1231 drop(metadata);
1232 let cfg = Config {
1233 partition: "test".into(),
1234 codec_config: ((0..).into(), ()),
1235 };
1236 let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.child("second"), cfg)
1237 .await
1238 .unwrap();
1239
1240 assert!(metadata.get(&U64::new(0x1000)).is_none());
1242 assert!(metadata.get(&U64::new(0x2000)).is_some());
1243 assert_eq!(metadata.keys().count(), 3);
1244
1245 let prefix = hex!("0x00000000000040");
1247 metadata.retain(|k, _| !k.as_ref().starts_with(&prefix));
1248
1249 metadata.retain(|_, _| false);
1251 assert_eq!(metadata.keys().count(), 0);
1252
1253 metadata.destroy().await.unwrap();
1254 });
1255 }
1256}