1#[cfg(test)]
206mod conformance;
207mod storage;
208use commonware_runtime::buffer::paged::CacheRef;
209use commonware_utils::Array;
210use std::num::NonZeroUsize;
211pub use storage::{Checkpoint, Cursor, Freezer};
212use thiserror::Error;
213
214pub enum Identifier<'a, K: Array> {
216 Cursor(Cursor),
217 Key(&'a K),
218}
219
220#[derive(Debug, Error)]
222pub enum Error {
223 #[error("runtime error: {0}")]
224 Runtime(#[from] commonware_runtime::Error),
225 #[error("journal error: {0}")]
226 Journal(#[from] crate::journal::Error),
227 #[error("codec error: {0}")]
228 Codec(#[from] commonware_codec::Error),
229}
230
231#[derive(Clone)]
233pub struct Config<C> {
234 pub key_partition: String,
236
237 pub key_write_buffer: NonZeroUsize,
239
240 pub key_page_cache: CacheRef,
242
243 pub value_partition: String,
245
246 pub value_compression: Option<u8>,
248
249 pub value_write_buffer: NonZeroUsize,
251
252 pub value_target_size: u64,
254
255 pub table_partition: String,
257
258 pub table_initial_size: u32,
260
261 pub table_resize_frequency: u8,
264
265 pub table_resize_chunk_size: u32,
267
268 pub table_replay_buffer: NonZeroUsize,
270
271 pub codec_config: C,
273}
274
275#[cfg(test)]
276mod tests {
277 use super::*;
278 use crate::kv::tests::test_key;
279 use commonware_macros::{test_group, test_traced};
280 use commonware_runtime::{deterministic, Blob, IoBufMut, Metrics, Runner, Storage};
281 use commonware_utils::{hex, sequence::FixedBytes, NZUsize, NZU16};
282 use rand::{Rng, RngCore};
283 use std::num::NonZeroU16;
284
285 const DEFAULT_WRITE_BUFFER: usize = 1024;
286 const DEFAULT_VALUE_TARGET_SIZE: u64 = 10 * 1024 * 1024;
287 const DEFAULT_TABLE_INITIAL_SIZE: u32 = 256;
288 const DEFAULT_TABLE_RESIZE_FREQUENCY: u8 = 4;
289 const DEFAULT_TABLE_RESIZE_CHUNK_SIZE: u32 = 128; const DEFAULT_TABLE_REPLAY_BUFFER: usize = 64 * 1024; const PAGE_SIZE: NonZeroU16 = NZU16!(1024);
292 const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);
293
294 fn test_put_get(compression: Option<u8>) {
295 let executor = deterministic::Runner::default();
297 executor.start(|context| async move {
298 let cfg = Config {
300 key_partition: "test_key_index".into(),
301 key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
302 key_page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
303 value_partition: "test_value_journal".into(),
304 value_compression: compression,
305 value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
306 value_target_size: DEFAULT_VALUE_TARGET_SIZE,
307 table_partition: "test_table".into(),
308 table_initial_size: DEFAULT_TABLE_INITIAL_SIZE,
309 table_resize_frequency: DEFAULT_TABLE_RESIZE_FREQUENCY,
310 table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
311 table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
312 codec_config: (),
313 };
314 let mut freezer = Freezer::<_, FixedBytes<64>, i32>::init(context.clone(), cfg.clone())
315 .await
316 .expect("Failed to initialize freezer");
317
318 let key = test_key("testkey");
319 let data = 42;
320
321 let value = freezer
323 .get(Identifier::Key(&key))
324 .await
325 .expect("Failed to check key");
326 assert!(value.is_none());
327
328 freezer
330 .put(key.clone(), data)
331 .await
332 .expect("Failed to put data");
333
334 let value = freezer
336 .get(Identifier::Key(&key))
337 .await
338 .expect("Failed to get data")
339 .expect("Data not found");
340 assert_eq!(value, data);
341
342 let buffer = context.encode();
344 assert!(buffer.contains("gets_total 2"), "{}", buffer);
345 assert!(buffer.contains("puts_total 1"), "{}", buffer);
346 assert!(buffer.contains("unnecessary_reads_total 0"), "{}", buffer);
347
348 freezer.sync().await.expect("Failed to sync data");
350 });
351 }
352
353 #[test_traced]
354 fn test_put_get_no_compression() {
355 test_put_get(None);
356 }
357
358 #[test_traced]
359 fn test_put_get_compression() {
360 test_put_get(Some(3));
361 }
362
363 #[test_traced]
364 fn test_multiple_keys() {
365 let executor = deterministic::Runner::default();
367 executor.start(|context| async move {
368 let cfg = Config {
370 key_partition: "test_key_index".into(),
371 key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
372 key_page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
373 value_partition: "test_value_journal".into(),
374 value_compression: None,
375 value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
376 value_target_size: DEFAULT_VALUE_TARGET_SIZE,
377 table_partition: "test_table".into(),
378 table_initial_size: DEFAULT_TABLE_INITIAL_SIZE,
379 table_resize_frequency: DEFAULT_TABLE_RESIZE_FREQUENCY,
380 table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
381 table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
382 codec_config: (),
383 };
384 let mut freezer = Freezer::<_, FixedBytes<64>, i32>::init(context.clone(), cfg.clone())
385 .await
386 .expect("Failed to initialize freezer");
387
388 let keys = vec![
390 (test_key("key1"), 1),
391 (test_key("key2"), 2),
392 (test_key("key3"), 3),
393 (test_key("key4"), 4),
394 (test_key("key5"), 5),
395 ];
396
397 for (key, data) in &keys {
398 freezer
399 .put(key.clone(), *data)
400 .await
401 .expect("Failed to put data");
402 }
403
404 for (key, data) in &keys {
406 let retrieved = freezer
407 .get(Identifier::Key(key))
408 .await
409 .expect("Failed to get data")
410 .expect("Data not found");
411 assert_eq!(retrieved, *data);
412 }
413 });
414 }
415
416 #[test_traced]
417 fn test_collision_handling() {
418 let executor = deterministic::Runner::default();
420 executor.start(|context| async move {
421 let cfg = Config {
423 key_partition: "test_key_index".into(),
424 key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
425 key_page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
426 value_partition: "test_value_journal".into(),
427 value_compression: None,
428 value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
429 value_target_size: DEFAULT_VALUE_TARGET_SIZE,
430 table_partition: "test_table".into(),
431 table_initial_size: 4, table_resize_frequency: DEFAULT_TABLE_RESIZE_FREQUENCY,
433 table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
434 table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
435 codec_config: (),
436 };
437 let mut freezer = Freezer::<_, FixedBytes<64>, i32>::init(context.clone(), cfg.clone())
438 .await
439 .expect("Failed to initialize freezer");
440
441 let keys = vec![
443 (test_key("key1"), 1),
444 (test_key("key2"), 2),
445 (test_key("key3"), 3),
446 (test_key("key4"), 4),
447 (test_key("key5"), 5),
448 (test_key("key6"), 6),
449 (test_key("key7"), 7),
450 (test_key("key8"), 8),
451 ];
452
453 for (key, data) in &keys {
454 freezer
455 .put(key.clone(), *data)
456 .await
457 .expect("Failed to put data");
458 }
459
460 freezer.sync().await.expect("Failed to sync");
462
463 for (key, data) in &keys {
465 let retrieved = freezer
466 .get(Identifier::Key(key))
467 .await
468 .expect("Failed to get data")
469 .expect("Data not found");
470 assert_eq!(retrieved, *data);
471 }
472
473 let buffer = context.encode();
475 assert!(buffer.contains("gets_total 8"), "{}", buffer);
476 assert!(buffer.contains("unnecessary_reads_total 5"), "{}", buffer);
477 });
478 }
479
480 #[test_traced]
481 fn test_restart() {
482 let executor = deterministic::Runner::default();
484 executor.start(|context| async move {
485 let cfg = Config {
486 key_partition: "test_key_index".into(),
487 key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
488 key_page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
489 value_partition: "test_value_journal".into(),
490 value_compression: None,
491 value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
492 value_target_size: DEFAULT_VALUE_TARGET_SIZE,
493 table_partition: "test_table".into(),
494 table_initial_size: DEFAULT_TABLE_INITIAL_SIZE,
495 table_resize_frequency: DEFAULT_TABLE_RESIZE_FREQUENCY,
496 table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
497 table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
498 codec_config: (),
499 };
500
501 let checkpoint = {
503 let mut freezer = Freezer::<_, FixedBytes<64>, i32>::init(
504 context.with_label("first"),
505 cfg.clone(),
506 )
507 .await
508 .expect("Failed to initialize freezer");
509
510 let keys = vec![
511 (test_key("persist1"), 100),
512 (test_key("persist2"), 200),
513 (test_key("persist3"), 300),
514 ];
515
516 for (key, data) in &keys {
517 freezer
518 .put(key.clone(), *data)
519 .await
520 .expect("Failed to put data");
521 }
522
523 freezer.close().await.expect("Failed to close freezer")
524 };
525
526 {
528 let freezer = Freezer::<_, FixedBytes<64>, i32>::init_with_checkpoint(
529 context.with_label("second"),
530 cfg.clone(),
531 Some(checkpoint),
532 )
533 .await
534 .expect("Failed to initialize freezer");
535
536 let keys = vec![
537 (test_key("persist1"), 100),
538 (test_key("persist2"), 200),
539 (test_key("persist3"), 300),
540 ];
541
542 for (key, data) in &keys {
543 let retrieved = freezer
544 .get(Identifier::Key(key))
545 .await
546 .expect("Failed to get data")
547 .expect("Data not found");
548 assert_eq!(retrieved, *data);
549 }
550 }
551 });
552 }
553
554 #[test_traced]
555 fn test_crash_consistency() {
556 let executor = deterministic::Runner::default();
558 executor.start(|context| async move {
559 let cfg = Config {
560 key_partition: "test_key_index".into(),
561 key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
562 key_page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
563 value_partition: "test_value_journal".into(),
564 value_compression: None,
565 value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
566 value_target_size: DEFAULT_VALUE_TARGET_SIZE,
567 table_partition: "test_table".into(),
568 table_initial_size: DEFAULT_TABLE_INITIAL_SIZE,
569 table_resize_frequency: DEFAULT_TABLE_RESIZE_FREQUENCY,
570 table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
571 table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
572 codec_config: (),
573 };
574
575 let checkpoint = {
577 let mut freezer = Freezer::<_, FixedBytes<64>, i32>::init(
578 context.with_label("first"),
579 cfg.clone(),
580 )
581 .await
582 .expect("Failed to initialize freezer");
583
584 freezer
585 .put(test_key("committed1"), 1)
586 .await
587 .expect("Failed to put data");
588 freezer
589 .put(test_key("committed2"), 2)
590 .await
591 .expect("Failed to put data");
592
593 freezer.sync().await.expect("Failed to sync");
595
596 freezer
598 .put(test_key("uncommitted1"), 3)
599 .await
600 .expect("Failed to put data");
601 freezer
602 .put(test_key("uncommitted2"), 4)
603 .await
604 .expect("Failed to put data");
605
606 freezer.close().await.expect("Failed to close")
608 };
609
610 {
612 let freezer = Freezer::<_, FixedBytes<64>, i32>::init_with_checkpoint(
613 context.with_label("second"),
614 cfg.clone(),
615 Some(checkpoint),
616 )
617 .await
618 .expect("Failed to initialize freezer");
619
620 assert_eq!(
622 freezer
623 .get(Identifier::Key(&test_key("committed1")))
624 .await
625 .unwrap(),
626 Some(1)
627 );
628 assert_eq!(
629 freezer
630 .get(Identifier::Key(&test_key("committed2")))
631 .await
632 .unwrap(),
633 Some(2)
634 );
635
636 if let Some(val) = freezer
639 .get(Identifier::Key(&test_key("uncommitted1")))
640 .await
641 .unwrap()
642 {
643 assert_eq!(val, 3);
644 }
645 if let Some(val) = freezer
646 .get(Identifier::Key(&test_key("uncommitted2")))
647 .await
648 .unwrap()
649 {
650 assert_eq!(val, 4);
651 }
652 }
653 });
654 }
655
656 #[test_traced]
657 fn test_destroy() {
658 let executor = deterministic::Runner::default();
660 executor.start(|context| async move {
661 let cfg = Config {
663 key_partition: "test_key_index".into(),
664 key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
665 key_page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
666 value_partition: "test_value_journal".into(),
667 value_compression: None,
668 value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
669 value_target_size: DEFAULT_VALUE_TARGET_SIZE,
670 table_partition: "test_table".into(),
671 table_initial_size: DEFAULT_TABLE_INITIAL_SIZE,
672 table_resize_frequency: DEFAULT_TABLE_RESIZE_FREQUENCY,
673 table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
674 table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
675 codec_config: (),
676 };
677 {
678 let mut freezer = Freezer::<_, FixedBytes<64>, i32>::init(
679 context.with_label("first"),
680 cfg.clone(),
681 )
682 .await
683 .expect("Failed to initialize freezer");
684
685 freezer
686 .put(test_key("destroy1"), 1)
687 .await
688 .expect("Failed to put data");
689 freezer
690 .put(test_key("destroy2"), 2)
691 .await
692 .expect("Failed to put data");
693
694 freezer.destroy().await.expect("Failed to destroy freezer");
696 }
697
698 {
700 let freezer = Freezer::<_, FixedBytes<64>, i32>::init(
701 context.with_label("second"),
702 cfg.clone(),
703 )
704 .await
705 .expect("Failed to initialize freezer");
706
707 assert!(freezer
709 .get(Identifier::Key(&test_key("destroy1")))
710 .await
711 .unwrap()
712 .is_none());
713 assert!(freezer
714 .get(Identifier::Key(&test_key("destroy2")))
715 .await
716 .unwrap()
717 .is_none());
718 }
719 });
720 }
721
722 #[test_traced]
723 fn test_partial_table_entry_write() {
724 let executor = deterministic::Runner::default();
726 executor.start(|context| async move {
727 let cfg = Config {
729 key_partition: "test_key_index".into(),
730 key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
731 key_page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
732 value_partition: "test_value_journal".into(),
733 value_compression: None,
734 value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
735 value_target_size: DEFAULT_VALUE_TARGET_SIZE,
736 table_partition: "test_table".into(),
737 table_initial_size: DEFAULT_TABLE_INITIAL_SIZE,
738 table_resize_frequency: DEFAULT_TABLE_RESIZE_FREQUENCY,
739 table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
740 table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
741 codec_config: (),
742 };
743 let checkpoint = {
744 let mut freezer = Freezer::<_, FixedBytes<64>, i32>::init(
745 context.with_label("first"),
746 cfg.clone(),
747 )
748 .await
749 .expect("Failed to initialize freezer");
750
751 freezer.put(test_key("key1"), 42).await.unwrap();
752 freezer.sync().await.unwrap();
753 freezer.close().await.unwrap()
754 };
755
756 {
758 let (blob, _) = context.open(&cfg.table_partition, b"table").await.unwrap();
759 blob.write_at(0, vec![0xFF; 10]).await.unwrap();
761 blob.sync().await.unwrap();
762 }
763
764 {
766 let freezer = Freezer::<_, FixedBytes<64>, i32>::init_with_checkpoint(
767 context.with_label("second"),
768 cfg.clone(),
769 Some(checkpoint),
770 )
771 .await
772 .expect("Failed to initialize freezer");
773
774 let result = freezer
777 .get(Identifier::Key(&test_key("key1")))
778 .await
779 .unwrap();
780 assert!(result.is_none() || result == Some(42));
781 }
782 });
783 }
784
785 #[test_traced]
786 fn test_table_entry_invalid_crc() {
787 let executor = deterministic::Runner::default();
789 executor.start(|context| async move {
790 let cfg = Config {
791 key_partition: "test_key_index".into(),
792 key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
793 key_page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
794 value_partition: "test_value_journal".into(),
795 value_compression: None,
796 value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
797 value_target_size: DEFAULT_VALUE_TARGET_SIZE,
798 table_partition: "test_table".into(),
799 table_initial_size: DEFAULT_TABLE_INITIAL_SIZE,
800 table_resize_frequency: DEFAULT_TABLE_RESIZE_FREQUENCY,
801 table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
802 table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
803 codec_config: (),
804 };
805
806 let checkpoint = {
808 let mut freezer = Freezer::<_, FixedBytes<64>, i32>::init(
809 context.with_label("first"),
810 cfg.clone(),
811 )
812 .await
813 .expect("Failed to initialize freezer");
814
815 freezer.put(test_key("key1"), 42).await.unwrap();
816 freezer.sync().await.unwrap();
817 freezer.close().await.unwrap()
818 };
819
820 {
822 let (blob, _) = context.open(&cfg.table_partition, b"table").await.unwrap();
823 let entry_data = blob.read_at(0, IoBufMut::zeroed(24)).await.unwrap();
825 let mut corrupted = entry_data.coalesce();
826 corrupted.as_mut()[20] ^= 0xFF;
828 blob.write_at(0, corrupted).await.unwrap();
829 blob.sync().await.unwrap();
830 }
831
832 {
834 let freezer = Freezer::<_, FixedBytes<64>, i32>::init_with_checkpoint(
835 context.with_label("second"),
836 cfg.clone(),
837 Some(checkpoint),
838 )
839 .await
840 .expect("Failed to initialize freezer");
841
842 let result = freezer
844 .get(Identifier::Key(&test_key("key1")))
845 .await
846 .unwrap();
847 assert!(result.is_none() || result == Some(42));
849 }
850 });
851 }
852
853 #[test_traced]
854 fn test_table_extra_bytes() {
855 let executor = deterministic::Runner::default();
857 executor.start(|context| async move {
858 let cfg = Config {
859 key_partition: "test_key_index".into(),
860 key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
861 key_page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
862 value_partition: "test_value_journal".into(),
863 value_compression: None,
864 value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
865 value_target_size: DEFAULT_VALUE_TARGET_SIZE,
866 table_partition: "test_table".into(),
867 table_initial_size: DEFAULT_TABLE_INITIAL_SIZE,
868 table_resize_frequency: DEFAULT_TABLE_RESIZE_FREQUENCY,
869 table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
870 table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
871 codec_config: (),
872 };
873
874 let checkpoint = {
876 let mut freezer = Freezer::<_, FixedBytes<64>, i32>::init(
877 context.with_label("first"),
878 cfg.clone(),
879 )
880 .await
881 .expect("Failed to initialize freezer");
882
883 freezer.put(test_key("key1"), 42).await.unwrap();
884 freezer.sync().await.unwrap();
885 freezer.close().await.unwrap()
886 };
887
888 {
890 let (blob, size) = context.open(&cfg.table_partition, b"table").await.unwrap();
891 blob.write_at(size, hex!("0xdeadbeef").to_vec())
893 .await
894 .unwrap();
895 blob.sync().await.unwrap();
896 }
897
898 {
900 let freezer = Freezer::<_, FixedBytes<64>, i32>::init_with_checkpoint(
901 context.with_label("second"),
902 cfg.clone(),
903 Some(checkpoint),
904 )
905 .await
906 .expect("Failed to initialize freezer");
907
908 assert_eq!(
910 freezer
911 .get(Identifier::Key(&test_key("key1")))
912 .await
913 .unwrap(),
914 Some(42)
915 );
916
917 let mut freezer_mut = freezer;
919 freezer_mut.put(test_key("key2"), 43).await.unwrap();
920 assert_eq!(
921 freezer_mut
922 .get(Identifier::Key(&test_key("key2")))
923 .await
924 .unwrap(),
925 Some(43)
926 );
927 }
928 });
929 }
930
931 #[test_traced]
932 fn test_indexing_across_resizes() {
933 let executor = deterministic::Runner::default();
935 executor.start(|context| async move {
936 let cfg = Config {
938 key_partition: "test_key_index".into(),
939 key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
940 key_page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
941 value_partition: "test_value_journal".into(),
942 value_compression: None,
943 value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
944 value_target_size: DEFAULT_VALUE_TARGET_SIZE,
945 table_partition: "test_table".into(),
946 table_initial_size: 2, table_resize_frequency: 2, table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
949 table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
950 codec_config: (),
951 };
952 let mut freezer =
953 Freezer::<_, FixedBytes<64>, i32>::init(context.with_label("first"), cfg.clone())
954 .await
955 .expect("Failed to initialize freezer");
956
957 let mut keys = Vec::new();
960 for i in 0..1000 {
961 let key = test_key(&format!("key{i}"));
962 keys.push((key.clone(), i));
963
964 freezer.put(key, i).await.expect("Failed to put data");
966 freezer.sync().await.expect("Failed to sync");
967 }
968
969 for (key, value) in &keys {
971 let retrieved = freezer
972 .get(Identifier::Key(key))
973 .await
974 .expect("Failed to get data")
975 .expect("Data not found");
976 assert_eq!(retrieved, *value, "Value mismatch for key after resizes");
977 }
978
979 let checkpoint = freezer.close().await.expect("Failed to close");
981 let freezer = Freezer::<_, FixedBytes<64>, i32>::init_with_checkpoint(
982 context.with_label("second"),
983 cfg.clone(),
984 Some(checkpoint),
985 )
986 .await
987 .expect("Failed to reinitialize freezer");
988
989 for (key, value) in &keys {
991 let retrieved = freezer
992 .get(Identifier::Key(key))
993 .await
994 .expect("Failed to get data")
995 .expect("Data not found");
996 assert_eq!(retrieved, *value, "Value mismatch for key after restart");
997 }
998
999 let buffer = context.encode();
1001 assert!(buffer.contains("first_resizes_total 8"), "{}", buffer);
1002 });
1003 }
1004
1005 #[test_traced]
1006 fn test_insert_during_resize() {
1007 let executor = deterministic::Runner::default();
1008 executor.start(|context| async move {
1009 let cfg = Config {
1010 key_partition: "test_key_index".into(),
1011 key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1012 key_page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1013 value_partition: "test_value_journal".into(),
1014 value_compression: None,
1015 value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1016 value_target_size: DEFAULT_VALUE_TARGET_SIZE,
1017 table_partition: "test_table".into(),
1018 table_initial_size: 2,
1019 table_resize_frequency: 1,
1020 table_resize_chunk_size: 1, table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
1022 codec_config: (),
1023 };
1024 let mut freezer = Freezer::<_, FixedBytes<64>, i32>::init(context.clone(), cfg.clone())
1025 .await
1026 .unwrap();
1027
1028 freezer.put(test_key("key0"), 0).await.unwrap();
1031 freezer.put(test_key("key2"), 1).await.unwrap();
1032 freezer.sync().await.unwrap(); assert!(freezer.resizing().is_some());
1036
1037 freezer.put(test_key("key6"), 2).await.unwrap();
1040 assert!(context.encode().contains("unnecessary_writes_total 1"));
1041 assert_eq!(freezer.resizable(), 3);
1042
1043 freezer.put(test_key("key3"), 3).await.unwrap();
1046 assert!(context.encode().contains("unnecessary_writes_total 1"));
1047 assert_eq!(freezer.resizable(), 3);
1048
1049 freezer.sync().await.unwrap();
1051 assert!(freezer.resizing().is_none());
1052 assert_eq!(freezer.resizable(), 2);
1053
1054 freezer.put(test_key("key4"), 4).await.unwrap();
1057 freezer.put(test_key("key7"), 5).await.unwrap();
1058 freezer.sync().await.unwrap();
1059
1060 assert!(freezer.resizing().is_some());
1062
1063 let keys = ["key0", "key2", "key6", "key3", "key4", "key7"];
1065 for (i, k) in keys.iter().enumerate() {
1066 assert_eq!(
1067 freezer.get(Identifier::Key(&test_key(k))).await.unwrap(),
1068 Some(i as i32)
1069 );
1070 }
1071
1072 while freezer.resizing().is_some() {
1074 freezer.sync().await.unwrap();
1075 }
1076
1077 assert_eq!(freezer.resizable(), 0);
1079 });
1080 }
1081
1082 #[test_traced]
1083 fn test_resize_after_startup() {
1084 let executor = deterministic::Runner::default();
1085 executor.start(|context| async move {
1086 let cfg = Config {
1087 key_partition: "test_key_index".into(),
1088 key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1089 key_page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1090 value_partition: "test_value_journal".into(),
1091 value_compression: None,
1092 value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1093 value_target_size: DEFAULT_VALUE_TARGET_SIZE,
1094 table_partition: "test_table".into(),
1095 table_initial_size: 2,
1096 table_resize_frequency: 1,
1097 table_resize_chunk_size: 1, table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
1099 codec_config: (),
1100 };
1101
1102 let checkpoint = {
1104 let mut freezer = Freezer::<_, FixedBytes<64>, i32>::init(
1105 context.with_label("first"),
1106 cfg.clone(),
1107 )
1108 .await
1109 .unwrap();
1110
1111 freezer.put(test_key("key0"), 0).await.unwrap();
1114 freezer.put(test_key("key2"), 1).await.unwrap();
1115 let checkpoint = freezer.sync().await.unwrap();
1116
1117 assert!(freezer.resizing().is_some());
1119
1120 checkpoint
1121 };
1122
1123 let mut freezer = Freezer::<_, FixedBytes<64>, i32>::init_with_checkpoint(
1125 context.with_label("second"),
1126 cfg.clone(),
1127 Some(checkpoint),
1128 )
1129 .await
1130 .unwrap();
1131 assert_eq!(freezer.resizable(), 1);
1132
1133 freezer.sync().await.unwrap();
1136 assert!(freezer.resizing().is_some());
1137
1138 while freezer.resizing().is_some() {
1140 freezer.sync().await.unwrap();
1141 }
1142
1143 assert_eq!(freezer.resizable(), 0);
1145 });
1146 }
1147
1148 fn test_operations_and_restart(num_keys: usize) -> String {
1149 let executor = deterministic::Runner::default();
1150 executor.start(|mut context| async move {
1151 let cfg = Config {
1152 key_partition: "test_key_index".into(),
1153 key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1154 key_page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1155 value_partition: "test_value_journal".into(),
1156 value_compression: None,
1157 value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1158 value_target_size: 128, table_partition: "test_table".into(),
1160 table_initial_size: 8, table_resize_frequency: 2, table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
1163 table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
1164 codec_config: (),
1165 };
1166 let mut freezer = Freezer::<_, FixedBytes<96>, FixedBytes<256>>::init(
1167 context.with_label("init1"),
1168 cfg.clone(),
1169 )
1170 .await
1171 .expect("Failed to initialize freezer");
1172
1173 let mut pairs = Vec::new();
1175
1176 for _ in 0..num_keys {
1177 let mut key = [0u8; 96];
1179 context.fill_bytes(&mut key);
1180 let key = FixedBytes::<96>::new(key);
1181
1182 let mut value = [0u8; 256];
1184 context.fill_bytes(&mut value);
1185 let value = FixedBytes::<256>::new(value);
1186
1187 freezer
1189 .put(key.clone(), value.clone())
1190 .await
1191 .expect("Failed to put data");
1192 pairs.push((key, value));
1193
1194 if context.gen_bool(0.1) {
1196 freezer.sync().await.expect("Failed to sync");
1197 }
1198 }
1199
1200 freezer.sync().await.expect("Failed to sync");
1202
1203 for (key, value) in &pairs {
1205 let retrieved = freezer
1206 .get(Identifier::Key(key))
1207 .await
1208 .expect("Failed to get data")
1209 .expect("Data not found");
1210 assert_eq!(&retrieved, value);
1211 }
1212
1213 for (key, _) in &pairs {
1215 assert!(freezer
1216 .get(Identifier::Key(key))
1217 .await
1218 .expect("Failed to check key")
1219 .is_some());
1220 }
1221
1222 for _ in 0..10 {
1224 let mut key = [0u8; 96];
1225 context.fill_bytes(&mut key);
1226 let key = FixedBytes::<96>::new(key);
1227 assert!(freezer
1228 .get(Identifier::Key(&key))
1229 .await
1230 .expect("Failed to check key")
1231 .is_none());
1232 }
1233
1234 let checkpoint = freezer.close().await.expect("Failed to close freezer");
1236
1237 let mut freezer = Freezer::<_, FixedBytes<96>, FixedBytes<256>>::init_with_checkpoint(
1239 context.with_label("init2"),
1240 cfg.clone(),
1241 Some(checkpoint),
1242 )
1243 .await
1244 .expect("Failed to initialize freezer");
1245
1246 for (key, value) in &pairs {
1248 let retrieved = freezer
1249 .get(Identifier::Key(key))
1250 .await
1251 .expect("Failed to get data")
1252 .expect("Data not found");
1253 assert_eq!(&retrieved, value);
1254 }
1255
1256 for _ in 0..20 {
1258 let mut key = [0u8; 96];
1259 context.fill_bytes(&mut key);
1260 let key = FixedBytes::<96>::new(key);
1261
1262 let mut value = [0u8; 256];
1263 context.fill_bytes(&mut value);
1264 let value = FixedBytes::<256>::new(value);
1265
1266 freezer.put(key, value).await.expect("Failed to put data");
1267 }
1268
1269 for _ in 0..3 {
1271 freezer.sync().await.expect("Failed to sync");
1272
1273 for _ in 0..5 {
1275 let mut key = [0u8; 96];
1276 context.fill_bytes(&mut key);
1277 let key = FixedBytes::<96>::new(key);
1278
1279 let mut value = [0u8; 256];
1280 context.fill_bytes(&mut value);
1281 let value = FixedBytes::<256>::new(value);
1282
1283 freezer.put(key, value).await.expect("Failed to put data");
1284 }
1285 }
1286
1287 freezer.sync().await.expect("Failed to sync");
1289
1290 context.auditor().state()
1292 })
1293 }
1294
1295 #[test_group("slow")]
1296 #[test_traced]
1297 fn test_determinism() {
1298 let state1 = test_operations_and_restart(1_000);
1299 let state2 = test_operations_and_restart(1_000);
1300 assert_eq!(state1, state2);
1301 }
1302
1303 #[test_traced]
1304 fn test_put_multiple_updates() {
1305 let executor = deterministic::Runner::default();
1307 executor.start(|context| async move {
1308 let cfg = Config {
1310 key_partition: "test_key_index".into(),
1311 key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1312 key_page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1313 value_partition: "test_value_journal".into(),
1314 value_compression: None,
1315 value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1316 value_target_size: DEFAULT_VALUE_TARGET_SIZE,
1317 table_partition: "test_table".into(),
1318 table_initial_size: DEFAULT_TABLE_INITIAL_SIZE,
1319 table_resize_frequency: DEFAULT_TABLE_RESIZE_FREQUENCY,
1320 table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
1321 table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
1322 codec_config: (),
1323 };
1324 let mut freezer = Freezer::<_, FixedBytes<64>, i32>::init(context.clone(), cfg.clone())
1325 .await
1326 .expect("Failed to initialize freezer");
1327
1328 let key = test_key("key1");
1329
1330 freezer
1331 .put(key.clone(), 1)
1332 .await
1333 .expect("Failed to put data");
1334 freezer
1335 .put(key.clone(), 2)
1336 .await
1337 .expect("Failed to put data");
1338 freezer.sync().await.expect("Failed to sync");
1339 assert_eq!(
1340 freezer
1341 .get(Identifier::Key(&key))
1342 .await
1343 .expect("Failed to get data")
1344 .unwrap(),
1345 2
1346 );
1347 });
1348 }
1349}