1#[cfg(test)]
206mod conformance;
207mod storage;
208use commonware_runtime::buffer::paged::CacheRef;
209use commonware_utils::Array;
210use std::num::NonZeroUsize;
211pub use storage::{Checkpoint, Cursor, Freezer};
212use thiserror::Error;
213
/// Selects which stored item a lookup refers to.
///
/// The tests in this module pass `Identifier::Key(&key)` to `Freezer::get`.
/// NOTE(review): how a [Cursor] is obtained and resolved is defined in
/// `storage` and is not visible here — confirm there.
pub enum Identifier<'a, K: Array> {
    /// Refer to the item by an owned [Cursor].
    Cursor(Cursor),
    /// Refer to the item by a borrowed key.
    Key(&'a K),
}
219
/// Errors surfaced by this module.
///
/// Each variant wraps an error from another layer. The `#[from]` attributes
/// generate `From` conversions, so the wrapped error types can be propagated
/// with `?` from functions returning this type.
#[derive(Debug, Error)]
pub enum Error {
    /// An error reported by `commonware_runtime`.
    #[error("runtime error: {0}")]
    Runtime(#[from] commonware_runtime::Error),
    /// An error reported by this crate's `journal` module.
    #[error("journal error: {0}")]
    Journal(#[from] crate::journal::Error),
    /// An error reported by `commonware_codec`.
    #[error("codec error: {0}")]
    Codec(#[from] commonware_codec::Error),
}
230
/// Configuration passed to `Freezer::init` / `Freezer::init_with_checkpoint`.
///
/// NOTE(review): the logic that consumes these fields lives in `storage` and is
/// not visible from this file. The field notes below are drawn from the field
/// names, their types, and how the tests in this module use them — confirm
/// against `storage` before relying on them.
#[derive(Clone)]
pub struct Config<C> {
    /// Partition name for key data (presumably a runtime storage partition, like
    /// `table_partition` — confirm).
    pub key_partition: String,

    /// Non-zero write buffer size for key data (presumably bytes — confirm).
    pub key_write_buffer: NonZeroUsize,

    /// Page cache handle for key data. The tests build it with
    /// `CacheRef::from_pooler(&context, page_size, capacity)`.
    pub key_page_cache: CacheRef,

    /// Partition name for value data (presumably a runtime storage partition —
    /// confirm).
    pub value_partition: String,

    /// Optional compression setting for values. The tests use `None` and
    /// `Some(3)`; the meaning of the number (e.g. a compression level) is not
    /// visible here — confirm.
    pub value_compression: Option<u8>,

    /// Non-zero write buffer size for value data (presumably bytes — confirm).
    pub value_write_buffer: NonZeroUsize,

    /// Target size for value storage (presumably bytes per storage section —
    /// confirm).
    pub value_target_size: u64,

    /// Storage partition holding the table. The tests open the blob named
    /// `b"table"` inside this partition to tamper with table contents.
    pub table_partition: String,

    /// Initial size of the table (presumably a number of entries — confirm).
    pub table_initial_size: u32,

    /// Controls how readily the table is resized. The tests lower this value to
    /// make resizes start sooner; exact semantics are defined in `storage`.
    pub table_resize_frequency: u8,

    /// Amount of resize work performed per step. The tests set this to `1` so a
    /// resize spans several `sync` calls; units are defined in `storage`.
    pub table_resize_chunk_size: u32,

    /// Non-zero buffer size used when replaying the table (presumably bytes —
    /// confirm).
    pub table_replay_buffer: NonZeroUsize,

    /// Codec configuration of type `C`. The tests in this module pass `()`.
    pub codec_config: C,
}
274
#[cfg(test)]
mod tests {
    use super::*;
    use commonware_codec::DecodeExt;
    use commonware_formatting::hex;
    use commonware_macros::{test_group, test_traced};
    use commonware_runtime::{deterministic, Blob, Metrics as _, Runner, Storage, Supervisor as _};
    use commonware_utils::{sequence::FixedBytes, NZUsize, NZU16};
    use rand::{Rng, RngCore};
    use std::num::NonZeroU16;

    /// Write buffer size used for both the key and value stores.
    const DEFAULT_WRITE_BUFFER: usize = 1024;
    /// Value target size used unless a test overrides it.
    const DEFAULT_VALUE_TARGET_SIZE: u64 = 10 * 1024 * 1024;
    /// Initial table size used unless a test overrides it.
    const DEFAULT_TABLE_INITIAL_SIZE: u32 = 256;
    /// Table resize frequency used unless a test overrides it.
    const DEFAULT_TABLE_RESIZE_FREQUENCY: u8 = 4;
    /// Table resize chunk size used unless a test overrides it.
    const DEFAULT_TABLE_RESIZE_CHUNK_SIZE: u32 = 128;
    /// Table replay buffer size.
    const DEFAULT_TABLE_REPLAY_BUFFER: usize = 64 * 1024;
    /// Page size handed to `CacheRef::from_pooler`.
    const PAGE_SIZE: NonZeroU16 = NZU16!(1024);
    /// Page cache capacity handed to `CacheRef::from_pooler`.
    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);

    /// Builds a 64-byte key from `key`: its UTF-8 bytes followed by zero padding.
    ///
    /// # Panics
    ///
    /// Panics if `key` is longer than 64 bytes.
    fn test_key(key: &str) -> FixedBytes<64> {
        let mut buf = [0u8; 64];
        let key = key.as_bytes();
        assert!(key.len() <= buf.len());
        buf[..key.len()].copy_from_slice(key);
        FixedBytes::decode(buf.as_ref()).unwrap()
    }

    /// Returns the baseline configuration shared by every test in this module.
    ///
    /// Tests that need different settings override individual fields with
    /// struct-update syntax (`Config { field: value, ..default_config(&context) }`).
    /// The page cache is created from `context` on every call.
    fn default_config(context: &deterministic::Context) -> Config<()> {
        Config {
            key_partition: "test-key-index".into(),
            key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
            key_page_cache: CacheRef::from_pooler(context, PAGE_SIZE, PAGE_CACHE_SIZE),
            value_partition: "test-value-journal".into(),
            value_compression: None,
            value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
            value_target_size: DEFAULT_VALUE_TARGET_SIZE,
            table_partition: "test-table".into(),
            table_initial_size: DEFAULT_TABLE_INITIAL_SIZE,
            table_resize_frequency: DEFAULT_TABLE_RESIZE_FREQUENCY,
            table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
            table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
            codec_config: (),
        }
    }

    /// Fills an `N`-byte buffer from `rng` and wraps it in a [FixedBytes].
    ///
    /// Performs exactly one `fill_bytes` call, so callers control the order in
    /// which random data is drawn.
    fn random_bytes<const N: usize>(rng: &mut impl RngCore) -> FixedBytes<N> {
        let mut buf = [0u8; N];
        rng.fill_bytes(&mut buf);
        FixedBytes::new(buf)
    }

    /// Puts one key and reads it back, with the given value compression setting,
    /// then checks the get/put metrics.
    fn test_put_get(compression: Option<u8>) {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                value_compression: compression,
                ..default_config(&context)
            };
            let mut freezer =
                Freezer::<_, FixedBytes<64>, i32>::init(context.child("storage"), cfg)
                    .await
                    .expect("Failed to initialize freezer");

            let key = test_key("testkey");
            let data = 42;

            // The key must be absent before anything is stored.
            let value = freezer
                .get(Identifier::Key(&key))
                .await
                .expect("Failed to check key");
            assert!(value.is_none());

            freezer
                .put(key.clone(), data)
                .await
                .expect("Failed to put data");

            let value = freezer
                .get(Identifier::Key(&key))
                .await
                .expect("Failed to get data")
                .expect("Data not found");
            assert_eq!(value, data);

            // Two gets (one miss, one hit) and one put were issued above.
            let buffer = context.encode();
            assert!(buffer.contains("gets_total 2"), "{}", buffer);
            assert!(buffer.contains("puts_total 1"), "{}", buffer);
            assert!(buffer.contains("unnecessary_reads_total 0"), "{}", buffer);

            freezer.sync().await.expect("Failed to sync data");
        });
    }

    #[test_traced]
    fn test_put_get_no_compression() {
        test_put_get(None);
    }

    #[test_traced]
    fn test_put_get_compression() {
        test_put_get(Some(3));
    }

    /// Stores several distinct keys and reads each one back.
    #[test_traced]
    fn test_multiple_keys() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = default_config(&context);
            let mut freezer =
                Freezer::<_, FixedBytes<64>, i32>::init(context.child("storage"), cfg)
                    .await
                    .expect("Failed to initialize freezer");

            let keys = [
                (test_key("key1"), 1),
                (test_key("key2"), 2),
                (test_key("key3"), 3),
                (test_key("key4"), 4),
                (test_key("key5"), 5),
            ];

            for (key, data) in &keys {
                freezer
                    .put(key.clone(), *data)
                    .await
                    .expect("Failed to put data");
            }

            for (key, data) in &keys {
                let retrieved = freezer
                    .get(Identifier::Key(key))
                    .await
                    .expect("Failed to get data")
                    .expect("Data not found");
                assert_eq!(retrieved, *data);
            }
        });
    }

    /// Stores more keys than the (deliberately tiny) initial table size and checks
    /// that every key is still retrievable.
    #[test_traced]
    fn test_collision_handling() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Eight keys against an initial table size of four.
            let cfg = Config {
                table_initial_size: 4,
                ..default_config(&context)
            };
            let mut freezer =
                Freezer::<_, FixedBytes<64>, i32>::init(context.child("storage"), cfg)
                    .await
                    .expect("Failed to initialize freezer");

            let keys = [
                (test_key("key1"), 1),
                (test_key("key2"), 2),
                (test_key("key3"), 3),
                (test_key("key4"), 4),
                (test_key("key5"), 5),
                (test_key("key6"), 6),
                (test_key("key7"), 7),
                (test_key("key8"), 8),
            ];

            for (key, data) in &keys {
                freezer
                    .put(key.clone(), *data)
                    .await
                    .expect("Failed to put data");
            }

            freezer.sync().await.expect("Failed to sync");

            for (key, data) in &keys {
                let retrieved = freezer
                    .get(Identifier::Key(key))
                    .await
                    .expect("Failed to get data")
                    .expect("Data not found");
                assert_eq!(retrieved, *data);
            }

            // The expected counter values are tied to these exact keys and this
            // table size.
            let buffer = context.encode();
            assert!(buffer.contains("gets_total 8"), "{}", buffer);
            assert!(buffer.contains("unnecessary_reads_total 5"), "{}", buffer);
        });
    }

    /// Data written before `close` is readable after reopening from the returned
    /// checkpoint.
    #[test_traced]
    fn test_restart() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = default_config(&context);

            // Written by the first instance, verified by the second.
            let keys = [
                (test_key("persist1"), 100),
                (test_key("persist2"), 200),
                (test_key("persist3"), 300),
            ];

            let checkpoint = {
                let mut freezer =
                    Freezer::<_, FixedBytes<64>, i32>::init(context.child("first"), cfg.clone())
                        .await
                        .expect("Failed to initialize freezer");

                for (key, data) in &keys {
                    freezer
                        .put(key.clone(), *data)
                        .await
                        .expect("Failed to put data");
                }

                freezer.close().await.expect("Failed to close freezer")
            };

            {
                let freezer = Freezer::<_, FixedBytes<64>, i32>::init_with_checkpoint(
                    context.child("second"),
                    cfg.clone(),
                    Some(checkpoint),
                )
                .await
                .expect("Failed to initialize freezer");

                for (key, data) in &keys {
                    let retrieved = freezer
                        .get(Identifier::Key(key))
                        .await
                        .expect("Failed to get data")
                        .expect("Data not found");
                    assert_eq!(retrieved, *data);
                }
            }
        });
    }

    /// Data written before an explicit `sync` must survive a reopen; data written
    /// after it may or may not be present, but must be correct if it is.
    #[test_traced]
    fn test_crash_consistency() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = default_config(&context);

            let checkpoint = {
                let mut freezer =
                    Freezer::<_, FixedBytes<64>, i32>::init(context.child("first"), cfg.clone())
                        .await
                        .expect("Failed to initialize freezer");

                freezer
                    .put(test_key("committed1"), 1)
                    .await
                    .expect("Failed to put data");
                freezer
                    .put(test_key("committed2"), 2)
                    .await
                    .expect("Failed to put data");

                freezer.sync().await.expect("Failed to sync");

                // These writes are not followed by an explicit sync.
                freezer
                    .put(test_key("uncommitted1"), 3)
                    .await
                    .expect("Failed to put data");
                freezer
                    .put(test_key("uncommitted2"), 4)
                    .await
                    .expect("Failed to put data");

                freezer.close().await.expect("Failed to close")
            };

            {
                let freezer = Freezer::<_, FixedBytes<64>, i32>::init_with_checkpoint(
                    context.child("second"),
                    cfg.clone(),
                    Some(checkpoint),
                )
                .await
                .expect("Failed to initialize freezer");

                assert_eq!(
                    freezer
                        .get(Identifier::Key(&test_key("committed1")))
                        .await
                        .unwrap(),
                    Some(1)
                );
                assert_eq!(
                    freezer
                        .get(Identifier::Key(&test_key("committed2")))
                        .await
                        .unwrap(),
                    Some(2)
                );

                // Presence of the later writes is not asserted; only their values.
                if let Some(val) = freezer
                    .get(Identifier::Key(&test_key("uncommitted1")))
                    .await
                    .unwrap()
                {
                    assert_eq!(val, 3);
                }
                if let Some(val) = freezer
                    .get(Identifier::Key(&test_key("uncommitted2")))
                    .await
                    .unwrap()
                {
                    assert_eq!(val, 4);
                }
            }
        });
    }

    /// After `destroy`, a fresh instance over the same partitions sees no data.
    #[test_traced]
    fn test_destroy() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = default_config(&context);

            {
                let mut freezer =
                    Freezer::<_, FixedBytes<64>, i32>::init(context.child("first"), cfg.clone())
                        .await
                        .expect("Failed to initialize freezer");

                freezer
                    .put(test_key("destroy1"), 1)
                    .await
                    .expect("Failed to put data");
                freezer
                    .put(test_key("destroy2"), 2)
                    .await
                    .expect("Failed to put data");

                freezer.destroy().await.expect("Failed to destroy freezer");
            }

            {
                let freezer =
                    Freezer::<_, FixedBytes<64>, i32>::init(context.child("second"), cfg.clone())
                        .await
                        .expect("Failed to initialize freezer");

                assert!(freezer
                    .get(Identifier::Key(&test_key("destroy1")))
                    .await
                    .unwrap()
                    .is_none());
                assert!(freezer
                    .get(Identifier::Key(&test_key("destroy2")))
                    .await
                    .unwrap()
                    .is_none());
            }
        });
    }

    /// Overwrites the first ten bytes of the table blob with `0xFF` and checks that
    /// reopening succeeds and never returns a wrong value.
    #[test_traced]
    fn test_partial_table_entry_write() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = default_config(&context);

            let checkpoint = {
                let mut freezer =
                    Freezer::<_, FixedBytes<64>, i32>::init(context.child("first"), cfg.clone())
                        .await
                        .expect("Failed to initialize freezer");

                freezer.put(test_key("key1"), 42).await.unwrap();
                freezer.sync().await.unwrap();
                freezer.close().await.unwrap()
            };

            // Clobber the start of the table blob.
            {
                let (blob, _) = context.open(&cfg.table_partition, b"table").await.unwrap();
                blob.write_at_sync(0, vec![0xFF; 10]).await.unwrap();
            }

            {
                let freezer = Freezer::<_, FixedBytes<64>, i32>::init_with_checkpoint(
                    context.child("second"),
                    cfg.clone(),
                    Some(checkpoint),
                )
                .await
                .expect("Failed to initialize freezer");

                // Either outcome is acceptable, as long as no wrong value appears.
                let result = freezer
                    .get(Identifier::Key(&test_key("key1")))
                    .await
                    .unwrap();
                assert!(result.is_none() || result == Some(42));
            }
        });
    }

    /// Flips one byte within the first 24 bytes of the table blob and checks that
    /// reopening succeeds and never returns a wrong value.
    #[test_traced]
    fn test_table_entry_invalid_crc() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = default_config(&context);

            let checkpoint = {
                let mut freezer =
                    Freezer::<_, FixedBytes<64>, i32>::init(context.child("first"), cfg.clone())
                        .await
                        .expect("Failed to initialize freezer");

                freezer.put(test_key("key1"), 42).await.unwrap();
                freezer.sync().await.unwrap();
                freezer.close().await.unwrap()
            };

            {
                let (blob, _) = context.open(&cfg.table_partition, b"table").await.unwrap();
                let entry_data = blob.read_at(0, 24).await.unwrap();
                let mut corrupted = entry_data.coalesce();
                // NOTE(review): byte 20 presumably lies in the entry's checksum;
                // the entry layout is defined in `storage` — confirm.
                corrupted.as_mut()[20] ^= 0xFF;
                blob.write_at_sync(0, corrupted).await.unwrap();
            }

            {
                let freezer = Freezer::<_, FixedBytes<64>, i32>::init_with_checkpoint(
                    context.child("second"),
                    cfg.clone(),
                    Some(checkpoint),
                )
                .await
                .expect("Failed to initialize freezer");

                // Either outcome is acceptable, as long as no wrong value appears.
                let result = freezer
                    .get(Identifier::Key(&test_key("key1")))
                    .await
                    .unwrap();
                assert!(result.is_none() || result == Some(42));
            }
        });
    }

    /// Appends stray bytes to the end of the table blob and checks that existing
    /// data stays readable and new writes still work after reopening.
    #[test_traced]
    fn test_table_extra_bytes() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = default_config(&context);

            let checkpoint = {
                let mut freezer =
                    Freezer::<_, FixedBytes<64>, i32>::init(context.child("first"), cfg.clone())
                        .await
                        .expect("Failed to initialize freezer");

                freezer.put(test_key("key1"), 42).await.unwrap();
                freezer.sync().await.unwrap();
                freezer.close().await.unwrap()
            };

            // Write four extra bytes at the blob's current end.
            {
                let (blob, size) = context.open(&cfg.table_partition, b"table").await.unwrap();
                blob.write_at_sync(size, hex!("0xdeadbeef").to_vec())
                    .await
                    .unwrap();
            }

            {
                let mut freezer = Freezer::<_, FixedBytes<64>, i32>::init_with_checkpoint(
                    context.child("second"),
                    cfg.clone(),
                    Some(checkpoint),
                )
                .await
                .expect("Failed to initialize freezer");

                assert_eq!(
                    freezer
                        .get(Identifier::Key(&test_key("key1")))
                        .await
                        .unwrap(),
                    Some(42)
                );

                freezer.put(test_key("key2"), 43).await.unwrap();
                assert_eq!(
                    freezer
                        .get(Identifier::Key(&test_key("key2")))
                        .await
                        .unwrap(),
                    Some(43)
                );
            }
        });
    }

    /// Inserts 1000 keys into a table that starts very small, syncing after every
    /// insert, and checks every key both before and after a restart.
    #[test_traced]
    fn test_indexing_across_resizes() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // A tiny table with a low resize frequency so many resizes occur.
            let cfg = Config {
                table_initial_size: 2,
                table_resize_frequency: 2,
                ..default_config(&context)
            };
            let mut freezer =
                Freezer::<_, FixedBytes<64>, i32>::init(context.child("first"), cfg.clone())
                    .await
                    .expect("Failed to initialize freezer");

            let mut keys = Vec::with_capacity(1000);
            for i in 0..1000 {
                let key = test_key(&format!("key{i}"));
                keys.push((key.clone(), i));

                freezer.put(key, i).await.expect("Failed to put data");
                freezer.sync().await.expect("Failed to sync");
            }

            for (key, value) in &keys {
                let retrieved = freezer
                    .get(Identifier::Key(key))
                    .await
                    .expect("Failed to get data")
                    .expect("Data not found");
                assert_eq!(retrieved, *value, "Value mismatch for key after resizes");
            }

            // The metric is prefixed with the "first" child label used at init.
            let buffer = context.encode();
            assert!(buffer.contains("first_resizes_total 8"), "{}", buffer);

            let checkpoint = freezer.close().await.expect("Failed to close");
            let freezer = Freezer::<_, FixedBytes<64>, i32>::init_with_checkpoint(
                context.child("second"),
                cfg.clone(),
                Some(checkpoint),
            )
            .await
            .expect("Failed to reinitialize freezer");

            for (key, value) in &keys {
                let retrieved = freezer
                    .get(Identifier::Key(key))
                    .await
                    .expect("Failed to get data")
                    .expect("Data not found");
                assert_eq!(retrieved, *value, "Value mismatch for key after restart");
            }
        });
    }

    /// Inserts keys while a chunked resize is in progress and checks the resize
    /// bookkeeping (`resizing`, `resizable`) after each step.
    ///
    /// NOTE(review): the expected counter values depend on which table entries
    /// these specific key names map to; that mapping is defined in `storage`.
    #[test_traced]
    fn test_insert_during_resize() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Chunk size 1 makes a resize span multiple `sync` calls.
            let cfg = Config {
                table_initial_size: 2,
                table_resize_frequency: 1,
                table_resize_chunk_size: 1,
                ..default_config(&context)
            };
            let mut freezer =
                Freezer::<_, FixedBytes<64>, i32>::init(context.child("storage"), cfg)
                    .await
                    .unwrap();

            // Two inserts followed by a sync start a resize.
            freezer.put(test_key("key0"), 0).await.unwrap();
            freezer.put(test_key("key2"), 1).await.unwrap();
            freezer.sync().await.unwrap();
            assert!(freezer.resizing().is_some());

            // This insert is counted as an unnecessary write.
            freezer.put(test_key("key6"), 2).await.unwrap();
            assert!(context.encode().contains("unnecessary_writes_total 1"));
            assert_eq!(freezer.resizable(), 3);

            // This insert leaves both counters unchanged.
            freezer.put(test_key("key3"), 3).await.unwrap();
            assert!(context.encode().contains("unnecessary_writes_total 1"));
            assert_eq!(freezer.resizable(), 3);

            // The next sync finishes the resize.
            freezer.sync().await.unwrap();
            assert!(freezer.resizing().is_none());
            assert_eq!(freezer.resizable(), 2);

            // Two more inserts and a sync start another resize.
            freezer.put(test_key("key4"), 4).await.unwrap();
            freezer.put(test_key("key7"), 5).await.unwrap();
            freezer.sync().await.unwrap();

            assert!(freezer.resizing().is_some());

            // Keys are listed in insertion order, so the index equals the value.
            let keys = ["key0", "key2", "key6", "key3", "key4", "key7"];
            for (i, k) in keys.iter().enumerate() {
                assert_eq!(
                    freezer.get(Identifier::Key(&test_key(k))).await.unwrap(),
                    Some(i as i32)
                );
            }

            // Drive the in-progress resize to completion.
            while freezer.resizing().is_some() {
                freezer.sync().await.unwrap();
            }

            assert_eq!(freezer.resizable(), 0);
        });
    }

    /// Reopens from a checkpoint taken while a resize was in progress and checks
    /// that the resize is picked up again and can be completed.
    #[test_traced]
    fn test_resize_after_startup() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Chunk size 1 makes a resize span multiple `sync` calls.
            let cfg = Config {
                table_initial_size: 2,
                table_resize_frequency: 1,
                table_resize_chunk_size: 1,
                ..default_config(&context)
            };

            let checkpoint = {
                let mut freezer =
                    Freezer::<_, FixedBytes<64>, i32>::init(context.child("first"), cfg.clone())
                        .await
                        .unwrap();

                freezer.put(test_key("key0"), 0).await.unwrap();
                freezer.put(test_key("key2"), 1).await.unwrap();
                let checkpoint = freezer.sync().await.unwrap();

                // The checkpoint is taken while a resize is still underway.
                assert!(freezer.resizing().is_some());

                checkpoint
            };

            let mut freezer = Freezer::<_, FixedBytes<64>, i32>::init_with_checkpoint(
                context.child("second"),
                cfg.clone(),
                Some(checkpoint),
            )
            .await
            .unwrap();
            assert_eq!(freezer.resizable(), 1);

            // The first sync after startup begins resizing again.
            freezer.sync().await.unwrap();
            assert!(freezer.resizing().is_some());

            while freezer.resizing().is_some() {
                freezer.sync().await.unwrap();
            }

            assert_eq!(freezer.resizable(), 0);
        });
    }

    /// Runs a randomized workload of `num_keys` puts (with occasional syncs),
    /// verifies it, restarts from a checkpoint, verifies again, writes more data,
    /// and returns the auditor state of the run.
    ///
    /// Random data is drawn from the deterministic context in a fixed order (key,
    /// then value, then the sync coin flip), so two runs with the same `num_keys`
    /// are expected to return the same state.
    fn test_operations_and_restart(num_keys: usize) -> String {
        let executor = deterministic::Runner::default();
        executor.start(|mut context| async move {
            // Small value target size and table so the workload exercises both.
            let cfg = Config {
                value_target_size: 128,
                table_initial_size: 8,
                table_resize_frequency: 2,
                ..default_config(&context)
            };
            let mut freezer = Freezer::<_, FixedBytes<96>, FixedBytes<256>>::init(
                context.child("init").with_attribute("index", 1),
                cfg.clone(),
            )
            .await
            .expect("Failed to initialize freezer");

            let mut pairs = Vec::with_capacity(num_keys);

            for _ in 0..num_keys {
                let key: FixedBytes<96> = random_bytes(&mut context);
                let value: FixedBytes<256> = random_bytes(&mut context);

                freezer
                    .put(key.clone(), value.clone())
                    .await
                    .expect("Failed to put data");
                pairs.push((key, value));

                // Sync after roughly one insert in ten.
                if context.gen_bool(0.1) {
                    freezer.sync().await.expect("Failed to sync");
                }
            }

            freezer.sync().await.expect("Failed to sync");

            // Every stored pair reads back with the right value.
            for (key, value) in &pairs {
                let retrieved = freezer
                    .get(Identifier::Key(key))
                    .await
                    .expect("Failed to get data")
                    .expect("Data not found");
                assert_eq!(&retrieved, value);
            }

            // Every stored key is reported as present.
            for (key, _) in &pairs {
                assert!(freezer
                    .get(Identifier::Key(key))
                    .await
                    .expect("Failed to check key")
                    .is_some());
            }

            // Fresh random keys are absent.
            for _ in 0..10 {
                let key: FixedBytes<96> = random_bytes(&mut context);
                assert!(freezer
                    .get(Identifier::Key(&key))
                    .await
                    .expect("Failed to check key")
                    .is_none());
            }

            let checkpoint = freezer.close().await.expect("Failed to close freezer");

            let mut freezer = Freezer::<_, FixedBytes<96>, FixedBytes<256>>::init_with_checkpoint(
                context.child("init").with_attribute("index", 2),
                cfg.clone(),
                Some(checkpoint),
            )
            .await
            .expect("Failed to initialize freezer");

            // Everything is still readable after the restart.
            for (key, value) in &pairs {
                let retrieved = freezer
                    .get(Identifier::Key(key))
                    .await
                    .expect("Failed to get data")
                    .expect("Data not found");
                assert_eq!(&retrieved, value);
            }

            // Write more data after the restart.
            for _ in 0..20 {
                let key: FixedBytes<96> = random_bytes(&mut context);
                let value: FixedBytes<256> = random_bytes(&mut context);

                freezer.put(key, value).await.expect("Failed to put data");
            }

            // Interleave syncs with further writes.
            for _ in 0..3 {
                freezer.sync().await.expect("Failed to sync");

                for _ in 0..5 {
                    let key: FixedBytes<96> = random_bytes(&mut context);
                    let value: FixedBytes<256> = random_bytes(&mut context);

                    freezer.put(key, value).await.expect("Failed to put data");
                }
            }

            freezer.sync().await.expect("Failed to sync");

            context.auditor().state()
        })
    }

    /// Two identical workload runs must produce identical auditor state.
    #[test_group("slow")]
    #[test_traced]
    fn test_determinism() {
        let state1 = test_operations_and_restart(1_000);
        let state2 = test_operations_and_restart(1_000);
        assert_eq!(state1, state2);
    }

    /// Writing the same key twice makes a lookup return the later value.
    #[test_traced]
    fn test_put_multiple_updates() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = default_config(&context);
            let mut freezer =
                Freezer::<_, FixedBytes<64>, i32>::init(context.child("storage"), cfg)
                    .await
                    .expect("Failed to initialize freezer");

            let key = test_key("key1");

            freezer
                .put(key.clone(), 1)
                .await
                .expect("Failed to put data");
            freezer
                .put(key.clone(), 2)
                .await
                .expect("Failed to put data");
            freezer.sync().await.expect("Failed to sync");
            assert_eq!(
                freezer
                    .get(Identifier::Key(&key))
                    .await
                    .expect("Failed to get data")
                    .unwrap(),
                2
            );
        });
    }
}