1mod storage;
67pub use storage::Metadata;
68use thiserror::Error;
69
/// Errors that can be produced by this module.
#[derive(Debug, Error)]
pub enum Error {
    /// A failure reported by `commonware_runtime`; convertible from
    /// `commonware_runtime::Error` via the `#[from]` attribute.
    #[error("runtime error: {0}")]
    Runtime(#[from] commonware_runtime::Error),
}
76
/// Configuration handed to `Metadata::init`.
///
/// `C` is the type of the codec configuration carried in `codec_config`.
#[derive(Clone)]
pub struct Config<C> {
    /// Name of the storage partition the store operates on. The tests in this
    /// file open the `left` and `right` blobs inside this same partition.
    pub partition: String,

    /// Codec configuration.
    /// NOTE(review): presumably used when decoding stored values; confirm
    /// against the `storage` module.
    pub codec_config: C,
}
86
87#[cfg(test)]
88mod tests {
89 use super::*;
90 use commonware_macros::{test_group, test_traced};
91 use commonware_runtime::{deterministic, Blob, Metrics, Runner, Storage};
92 use commonware_utils::{hex, sequence::U64};
93 use rand::{Rng, RngCore};
94
95 #[test_traced]
96 fn test_put_get_clear() {
97 let executor = deterministic::Runner::default();
99 executor.start(|context| async move {
100 let cfg = Config {
102 partition: "test".to_string(),
103 codec_config: ((0..).into(), ()),
104 };
105 let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.with_label("first"), cfg)
106 .await
107 .unwrap();
108
109 let key = U64::new(42);
111 let value = metadata.get(&key);
112 assert!(value.is_none());
113
114 let buffer = context.encode();
116 assert!(buffer.contains("first_sync_rewrites_total 0"));
117 assert!(buffer.contains("first_sync_overwrites_total 0"));
118 assert!(buffer.contains("first_keys 0"));
119
120 let hello = b"hello".to_vec();
122 metadata.put(key.clone(), hello.clone());
123
124 let value = metadata.get(&key).unwrap();
126 assert_eq!(value, &hello);
127
128 let buffer = context.encode();
130 assert!(buffer.contains("first_sync_rewrites_total 0"));
131 assert!(buffer.contains("first_sync_overwrites_total 0"));
132 assert!(buffer.contains("first_keys 1"));
133
134 metadata.sync().await.unwrap();
136
137 let buffer = context.encode();
139 assert!(buffer.contains("first_sync_rewrites_total 1"));
140 assert!(buffer.contains("first_sync_overwrites_total 0"));
141 assert!(buffer.contains("first_keys 1"));
142
143 drop(metadata);
145 let cfg = Config {
146 partition: "test".to_string(),
147 codec_config: ((0..).into(), ()),
148 };
149 let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.with_label("second"), cfg)
150 .await
151 .unwrap();
152
153 let buffer = context.encode();
155 assert!(buffer.contains("second_sync_rewrites_total 0"));
156 assert!(buffer.contains("second_sync_overwrites_total 0"));
157 assert!(buffer.contains("second_keys 1"));
158
159 let value = metadata.get(&key).unwrap();
161 assert_eq!(value, &hello);
162
163 metadata.clear();
165 let value = metadata.get(&key);
166 assert!(value.is_none());
167
168 let buffer = context.encode();
170 assert!(buffer.contains("second_sync_rewrites_total 0"));
171 assert!(buffer.contains("second_sync_overwrites_total 0"));
172 assert!(buffer.contains("second_keys 0"));
173
174 metadata.destroy().await.unwrap();
175 });
176 }
177
178 #[test_traced]
179 fn test_put_returns_previous_value() {
180 let executor = deterministic::Runner::default();
181 executor.start(|context| async move {
182 let cfg = Config {
183 partition: "test".to_string(),
184 codec_config: ((0..).into(), ()),
185 };
186 let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.with_label("first"), cfg)
187 .await
188 .unwrap();
189
190 let key = U64::new(42);
191
192 let previous = metadata.put(key.clone(), b"first".to_vec());
194 assert!(previous.is_none());
195
196 let previous = metadata.put(key.clone(), b"second".to_vec());
198 assert_eq!(previous, Some(b"first".to_vec()));
199
200 let previous = metadata.put(key.clone(), b"third".to_vec());
202 assert_eq!(previous, Some(b"second".to_vec()));
203
204 assert_eq!(metadata.get(&key), Some(&b"third".to_vec()));
206
207 let other_key = U64::new(99);
209 let previous = metadata.put(other_key.clone(), b"other".to_vec());
210 assert!(previous.is_none());
211
212 metadata.sync().await.unwrap();
214 drop(metadata);
215
216 let cfg = Config {
217 partition: "test".to_string(),
218 codec_config: ((0..).into(), ()),
219 };
220 let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.with_label("second"), cfg)
221 .await
222 .unwrap();
223
224 let previous = metadata.put(key.clone(), b"fourth".to_vec());
226 assert_eq!(previous, Some(b"third".to_vec()));
227
228 metadata.destroy().await.unwrap();
229 });
230 }
231
232 #[test_traced]
233 fn test_multi_sync() {
234 let executor = deterministic::Runner::default();
236 executor.start(|context| async move {
237 let cfg = Config {
239 partition: "test".to_string(),
240 codec_config: ((0..).into(), ()),
241 };
242 let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.with_label("first"), cfg)
243 .await
244 .unwrap();
245
246 let key = U64::new(42);
248 let hello = b"hello".to_vec();
249 metadata.put(key.clone(), hello.clone());
250
251 metadata.sync().await.unwrap();
253
254 let buffer = context.encode();
256 assert!(buffer.contains("first_sync_rewrites_total 1"));
257 assert!(buffer.contains("first_sync_overwrites_total 0"));
258 assert!(buffer.contains("first_keys 1"));
259
260 let world = b"world".to_vec();
262 metadata.put(key.clone(), world.clone());
263 let key2 = U64::new(43);
264 let foo = b"foo".to_vec();
265 metadata.put(key2.clone(), foo.clone());
266
267 metadata.sync().await.unwrap();
269
270 let buffer = context.encode();
272 assert!(buffer.contains("first_sync_rewrites_total 2"));
273 assert!(buffer.contains("first_sync_overwrites_total 0"));
274 assert!(buffer.contains("first_keys 2"));
275
276 drop(metadata);
278 let cfg = Config {
279 partition: "test".to_string(),
280 codec_config: ((0..).into(), ()),
281 };
282 let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.with_label("second"), cfg)
283 .await
284 .unwrap();
285
286 let buffer = context.encode();
288 assert!(buffer.contains("second_sync_rewrites_total 0"));
289 assert!(buffer.contains("second_sync_overwrites_total 0"));
290 assert!(buffer.contains("second_keys 2"));
291
292 let value = metadata.get(&key).unwrap();
294 assert_eq!(value, &world);
295 let value = metadata.get(&key2).unwrap();
296 assert_eq!(value, &foo);
297
298 metadata.remove(&key);
300
301 metadata.sync().await.unwrap();
303
304 let buffer = context.encode();
306 assert!(buffer.contains("second_sync_rewrites_total 1"));
307 assert!(buffer.contains("second_sync_overwrites_total 0"));
308 assert!(buffer.contains("second_keys 1"));
309
310 drop(metadata);
312 let cfg = Config {
313 partition: "test".to_string(),
314 codec_config: ((0..).into(), ()),
315 };
316 let metadata = Metadata::<_, U64, Vec<u8>>::init(context.with_label("third"), cfg)
317 .await
318 .unwrap();
319
320 let buffer = context.encode();
322 assert!(buffer.contains("third_sync_rewrites_total 0"));
323 assert!(buffer.contains("third_sync_overwrites_total 0"));
324 assert!(buffer.contains("third_keys 1"));
325
326 let value = metadata.get(&key);
328 assert!(value.is_none());
329 let value = metadata.get(&key2).unwrap();
330 assert_eq!(value, &foo);
331
332 metadata.destroy().await.unwrap();
333 });
334 }
335
336 #[test_traced]
337 fn test_recover_corrupted_one() {
338 let executor = deterministic::Runner::default();
340 executor.start(|context| async move {
341 let cfg = Config {
343 partition: "test".to_string(),
344 codec_config: ((0..).into(), ()),
345 };
346 let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.with_label("first"), cfg)
347 .await
348 .unwrap();
349
350 let key = U64::new(42);
352 let hello = b"hello".to_vec();
353 metadata.put(key.clone(), hello.clone());
354
355 metadata.sync().await.unwrap();
357
358 let world = b"world".to_vec();
360 metadata.put(key.clone(), world.clone());
361 let key2 = U64::new(43);
362 let foo = b"foo".to_vec();
363 metadata.put(key2, foo.clone());
364
365 metadata.sync().await.unwrap();
367 drop(metadata);
368
369 let (blob, _) = context.open("test", b"left").await.unwrap();
371 blob.write_at(0, b"corrupted".to_vec()).await.unwrap();
372 blob.sync().await.unwrap();
373
374 let cfg = Config {
376 partition: "test".to_string(),
377 codec_config: ((0..).into(), ()),
378 };
379 let metadata = Metadata::<_, U64, Vec<u8>>::init(context.with_label("second"), cfg)
380 .await
381 .unwrap();
382
383 let value = metadata.get(&key).unwrap();
385 assert_eq!(value, &hello);
386
387 metadata.destroy().await.unwrap();
388 });
389 }
390
391 #[test_traced]
392 fn test_recover_corrupted_both() {
393 let executor = deterministic::Runner::default();
395 executor.start(|context| async move {
396 let cfg = Config {
398 partition: "test".to_string(),
399 codec_config: ((0..).into(), ()),
400 };
401 let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.with_label("first"), cfg)
402 .await
403 .unwrap();
404
405 let key = U64::new(42);
407 let hello = b"hello".to_vec();
408 metadata.put(key.clone(), hello.clone());
409
410 metadata.sync().await.unwrap();
412
413 let world = b"world".to_vec();
415 metadata.put(key.clone(), world.clone());
416 let key2 = U64::new(43);
417 let foo = b"foo".to_vec();
418 metadata.put(key2, foo.clone());
419
420 metadata.sync().await.unwrap();
422 drop(metadata);
423
424 let (blob, _) = context.open("test", b"left").await.unwrap();
426 blob.write_at(0, b"corrupted".to_vec()).await.unwrap();
427 blob.sync().await.unwrap();
428 let (blob, _) = context.open("test", b"right").await.unwrap();
429 blob.write_at(0, b"corrupted".to_vec()).await.unwrap();
430 blob.sync().await.unwrap();
431
432 let cfg = Config {
434 partition: "test".to_string(),
435 codec_config: ((0..).into(), ()),
436 };
437 let metadata = Metadata::<_, U64, Vec<u8>>::init(context.with_label("second"), cfg)
438 .await
439 .unwrap();
440
441 let value = metadata.get(&key);
443 assert!(value.is_none());
444
445 let buffer = context.encode();
447 assert!(buffer.contains("second_sync_rewrites_total 0"));
448 assert!(buffer.contains("second_sync_overwrites_total 0"));
449 assert!(buffer.contains("second_keys 0"));
450
451 metadata.destroy().await.unwrap();
452 });
453 }
454
455 #[test_traced]
456 fn test_recover_corrupted_truncate() {
457 let executor = deterministic::Runner::default();
459 executor.start(|context| async move {
460 let cfg = Config {
462 partition: "test".to_string(),
463 codec_config: ((0..).into(), ()),
464 };
465 let mut metadata = Metadata::init(context.with_label("first"), cfg)
466 .await
467 .unwrap();
468
469 let key = U64::new(42);
471 let hello = b"hello".to_vec();
472 metadata.put(key.clone(), hello.clone());
473
474 metadata.sync().await.unwrap();
476
477 let world = b"world".to_vec();
479 metadata.put(key.clone(), world.clone());
480 let key2 = U64::new(43);
481 let foo = b"foo".to_vec();
482 metadata.put(key2, foo.clone());
483
484 metadata.sync().await.unwrap();
486 drop(metadata);
487
488 let (blob, len) = context.open("test", b"left").await.unwrap();
490 blob.resize(len - 8).await.unwrap();
491 blob.sync().await.unwrap();
492
493 let cfg = Config {
495 partition: "test".to_string(),
496 codec_config: ((0..).into(), ()),
497 };
498 let metadata = Metadata::<_, U64, Vec<u8>>::init(context.with_label("second"), cfg)
499 .await
500 .unwrap();
501
502 let value = metadata.get(&key).unwrap();
504 assert_eq!(value, &hello);
505
506 metadata.destroy().await.unwrap();
507 });
508 }
509
510 #[test_traced]
511 fn test_recover_corrupted_short() {
512 let executor = deterministic::Runner::default();
514 executor.start(|context| async move {
515 let cfg = Config {
517 partition: "test".to_string(),
518 codec_config: ((0..).into(), ()),
519 };
520 let mut metadata = Metadata::init(context.with_label("first"), cfg)
521 .await
522 .unwrap();
523
524 let key = U64::new(42);
526 let hello = b"hello".to_vec();
527 metadata.put(key.clone(), hello.clone());
528
529 metadata.sync().await.unwrap();
531
532 let world = b"world".to_vec();
534 metadata.put(key.clone(), world.clone());
535 let key2 = U64::new(43);
536 let foo = b"foo".to_vec();
537 metadata.put(key2, foo.clone());
538
539 metadata.sync().await.unwrap();
541 drop(metadata);
542
543 let (blob, _) = context.open("test", b"left").await.unwrap();
545 blob.resize(5).await.unwrap();
546 blob.sync().await.unwrap();
547
548 let cfg = Config {
550 partition: "test".to_string(),
551 codec_config: ((0..).into(), ()),
552 };
553 let metadata = Metadata::<_, U64, Vec<u8>>::init(context.with_label("second"), cfg)
554 .await
555 .unwrap();
556
557 let value = metadata.get(&key).unwrap();
559 assert_eq!(value, &hello);
560
561 metadata.destroy().await.unwrap();
562 });
563 }
564
565 #[test_traced]
566 fn test_unclean_shutdown() {
567 let executor = deterministic::Runner::default();
569 executor.start(|context| async move {
570 let key = U64::new(42);
571 let hello = b"hello".to_vec();
572 {
573 let cfg = Config {
575 partition: "test".to_string(),
576 codec_config: ((0..).into(), ()),
577 };
578 let mut metadata = Metadata::init(context.with_label("first"), cfg)
579 .await
580 .unwrap();
581
582 metadata.put(key.clone(), hello.clone());
584
585 }
587
588 let cfg = Config {
590 partition: "test".to_string(),
591 codec_config: ((0..).into(), ()),
592 };
593 let metadata = Metadata::<_, U64, Vec<u8>>::init(context.with_label("second"), cfg)
594 .await
595 .unwrap();
596
597 let value = metadata.get(&key);
599 assert!(value.is_none());
600
601 let buffer = context.encode();
603 assert!(buffer.contains("second_sync_rewrites_total 0"));
604 assert!(buffer.contains("second_sync_overwrites_total 0"));
605 assert!(buffer.contains("second_keys 0"));
606
607 metadata.destroy().await.unwrap();
608 });
609 }
610
611 #[test_traced]
612 #[should_panic(expected = "usize value is larger than u32")]
613 fn test_value_too_big_error() {
614 let executor = deterministic::Runner::default();
616 executor.start(|context| async move {
617 let cfg = Config {
619 partition: "test".to_string(),
620 codec_config: ((0..).into(), ()),
621 };
622 let mut metadata = Metadata::init(context.clone(), cfg).await.unwrap();
623
624 let value = vec![0u8; (u32::MAX as usize) + 1];
626 metadata.put(U64::new(1), value);
627
628 metadata.sync().await.unwrap();
630 });
631 }
632
/// Tracks the `sync_rewrites_total`, `sync_overwrites_total` and
/// `runtime_storage_write_bytes_total` metrics across a sequence of syncs to
/// pin down when a sync is counted as a rewrite versus an overwrite, and how
/// many bytes each kind is expected to write.
#[test_traced]
fn test_delta_writes() {
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        let cfg = Config {
            partition: "test".to_string(),
            codec_config: ((0..).into(), ()),
        };
        let mut metadata = Metadata::init(context.clone(), cfg).await.unwrap();

        // Populate 100 keys, each with a 100-byte value.
        for i in 0..100 {
            metadata.put(U64::new(i), vec![i as u8; 100]);
        }

        // First sync: expected to be a rewrite writing 10912 bytes in total.
        metadata.sync().await.unwrap();
        let buffer = context.encode();
        assert!(buffer.contains("sync_rewrites_total 1"), "{buffer}");
        assert!(buffer.contains("sync_overwrites_total 0"), "{buffer}");
        assert!(
            buffer.contains("runtime_storage_write_bytes_total 10912"),
            "{buffer}",
        );

        // Replace one value with another of the same length.
        metadata.put(U64::new(51), vec![0xff; 100]);

        // Second sync: still counted as a rewrite (+10912 bytes -> 21824).
        // NOTE(review): presumably because the other of the two blobs has not
        // been written yet; confirm in `storage`.
        metadata.sync().await.unwrap();
        let buffer = context.encode();
        assert!(buffer.contains("sync_rewrites_total 2"), "{buffer}");
        assert!(buffer.contains("sync_overwrites_total 0"), "{buffer}");
        assert!(
            buffer.contains("runtime_storage_write_bytes_total 21824"),
            "{buffer}",
        );

        // Third sync without new puts: counted as an overwrite that writes
        // only 113 bytes (21824 -> 21937).
        metadata.sync().await.unwrap();
        let buffer = context.encode();
        assert!(buffer.contains("sync_rewrites_total 2"), "{buffer}");
        assert!(buffer.contains("sync_overwrites_total 1"), "{buffer}");
        assert!(
            buffer.contains("runtime_storage_write_bytes_total 21937"),
            "{buffer}",
        );

        // Fourth sync without changes: another overwrite, only 12 bytes
        // (21937 -> 21949).
        metadata.sync().await.unwrap();
        let buffer = context.encode();
        assert!(buffer.contains("sync_rewrites_total 2"), "{buffer}");
        assert!(buffer.contains("sync_overwrites_total 2"), "{buffer}");
        assert!(
            buffer.contains("runtime_storage_write_bytes_total 21949"),
            "{buffer}",
        );

        // Removing a key is expected to force a rewrite (+10803 bytes -> 32752).
        metadata.remove(&U64::new(51));
        metadata.sync().await.unwrap();
        let buffer = context.encode();
        assert!(buffer.contains("sync_rewrites_total 3"), "{buffer}");
        assert!(buffer.contains("sync_overwrites_total 2"), "{buffer}");
        assert!(
            buffer.contains("runtime_storage_write_bytes_total 32752"),
            "{buffer}"
        );

        // The sync after the removal is a rewrite as well (+10803 -> 43555).
        metadata.sync().await.unwrap();
        let buffer = context.encode();
        assert!(buffer.contains("sync_rewrites_total 4"), "{buffer}");
        assert!(buffer.contains("sync_overwrites_total 2"), "{buffer}");
        assert!(
            buffer.contains("runtime_storage_write_bytes_total 43555"),
            "{buffer}"
        );

        // A same-length replacement is again an overwrite (+113 -> 43668).
        metadata.put(U64::new(50), vec![0xff; 100]);
        metadata.sync().await.unwrap();
        let buffer = context.encode();
        assert!(buffer.contains("sync_rewrites_total 4"), "{buffer}");
        assert!(buffer.contains("sync_overwrites_total 3"), "{buffer}");
        assert!(
            buffer.contains("runtime_storage_write_bytes_total 43668"),
            "{buffer}"
        );

        // Clean up the store.
        metadata.destroy().await.unwrap();
    });
}
739
740 #[test_traced]
741 fn test_sync_with_no_changes() {
742 let executor = deterministic::Runner::default();
743 executor.start(|context| async move {
744 let cfg = Config {
745 partition: "test".to_string(),
746 codec_config: ((0..).into(), ()),
747 };
748 let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.clone(), cfg)
749 .await
750 .unwrap();
751
752 metadata.put(U64::new(1), b"hello".to_vec());
754 metadata.sync().await.unwrap();
755
756 metadata.sync().await.unwrap();
759 let buffer = context.encode();
760 assert!(buffer.contains("sync_rewrites_total 2"));
761 assert!(buffer.contains("sync_overwrites_total 0"));
762
763 metadata.sync().await.unwrap();
765 let buffer = context.encode();
766 assert!(buffer.contains("sync_rewrites_total 2"));
767 assert!(buffer.contains("sync_overwrites_total 1"));
768
769 metadata.sync().await.unwrap();
771 let buffer = context.encode();
772 assert!(buffer.contains("sync_rewrites_total 2"));
773 assert!(buffer.contains("sync_overwrites_total 2"));
774
775 metadata.destroy().await.unwrap();
776 });
777 }
778
779 #[test_traced]
780 fn test_get_mut_marks_modified() {
781 let executor = deterministic::Runner::default();
782 executor.start(|context| async move {
783 let cfg = Config {
784 partition: "test".to_string(),
785 codec_config: ((0..).into(), ()),
786 };
787 let mut metadata =
788 Metadata::<_, U64, Vec<u8>>::init(context.with_label("first"), cfg.clone())
789 .await
790 .unwrap();
791
792 metadata.put(U64::new(1), b"hello".to_vec());
794 metadata.sync().await.unwrap();
795
796 metadata.sync().await.unwrap();
798
799 let value = metadata.get_mut(&U64::new(1)).unwrap();
801 value[0] = b'H';
802
803 metadata.sync().await.unwrap();
805 let buffer = context.encode();
806 assert!(buffer.contains("first_sync_rewrites_total 2"));
807 assert!(buffer.contains("first_sync_overwrites_total 1"));
808
809 drop(metadata);
811 let metadata = Metadata::<_, U64, Vec<u8>>::init(context.with_label("second"), cfg)
812 .await
813 .unwrap();
814
815 let value = metadata.get(&U64::new(1)).unwrap();
817 assert_eq!(value[0], b'H');
818
819 metadata.destroy().await.unwrap();
820 });
821 }
822
823 #[test_traced]
824 fn test_mixed_operation_sequences() {
825 let executor = deterministic::Runner::default();
826 executor.start(|context| async move {
827 let cfg = Config {
828 partition: "test".to_string(),
829 codec_config: ((0..).into(), ()),
830 };
831 let mut metadata =
832 Metadata::<_, U64, Vec<u8>>::init(context.with_label("first"), cfg.clone())
833 .await
834 .unwrap();
835
836 let key = U64::new(1);
837
838 metadata.put(key.clone(), b"first".to_vec());
840 metadata.remove(&key);
841 metadata.put(key.clone(), b"second".to_vec());
842 metadata.sync().await.unwrap();
843 let value = metadata.get(&key).unwrap();
844 assert_eq!(value, b"second");
845
846 metadata.put(key.clone(), b"third".to_vec());
848 let value = metadata.get_mut(&key).unwrap();
849 value[0] = b'T';
850 metadata.remove(&key);
851 metadata.put(key.clone(), b"fourth".to_vec());
852 metadata.sync().await.unwrap();
853 let value = metadata.get(&key).unwrap();
854 assert_eq!(value, b"fourth");
855
856 drop(metadata);
858 let metadata = Metadata::<_, U64, Vec<u8>>::init(context.with_label("second"), cfg)
859 .await
860 .unwrap();
861
862 let value = metadata.get(&key).unwrap();
864 assert_eq!(value, b"fourth");
865
866 metadata.destroy().await.unwrap();
867 });
868 }
869
870 #[test_traced]
871 fn test_overwrite_vs_rewrite() {
872 let executor = deterministic::Runner::default();
873 executor.start(|context| async move {
874 let cfg = Config {
875 partition: "test".to_string(),
876 codec_config: ((0..).into(), ()),
877 };
878 let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.clone(), cfg)
879 .await
880 .unwrap();
881
882 metadata.put(U64::new(1), vec![1; 10]);
884 metadata.put(U64::new(2), vec![2; 10]);
885 metadata.sync().await.unwrap();
886
887 metadata.put(U64::new(1), vec![0xFF; 10]);
889 metadata.sync().await.unwrap();
890 let buffer = context.encode();
891 assert!(buffer.contains("sync_rewrites_total 2"));
892 assert!(buffer.contains("sync_overwrites_total 0"));
893
894 metadata.sync().await.unwrap();
896 let buffer = context.encode();
897 assert!(buffer.contains("sync_rewrites_total 2"));
898 assert!(buffer.contains("sync_overwrites_total 1"));
899
900 metadata.put(U64::new(1), vec![0xAA; 10]);
902 metadata.sync().await.unwrap();
903 let buffer = context.encode();
904 assert!(buffer.contains("sync_rewrites_total 2"));
905 assert!(buffer.contains("sync_overwrites_total 2"));
906
907 metadata.put(U64::new(1), vec![0xFF; 20]);
909 metadata.sync().await.unwrap();
910 let buffer = context.encode();
911 assert!(buffer.contains("sync_rewrites_total 3"));
912 assert!(buffer.contains("sync_overwrites_total 2"));
913
914 metadata.put(U64::new(3), vec![3; 10]);
916 metadata.sync().await.unwrap();
917 let buffer = context.encode();
918 assert!(buffer.contains("sync_rewrites_total 4"));
919 assert!(buffer.contains("sync_overwrites_total 2"));
920
921 metadata.sync().await.unwrap();
923 let buffer = context.encode();
924 assert!(buffer.contains("sync_rewrites_total 5"));
925 assert!(buffer.contains("sync_overwrites_total 2"));
926
927 metadata.put(U64::new(2), vec![0xAA; 10]);
929 metadata.sync().await.unwrap();
930 let buffer = context.encode();
931 assert!(buffer.contains("sync_rewrites_total 5"));
932 assert!(buffer.contains("sync_overwrites_total 3"));
933
934 metadata.destroy().await.unwrap();
935 });
936 }
937
938 #[test_traced]
939 fn test_blob_resize() {
940 let executor = deterministic::Runner::default();
941 executor.start(|context| async move {
942 let cfg = Config {
943 partition: "test".to_string(),
944 codec_config: ((0..).into(), ()),
945 };
946 let mut metadata =
947 Metadata::<_, U64, Vec<u8>>::init(context.with_label("first"), cfg.clone())
948 .await
949 .unwrap();
950
951 for i in 0..10 {
953 metadata.put(U64::new(i), vec![i as u8; 100]);
954 }
955 metadata.sync().await.unwrap();
956
957 metadata.sync().await.unwrap();
959 let buffer = context.encode();
960 assert!(buffer.contains("first_sync_rewrites_total 2"));
961 assert!(buffer.contains("first_sync_overwrites_total 0"));
962
963 for i in 1..10 {
965 metadata.remove(&U64::new(i));
966 }
967 metadata.sync().await.unwrap();
968
969 let value = metadata.get(&U64::new(0)).unwrap();
971 assert_eq!(value.len(), 100);
972 assert_eq!(value[0], 0);
973
974 let buffer = context.encode();
976 assert!(buffer.contains("first_sync_rewrites_total 3"));
977 assert!(buffer.contains("first_sync_overwrites_total 0"));
978
979 drop(metadata);
981 let metadata = Metadata::<_, U64, Vec<u8>>::init(context.with_label("second"), cfg)
982 .await
983 .unwrap();
984
985 let value = metadata.get(&U64::new(0)).unwrap();
987 assert_eq!(value.len(), 100);
988 assert_eq!(value[0], 0);
989
990 for i in 1..10 {
992 assert!(metadata.get(&U64::new(i)).is_none());
993 }
994
995 metadata.destroy().await.unwrap();
996 });
997 }
998
999 #[test_traced]
1000 fn test_clear_and_repopulate() {
1001 let executor = deterministic::Runner::default();
1002 executor.start(|context| async move {
1003 let cfg = Config {
1004 partition: "test".to_string(),
1005 codec_config: ((0..).into(), ()),
1006 };
1007 let mut metadata =
1008 Metadata::<_, U64, Vec<u8>>::init(context.with_label("first"), cfg.clone())
1009 .await
1010 .unwrap();
1011
1012 metadata.put(U64::new(1), b"first".to_vec());
1014 metadata.put(U64::new(2), b"second".to_vec());
1015 metadata.sync().await.unwrap();
1016
1017 metadata.clear();
1019 metadata.sync().await.unwrap();
1020
1021 assert!(metadata.get(&U64::new(1)).is_none());
1023 assert!(metadata.get(&U64::new(2)).is_none());
1024
1025 drop(metadata);
1027 let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.with_label("second"), cfg)
1028 .await
1029 .unwrap();
1030
1031 assert!(metadata.get(&U64::new(1)).is_none());
1033 assert!(metadata.get(&U64::new(2)).is_none());
1034
1035 metadata.put(U64::new(3), b"third".to_vec());
1037 metadata.put(U64::new(4), b"fourth".to_vec());
1038 metadata.sync().await.unwrap();
1039
1040 assert_eq!(metadata.get(&U64::new(3)).unwrap(), b"third");
1042 assert_eq!(metadata.get(&U64::new(4)).unwrap(), b"fourth");
1043 assert!(metadata.get(&U64::new(1)).is_none());
1044 assert!(metadata.get(&U64::new(2)).is_none());
1045
1046 metadata.destroy().await.unwrap();
1047 });
1048 }
1049
/// Runs `num_operations` rounds of randomized store operations inside a
/// default deterministic runner and returns `context.auditor().state()`.
///
/// All randomness is drawn from `context`, so the order of the `fill_bytes`,
/// `gen_bool` and `gen_range` calls below must not be changed: the returned
/// state is compared across runs by `test_determinism`.
///
/// NOTE(review): despite the name, the store is not re-opened anywhere in this
/// function; it is synced and destroyed at the end.
fn test_metadata_operations_and_restart(num_operations: usize) -> String {
    let executor = deterministic::Runner::default();
    executor.start(|mut context| async move {
        let cfg = Config {
            partition: "test_determinism".to_string(),
            codec_config: ((0..).into(), ()),
        };
        let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.clone(), cfg.clone())
            .await
            .unwrap();

        for i in 0..num_operations {
            // Always insert a fresh key `i` with 64 random bytes.
            let key = U64::new(i as u64);
            let mut value = vec![0u8; 64];
            context.fill_bytes(&mut value);
            metadata.put(key, value);

            // With probability 0.1, sync.
            if context.gen_bool(0.1) {
                metadata.sync().await.unwrap();
            }

            // With probability 0.1, overwrite a random key in `0..=i`.
            if context.gen_bool(0.1) {
                let selected_index = context.gen_range(0..=i);
                let update_key = U64::new(selected_index as u64);
                let mut new_value = vec![0u8; 64];
                context.fill_bytes(&mut new_value);
                metadata.put(update_key, new_value);
            }

            // With probability 0.1, remove a random key in `0..=i`.
            if context.gen_bool(0.1) {
                let selected_index = context.gen_range(0..=i);
                let remove_key = U64::new(selected_index as u64);
                metadata.remove(&remove_key);
            }

            // With probability 0.1, bump the first byte of a random key in
            // `0..=i` through `get_mut` (skipped if the key is absent).
            if context.gen_bool(0.1) {
                let selected_index = context.gen_range(0..=i);
                let mut_key = U64::new(selected_index as u64);
                if let Some(value) = metadata.get_mut(&mut_key) {
                    if !value.is_empty() {
                        value[0] = value[0].wrapping_add(1);
                    }
                }
            }
        }
        // Final sync of whatever is still pending.
        metadata.sync().await.unwrap();

        // Destroy the store before capturing the auditor state.
        metadata.destroy().await.unwrap();

        context.auditor().state()
    })
}
1108
1109 #[test_group("slow")]
1110 #[test_traced]
1111 fn test_determinism() {
1112 let state1 = test_metadata_operations_and_restart(1_000);
1113 let state2 = test_metadata_operations_and_restart(1_000);
1114 assert_eq!(state1, state2);
1115 }
1116
1117 #[test_traced]
1118 fn test_keys_iterator() {
1119 let executor = deterministic::Runner::default();
1121 executor.start(|context| async move {
1122 let cfg = Config {
1124 partition: "test".to_string(),
1125 codec_config: ((0..).into(), ()),
1126 };
1127 let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.clone(), cfg)
1128 .await
1129 .unwrap();
1130
1131 metadata.put(U64::new(0x1000), b"value1".to_vec());
1133 metadata.put(U64::new(0x1001), b"value2".to_vec());
1134 metadata.put(U64::new(0x1002), b"value3".to_vec());
1135 metadata.put(U64::new(0x2000), b"value4".to_vec());
1136 metadata.put(U64::new(0x2001), b"value5".to_vec());
1137 metadata.put(U64::new(0x3000), b"value6".to_vec());
1138
1139 let all_keys: Vec<_> = metadata.keys().cloned().collect();
1141 assert_eq!(all_keys.len(), 6);
1142 assert!(all_keys.contains(&U64::new(0x1000)));
1143 assert!(all_keys.contains(&U64::new(0x3000)));
1144
1145 let prefix = hex!("0x00000000000010");
1147 let prefix_keys: Vec<_> = metadata
1148 .keys()
1149 .filter(|k| k.as_ref().starts_with(&prefix))
1150 .cloned()
1151 .collect();
1152 assert_eq!(prefix_keys.len(), 3);
1153 assert!(prefix_keys.contains(&U64::new(0x1000)));
1154 assert!(prefix_keys.contains(&U64::new(0x1001)));
1155 assert!(prefix_keys.contains(&U64::new(0x1002)));
1156 assert!(!prefix_keys.contains(&U64::new(0x2000)));
1157
1158 let prefix = hex!("0x00000000000020");
1160 let prefix_keys: Vec<_> = metadata
1161 .keys()
1162 .filter(|k| k.as_ref().starts_with(&prefix))
1163 .cloned()
1164 .collect();
1165 assert_eq!(prefix_keys.len(), 2);
1166 assert!(prefix_keys.contains(&U64::new(0x2000)));
1167 assert!(prefix_keys.contains(&U64::new(0x2001)));
1168
1169 let prefix = hex!("0x00000000000040");
1171 let prefix_keys: Vec<_> = metadata
1172 .keys()
1173 .filter(|k| k.as_ref().starts_with(&prefix))
1174 .cloned()
1175 .collect();
1176 assert_eq!(prefix_keys.len(), 0);
1177
1178 metadata.destroy().await.unwrap();
1179 });
1180 }
1181
1182 #[test_traced]
1183 fn test_retain() {
1184 let executor = deterministic::Runner::default();
1186 executor.start(|context| async move {
1187 let cfg = Config {
1189 partition: "test".to_string(),
1190 codec_config: ((0..).into(), ()),
1191 };
1192 let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.with_label("first"), cfg)
1193 .await
1194 .unwrap();
1195
1196 metadata.put(U64::new(0x1000), b"value1".to_vec());
1198 metadata.put(U64::new(0x1001), b"value2".to_vec());
1199 metadata.put(U64::new(0x1002), b"value3".to_vec());
1200 metadata.put(U64::new(0x2000), b"value4".to_vec());
1201 metadata.put(U64::new(0x2001), b"value5".to_vec());
1202 metadata.put(U64::new(0x3000), b"value6".to_vec());
1203
1204 let buffer = context.encode();
1206 assert!(buffer.contains("first_keys 6"));
1207
1208 let prefix = hex!("0x00000000000010");
1210 metadata.retain(|k, _| !k.as_ref().starts_with(&prefix));
1211
1212 let buffer = context.encode();
1214 assert!(buffer.contains("first_keys 3"));
1215
1216 assert!(metadata.get(&U64::new(0x1000)).is_none());
1218 assert!(metadata.get(&U64::new(0x1001)).is_none());
1219 assert!(metadata.get(&U64::new(0x1002)).is_none());
1220 assert!(metadata.get(&U64::new(0x2000)).is_some());
1221 assert!(metadata.get(&U64::new(0x2001)).is_some());
1222 assert!(metadata.get(&U64::new(0x3000)).is_some());
1223
1224 metadata.sync().await.unwrap();
1226 drop(metadata);
1227 let cfg = Config {
1228 partition: "test".to_string(),
1229 codec_config: ((0..).into(), ()),
1230 };
1231 let mut metadata = Metadata::<_, U64, Vec<u8>>::init(context.with_label("second"), cfg)
1232 .await
1233 .unwrap();
1234
1235 assert!(metadata.get(&U64::new(0x1000)).is_none());
1237 assert!(metadata.get(&U64::new(0x2000)).is_some());
1238 assert_eq!(metadata.keys().count(), 3);
1239
1240 let prefix = hex!("0x00000000000040");
1242 metadata.retain(|k, _| !k.as_ref().starts_with(&prefix));
1243
1244 metadata.retain(|_, _| false);
1246 assert_eq!(metadata.keys().count(), 0);
1247
1248 metadata.destroy().await.unwrap();
1249 });
1250 }
1251}