1#[cfg(test)]
206mod conformance;
207mod storage;
208use commonware_runtime::buffer::paged::CacheRef;
209use commonware_utils::Array;
210use std::num::NonZeroUsize;
211pub use storage::{Checkpoint, Cursor, Freezer};
212use thiserror::Error;
213
214pub enum Identifier<'a, K: Array> {
216 Cursor(Cursor),
217 Key(&'a K),
218}
219
220#[derive(Debug, Error)]
222pub enum Error {
223 #[error("runtime error: {0}")]
224 Runtime(#[from] commonware_runtime::Error),
225 #[error("journal error: {0}")]
226 Journal(#[from] crate::journal::Error),
227 #[error("codec error: {0}")]
228 Codec(#[from] commonware_codec::Error),
229}
230
231#[derive(Clone)]
233pub struct Config<C> {
234 pub key_partition: String,
236
237 pub key_write_buffer: NonZeroUsize,
239
240 pub key_page_cache: CacheRef,
242
243 pub value_partition: String,
245
246 pub value_compression: Option<u8>,
248
249 pub value_write_buffer: NonZeroUsize,
251
252 pub value_target_size: u64,
254
255 pub table_partition: String,
257
258 pub table_initial_size: u32,
260
261 pub table_resize_frequency: u8,
264
265 pub table_resize_chunk_size: u32,
267
268 pub table_replay_buffer: NonZeroUsize,
270
271 pub codec_config: C,
273}
274
275#[cfg(test)]
276mod tests {
277 use super::*;
278 use commonware_codec::DecodeExt;
279 use commonware_macros::{test_group, test_traced};
280 use commonware_runtime::{deterministic, Blob, Metrics, Runner, Storage};
281 use commonware_utils::{hex, sequence::FixedBytes, NZUsize, NZU16};
282 use rand::{Rng, RngCore};
283 use std::num::NonZeroU16;
284
285 fn test_key(key: &str) -> FixedBytes<64> {
286 let mut buf = [0u8; 64];
287 let key = key.as_bytes();
288 assert!(key.len() <= buf.len());
289 buf[..key.len()].copy_from_slice(key);
290 FixedBytes::decode(buf.as_ref()).unwrap()
291 }
292
293 const DEFAULT_WRITE_BUFFER: usize = 1024;
294 const DEFAULT_VALUE_TARGET_SIZE: u64 = 10 * 1024 * 1024;
295 const DEFAULT_TABLE_INITIAL_SIZE: u32 = 256;
296 const DEFAULT_TABLE_RESIZE_FREQUENCY: u8 = 4;
297 const DEFAULT_TABLE_RESIZE_CHUNK_SIZE: u32 = 128; const DEFAULT_TABLE_REPLAY_BUFFER: usize = 64 * 1024; const PAGE_SIZE: NonZeroU16 = NZU16!(1024);
300 const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);
301
302 fn test_put_get(compression: Option<u8>) {
303 let executor = deterministic::Runner::default();
305 executor.start(|context| async move {
306 let cfg = Config {
308 key_partition: "test-key-index".into(),
309 key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
310 key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
311 value_partition: "test-value-journal".into(),
312 value_compression: compression,
313 value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
314 value_target_size: DEFAULT_VALUE_TARGET_SIZE,
315 table_partition: "test-table".into(),
316 table_initial_size: DEFAULT_TABLE_INITIAL_SIZE,
317 table_resize_frequency: DEFAULT_TABLE_RESIZE_FREQUENCY,
318 table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
319 table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
320 codec_config: (),
321 };
322 let mut freezer = Freezer::<_, FixedBytes<64>, i32>::init(context.clone(), cfg.clone())
323 .await
324 .expect("Failed to initialize freezer");
325
326 let key = test_key("testkey");
327 let data = 42;
328
329 let value = freezer
331 .get(Identifier::Key(&key))
332 .await
333 .expect("Failed to check key");
334 assert!(value.is_none());
335
336 freezer
338 .put(key.clone(), data)
339 .await
340 .expect("Failed to put data");
341
342 let value = freezer
344 .get(Identifier::Key(&key))
345 .await
346 .expect("Failed to get data")
347 .expect("Data not found");
348 assert_eq!(value, data);
349
350 let buffer = context.encode();
352 assert!(buffer.contains("gets_total 2"), "{}", buffer);
353 assert!(buffer.contains("puts_total 1"), "{}", buffer);
354 assert!(buffer.contains("unnecessary_reads_total 0"), "{}", buffer);
355
356 freezer.sync().await.expect("Failed to sync data");
358 });
359 }
360
361 #[test_traced]
362 fn test_put_get_no_compression() {
363 test_put_get(None);
364 }
365
366 #[test_traced]
367 fn test_put_get_compression() {
368 test_put_get(Some(3));
369 }
370
371 #[test_traced]
372 fn test_multiple_keys() {
373 let executor = deterministic::Runner::default();
375 executor.start(|context| async move {
376 let cfg = Config {
378 key_partition: "test-key-index".into(),
379 key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
380 key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
381 value_partition: "test-value-journal".into(),
382 value_compression: None,
383 value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
384 value_target_size: DEFAULT_VALUE_TARGET_SIZE,
385 table_partition: "test-table".into(),
386 table_initial_size: DEFAULT_TABLE_INITIAL_SIZE,
387 table_resize_frequency: DEFAULT_TABLE_RESIZE_FREQUENCY,
388 table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
389 table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
390 codec_config: (),
391 };
392 let mut freezer = Freezer::<_, FixedBytes<64>, i32>::init(context.clone(), cfg.clone())
393 .await
394 .expect("Failed to initialize freezer");
395
396 let keys = vec![
398 (test_key("key1"), 1),
399 (test_key("key2"), 2),
400 (test_key("key3"), 3),
401 (test_key("key4"), 4),
402 (test_key("key5"), 5),
403 ];
404
405 for (key, data) in &keys {
406 freezer
407 .put(key.clone(), *data)
408 .await
409 .expect("Failed to put data");
410 }
411
412 for (key, data) in &keys {
414 let retrieved = freezer
415 .get(Identifier::Key(key))
416 .await
417 .expect("Failed to get data")
418 .expect("Data not found");
419 assert_eq!(retrieved, *data);
420 }
421 });
422 }
423
424 #[test_traced]
425 fn test_collision_handling() {
426 let executor = deterministic::Runner::default();
428 executor.start(|context| async move {
429 let cfg = Config {
431 key_partition: "test-key-index".into(),
432 key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
433 key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
434 value_partition: "test-value-journal".into(),
435 value_compression: None,
436 value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
437 value_target_size: DEFAULT_VALUE_TARGET_SIZE,
438 table_partition: "test-table".into(),
439 table_initial_size: 4, table_resize_frequency: DEFAULT_TABLE_RESIZE_FREQUENCY,
441 table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
442 table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
443 codec_config: (),
444 };
445 let mut freezer = Freezer::<_, FixedBytes<64>, i32>::init(context.clone(), cfg.clone())
446 .await
447 .expect("Failed to initialize freezer");
448
449 let keys = vec![
451 (test_key("key1"), 1),
452 (test_key("key2"), 2),
453 (test_key("key3"), 3),
454 (test_key("key4"), 4),
455 (test_key("key5"), 5),
456 (test_key("key6"), 6),
457 (test_key("key7"), 7),
458 (test_key("key8"), 8),
459 ];
460
461 for (key, data) in &keys {
462 freezer
463 .put(key.clone(), *data)
464 .await
465 .expect("Failed to put data");
466 }
467
468 freezer.sync().await.expect("Failed to sync");
470
471 for (key, data) in &keys {
473 let retrieved = freezer
474 .get(Identifier::Key(key))
475 .await
476 .expect("Failed to get data")
477 .expect("Data not found");
478 assert_eq!(retrieved, *data);
479 }
480
481 let buffer = context.encode();
483 assert!(buffer.contains("gets_total 8"), "{}", buffer);
484 assert!(buffer.contains("unnecessary_reads_total 5"), "{}", buffer);
485 });
486 }
487
488 #[test_traced]
489 fn test_restart() {
490 let executor = deterministic::Runner::default();
492 executor.start(|context| async move {
493 let cfg = Config {
494 key_partition: "test-key-index".into(),
495 key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
496 key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
497 value_partition: "test-value-journal".into(),
498 value_compression: None,
499 value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
500 value_target_size: DEFAULT_VALUE_TARGET_SIZE,
501 table_partition: "test-table".into(),
502 table_initial_size: DEFAULT_TABLE_INITIAL_SIZE,
503 table_resize_frequency: DEFAULT_TABLE_RESIZE_FREQUENCY,
504 table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
505 table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
506 codec_config: (),
507 };
508
509 let checkpoint = {
511 let mut freezer = Freezer::<_, FixedBytes<64>, i32>::init(
512 context.with_label("first"),
513 cfg.clone(),
514 )
515 .await
516 .expect("Failed to initialize freezer");
517
518 let keys = vec![
519 (test_key("persist1"), 100),
520 (test_key("persist2"), 200),
521 (test_key("persist3"), 300),
522 ];
523
524 for (key, data) in &keys {
525 freezer
526 .put(key.clone(), *data)
527 .await
528 .expect("Failed to put data");
529 }
530
531 freezer.close().await.expect("Failed to close freezer")
532 };
533
534 {
536 let freezer = Freezer::<_, FixedBytes<64>, i32>::init_with_checkpoint(
537 context.with_label("second"),
538 cfg.clone(),
539 Some(checkpoint),
540 )
541 .await
542 .expect("Failed to initialize freezer");
543
544 let keys = vec![
545 (test_key("persist1"), 100),
546 (test_key("persist2"), 200),
547 (test_key("persist3"), 300),
548 ];
549
550 for (key, data) in &keys {
551 let retrieved = freezer
552 .get(Identifier::Key(key))
553 .await
554 .expect("Failed to get data")
555 .expect("Data not found");
556 assert_eq!(retrieved, *data);
557 }
558 }
559 });
560 }
561
562 #[test_traced]
563 fn test_crash_consistency() {
564 let executor = deterministic::Runner::default();
566 executor.start(|context| async move {
567 let cfg = Config {
568 key_partition: "test-key-index".into(),
569 key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
570 key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
571 value_partition: "test-value-journal".into(),
572 value_compression: None,
573 value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
574 value_target_size: DEFAULT_VALUE_TARGET_SIZE,
575 table_partition: "test-table".into(),
576 table_initial_size: DEFAULT_TABLE_INITIAL_SIZE,
577 table_resize_frequency: DEFAULT_TABLE_RESIZE_FREQUENCY,
578 table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
579 table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
580 codec_config: (),
581 };
582
583 let checkpoint = {
585 let mut freezer = Freezer::<_, FixedBytes<64>, i32>::init(
586 context.with_label("first"),
587 cfg.clone(),
588 )
589 .await
590 .expect("Failed to initialize freezer");
591
592 freezer
593 .put(test_key("committed1"), 1)
594 .await
595 .expect("Failed to put data");
596 freezer
597 .put(test_key("committed2"), 2)
598 .await
599 .expect("Failed to put data");
600
601 freezer.sync().await.expect("Failed to sync");
603
604 freezer
606 .put(test_key("uncommitted1"), 3)
607 .await
608 .expect("Failed to put data");
609 freezer
610 .put(test_key("uncommitted2"), 4)
611 .await
612 .expect("Failed to put data");
613
614 freezer.close().await.expect("Failed to close")
616 };
617
618 {
620 let freezer = Freezer::<_, FixedBytes<64>, i32>::init_with_checkpoint(
621 context.with_label("second"),
622 cfg.clone(),
623 Some(checkpoint),
624 )
625 .await
626 .expect("Failed to initialize freezer");
627
628 assert_eq!(
630 freezer
631 .get(Identifier::Key(&test_key("committed1")))
632 .await
633 .unwrap(),
634 Some(1)
635 );
636 assert_eq!(
637 freezer
638 .get(Identifier::Key(&test_key("committed2")))
639 .await
640 .unwrap(),
641 Some(2)
642 );
643
644 if let Some(val) = freezer
647 .get(Identifier::Key(&test_key("uncommitted1")))
648 .await
649 .unwrap()
650 {
651 assert_eq!(val, 3);
652 }
653 if let Some(val) = freezer
654 .get(Identifier::Key(&test_key("uncommitted2")))
655 .await
656 .unwrap()
657 {
658 assert_eq!(val, 4);
659 }
660 }
661 });
662 }
663
664 #[test_traced]
665 fn test_destroy() {
666 let executor = deterministic::Runner::default();
668 executor.start(|context| async move {
669 let cfg = Config {
671 key_partition: "test-key-index".into(),
672 key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
673 key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
674 value_partition: "test-value-journal".into(),
675 value_compression: None,
676 value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
677 value_target_size: DEFAULT_VALUE_TARGET_SIZE,
678 table_partition: "test-table".into(),
679 table_initial_size: DEFAULT_TABLE_INITIAL_SIZE,
680 table_resize_frequency: DEFAULT_TABLE_RESIZE_FREQUENCY,
681 table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
682 table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
683 codec_config: (),
684 };
685 {
686 let mut freezer = Freezer::<_, FixedBytes<64>, i32>::init(
687 context.with_label("first"),
688 cfg.clone(),
689 )
690 .await
691 .expect("Failed to initialize freezer");
692
693 freezer
694 .put(test_key("destroy1"), 1)
695 .await
696 .expect("Failed to put data");
697 freezer
698 .put(test_key("destroy2"), 2)
699 .await
700 .expect("Failed to put data");
701
702 freezer.destroy().await.expect("Failed to destroy freezer");
704 }
705
706 {
708 let freezer = Freezer::<_, FixedBytes<64>, i32>::init(
709 context.with_label("second"),
710 cfg.clone(),
711 )
712 .await
713 .expect("Failed to initialize freezer");
714
715 assert!(freezer
717 .get(Identifier::Key(&test_key("destroy1")))
718 .await
719 .unwrap()
720 .is_none());
721 assert!(freezer
722 .get(Identifier::Key(&test_key("destroy2")))
723 .await
724 .unwrap()
725 .is_none());
726 }
727 });
728 }
729
730 #[test_traced]
731 fn test_partial_table_entry_write() {
732 let executor = deterministic::Runner::default();
734 executor.start(|context| async move {
735 let cfg = Config {
737 key_partition: "test-key-index".into(),
738 key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
739 key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
740 value_partition: "test-value-journal".into(),
741 value_compression: None,
742 value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
743 value_target_size: DEFAULT_VALUE_TARGET_SIZE,
744 table_partition: "test-table".into(),
745 table_initial_size: DEFAULT_TABLE_INITIAL_SIZE,
746 table_resize_frequency: DEFAULT_TABLE_RESIZE_FREQUENCY,
747 table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
748 table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
749 codec_config: (),
750 };
751 let checkpoint = {
752 let mut freezer = Freezer::<_, FixedBytes<64>, i32>::init(
753 context.with_label("first"),
754 cfg.clone(),
755 )
756 .await
757 .expect("Failed to initialize freezer");
758
759 freezer.put(test_key("key1"), 42).await.unwrap();
760 freezer.sync().await.unwrap();
761 freezer.close().await.unwrap()
762 };
763
764 {
766 let (blob, _) = context.open(&cfg.table_partition, b"table").await.unwrap();
767 blob.write_at(0, vec![0xFF; 10]).await.unwrap();
769 blob.sync().await.unwrap();
770 }
771
772 {
774 let freezer = Freezer::<_, FixedBytes<64>, i32>::init_with_checkpoint(
775 context.with_label("second"),
776 cfg.clone(),
777 Some(checkpoint),
778 )
779 .await
780 .expect("Failed to initialize freezer");
781
782 let result = freezer
785 .get(Identifier::Key(&test_key("key1")))
786 .await
787 .unwrap();
788 assert!(result.is_none() || result == Some(42));
789 }
790 });
791 }
792
793 #[test_traced]
794 fn test_table_entry_invalid_crc() {
795 let executor = deterministic::Runner::default();
797 executor.start(|context| async move {
798 let cfg = Config {
799 key_partition: "test-key-index".into(),
800 key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
801 key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
802 value_partition: "test-value-journal".into(),
803 value_compression: None,
804 value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
805 value_target_size: DEFAULT_VALUE_TARGET_SIZE,
806 table_partition: "test-table".into(),
807 table_initial_size: DEFAULT_TABLE_INITIAL_SIZE,
808 table_resize_frequency: DEFAULT_TABLE_RESIZE_FREQUENCY,
809 table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
810 table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
811 codec_config: (),
812 };
813
814 let checkpoint = {
816 let mut freezer = Freezer::<_, FixedBytes<64>, i32>::init(
817 context.with_label("first"),
818 cfg.clone(),
819 )
820 .await
821 .expect("Failed to initialize freezer");
822
823 freezer.put(test_key("key1"), 42).await.unwrap();
824 freezer.sync().await.unwrap();
825 freezer.close().await.unwrap()
826 };
827
828 {
830 let (blob, _) = context.open(&cfg.table_partition, b"table").await.unwrap();
831 let entry_data = blob.read_at(0, 24).await.unwrap();
833 let mut corrupted = entry_data.coalesce();
834 corrupted.as_mut()[20] ^= 0xFF;
836 blob.write_at(0, corrupted).await.unwrap();
837 blob.sync().await.unwrap();
838 }
839
840 {
842 let freezer = Freezer::<_, FixedBytes<64>, i32>::init_with_checkpoint(
843 context.with_label("second"),
844 cfg.clone(),
845 Some(checkpoint),
846 )
847 .await
848 .expect("Failed to initialize freezer");
849
850 let result = freezer
852 .get(Identifier::Key(&test_key("key1")))
853 .await
854 .unwrap();
855 assert!(result.is_none() || result == Some(42));
857 }
858 });
859 }
860
861 #[test_traced]
862 fn test_table_extra_bytes() {
863 let executor = deterministic::Runner::default();
865 executor.start(|context| async move {
866 let cfg = Config {
867 key_partition: "test-key-index".into(),
868 key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
869 key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
870 value_partition: "test-value-journal".into(),
871 value_compression: None,
872 value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
873 value_target_size: DEFAULT_VALUE_TARGET_SIZE,
874 table_partition: "test-table".into(),
875 table_initial_size: DEFAULT_TABLE_INITIAL_SIZE,
876 table_resize_frequency: DEFAULT_TABLE_RESIZE_FREQUENCY,
877 table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
878 table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
879 codec_config: (),
880 };
881
882 let checkpoint = {
884 let mut freezer = Freezer::<_, FixedBytes<64>, i32>::init(
885 context.with_label("first"),
886 cfg.clone(),
887 )
888 .await
889 .expect("Failed to initialize freezer");
890
891 freezer.put(test_key("key1"), 42).await.unwrap();
892 freezer.sync().await.unwrap();
893 freezer.close().await.unwrap()
894 };
895
896 {
898 let (blob, size) = context.open(&cfg.table_partition, b"table").await.unwrap();
899 blob.write_at(size, hex!("0xdeadbeef").to_vec())
901 .await
902 .unwrap();
903 blob.sync().await.unwrap();
904 }
905
906 {
908 let freezer = Freezer::<_, FixedBytes<64>, i32>::init_with_checkpoint(
909 context.with_label("second"),
910 cfg.clone(),
911 Some(checkpoint),
912 )
913 .await
914 .expect("Failed to initialize freezer");
915
916 assert_eq!(
918 freezer
919 .get(Identifier::Key(&test_key("key1")))
920 .await
921 .unwrap(),
922 Some(42)
923 );
924
925 let mut freezer_mut = freezer;
927 freezer_mut.put(test_key("key2"), 43).await.unwrap();
928 assert_eq!(
929 freezer_mut
930 .get(Identifier::Key(&test_key("key2")))
931 .await
932 .unwrap(),
933 Some(43)
934 );
935 }
936 });
937 }
938
939 #[test_traced]
940 fn test_indexing_across_resizes() {
941 let executor = deterministic::Runner::default();
943 executor.start(|context| async move {
944 let cfg = Config {
946 key_partition: "test-key-index".into(),
947 key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
948 key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
949 value_partition: "test-value-journal".into(),
950 value_compression: None,
951 value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
952 value_target_size: DEFAULT_VALUE_TARGET_SIZE,
953 table_partition: "test-table".into(),
954 table_initial_size: 2, table_resize_frequency: 2, table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
957 table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
958 codec_config: (),
959 };
960 let mut freezer =
961 Freezer::<_, FixedBytes<64>, i32>::init(context.with_label("first"), cfg.clone())
962 .await
963 .expect("Failed to initialize freezer");
964
965 let mut keys = Vec::new();
968 for i in 0..1000 {
969 let key = test_key(&format!("key{i}"));
970 keys.push((key.clone(), i));
971
972 freezer.put(key, i).await.expect("Failed to put data");
974 freezer.sync().await.expect("Failed to sync");
975 }
976
977 for (key, value) in &keys {
979 let retrieved = freezer
980 .get(Identifier::Key(key))
981 .await
982 .expect("Failed to get data")
983 .expect("Data not found");
984 assert_eq!(retrieved, *value, "Value mismatch for key after resizes");
985 }
986
987 let checkpoint = freezer.close().await.expect("Failed to close");
989 let freezer = Freezer::<_, FixedBytes<64>, i32>::init_with_checkpoint(
990 context.with_label("second"),
991 cfg.clone(),
992 Some(checkpoint),
993 )
994 .await
995 .expect("Failed to reinitialize freezer");
996
997 for (key, value) in &keys {
999 let retrieved = freezer
1000 .get(Identifier::Key(key))
1001 .await
1002 .expect("Failed to get data")
1003 .expect("Data not found");
1004 assert_eq!(retrieved, *value, "Value mismatch for key after restart");
1005 }
1006
1007 let buffer = context.encode();
1009 assert!(buffer.contains("first_resizes_total 8"), "{}", buffer);
1010 });
1011 }
1012
1013 #[test_traced]
1014 fn test_insert_during_resize() {
1015 let executor = deterministic::Runner::default();
1016 executor.start(|context| async move {
1017 let cfg = Config {
1018 key_partition: "test-key-index".into(),
1019 key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1020 key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1021 value_partition: "test-value-journal".into(),
1022 value_compression: None,
1023 value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1024 value_target_size: DEFAULT_VALUE_TARGET_SIZE,
1025 table_partition: "test-table".into(),
1026 table_initial_size: 2,
1027 table_resize_frequency: 1,
1028 table_resize_chunk_size: 1, table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
1030 codec_config: (),
1031 };
1032 let mut freezer = Freezer::<_, FixedBytes<64>, i32>::init(context.clone(), cfg.clone())
1033 .await
1034 .unwrap();
1035
1036 freezer.put(test_key("key0"), 0).await.unwrap();
1039 freezer.put(test_key("key2"), 1).await.unwrap();
1040 freezer.sync().await.unwrap(); assert!(freezer.resizing().is_some());
1044
1045 freezer.put(test_key("key6"), 2).await.unwrap();
1048 assert!(context.encode().contains("unnecessary_writes_total 1"));
1049 assert_eq!(freezer.resizable(), 3);
1050
1051 freezer.put(test_key("key3"), 3).await.unwrap();
1054 assert!(context.encode().contains("unnecessary_writes_total 1"));
1055 assert_eq!(freezer.resizable(), 3);
1056
1057 freezer.sync().await.unwrap();
1059 assert!(freezer.resizing().is_none());
1060 assert_eq!(freezer.resizable(), 2);
1061
1062 freezer.put(test_key("key4"), 4).await.unwrap();
1065 freezer.put(test_key("key7"), 5).await.unwrap();
1066 freezer.sync().await.unwrap();
1067
1068 assert!(freezer.resizing().is_some());
1070
1071 let keys = ["key0", "key2", "key6", "key3", "key4", "key7"];
1073 for (i, k) in keys.iter().enumerate() {
1074 assert_eq!(
1075 freezer.get(Identifier::Key(&test_key(k))).await.unwrap(),
1076 Some(i as i32)
1077 );
1078 }
1079
1080 while freezer.resizing().is_some() {
1082 freezer.sync().await.unwrap();
1083 }
1084
1085 assert_eq!(freezer.resizable(), 0);
1087 });
1088 }
1089
1090 #[test_traced]
1091 fn test_resize_after_startup() {
1092 let executor = deterministic::Runner::default();
1093 executor.start(|context| async move {
1094 let cfg = Config {
1095 key_partition: "test-key-index".into(),
1096 key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1097 key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1098 value_partition: "test-value-journal".into(),
1099 value_compression: None,
1100 value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1101 value_target_size: DEFAULT_VALUE_TARGET_SIZE,
1102 table_partition: "test-table".into(),
1103 table_initial_size: 2,
1104 table_resize_frequency: 1,
1105 table_resize_chunk_size: 1, table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
1107 codec_config: (),
1108 };
1109
1110 let checkpoint = {
1112 let mut freezer = Freezer::<_, FixedBytes<64>, i32>::init(
1113 context.with_label("first"),
1114 cfg.clone(),
1115 )
1116 .await
1117 .unwrap();
1118
1119 freezer.put(test_key("key0"), 0).await.unwrap();
1122 freezer.put(test_key("key2"), 1).await.unwrap();
1123 let checkpoint = freezer.sync().await.unwrap();
1124
1125 assert!(freezer.resizing().is_some());
1127
1128 checkpoint
1129 };
1130
1131 let mut freezer = Freezer::<_, FixedBytes<64>, i32>::init_with_checkpoint(
1133 context.with_label("second"),
1134 cfg.clone(),
1135 Some(checkpoint),
1136 )
1137 .await
1138 .unwrap();
1139 assert_eq!(freezer.resizable(), 1);
1140
1141 freezer.sync().await.unwrap();
1144 assert!(freezer.resizing().is_some());
1145
1146 while freezer.resizing().is_some() {
1148 freezer.sync().await.unwrap();
1149 }
1150
1151 assert_eq!(freezer.resizable(), 0);
1153 });
1154 }
1155
1156 fn test_operations_and_restart(num_keys: usize) -> String {
1157 let executor = deterministic::Runner::default();
1158 executor.start(|mut context| async move {
1159 let cfg = Config {
1160 key_partition: "test-key-index".into(),
1161 key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1162 key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1163 value_partition: "test-value-journal".into(),
1164 value_compression: None,
1165 value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1166 value_target_size: 128, table_partition: "test-table".into(),
1168 table_initial_size: 8, table_resize_frequency: 2, table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
1171 table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
1172 codec_config: (),
1173 };
1174 let mut freezer = Freezer::<_, FixedBytes<96>, FixedBytes<256>>::init(
1175 context.with_label("init1"),
1176 cfg.clone(),
1177 )
1178 .await
1179 .expect("Failed to initialize freezer");
1180
1181 let mut pairs = Vec::new();
1183
1184 for _ in 0..num_keys {
1185 let mut key = [0u8; 96];
1187 context.fill_bytes(&mut key);
1188 let key = FixedBytes::<96>::new(key);
1189
1190 let mut value = [0u8; 256];
1192 context.fill_bytes(&mut value);
1193 let value = FixedBytes::<256>::new(value);
1194
1195 freezer
1197 .put(key.clone(), value.clone())
1198 .await
1199 .expect("Failed to put data");
1200 pairs.push((key, value));
1201
1202 if context.gen_bool(0.1) {
1204 freezer.sync().await.expect("Failed to sync");
1205 }
1206 }
1207
1208 freezer.sync().await.expect("Failed to sync");
1210
1211 for (key, value) in &pairs {
1213 let retrieved = freezer
1214 .get(Identifier::Key(key))
1215 .await
1216 .expect("Failed to get data")
1217 .expect("Data not found");
1218 assert_eq!(&retrieved, value);
1219 }
1220
1221 for (key, _) in &pairs {
1223 assert!(freezer
1224 .get(Identifier::Key(key))
1225 .await
1226 .expect("Failed to check key")
1227 .is_some());
1228 }
1229
1230 for _ in 0..10 {
1232 let mut key = [0u8; 96];
1233 context.fill_bytes(&mut key);
1234 let key = FixedBytes::<96>::new(key);
1235 assert!(freezer
1236 .get(Identifier::Key(&key))
1237 .await
1238 .expect("Failed to check key")
1239 .is_none());
1240 }
1241
1242 let checkpoint = freezer.close().await.expect("Failed to close freezer");
1244
1245 let mut freezer = Freezer::<_, FixedBytes<96>, FixedBytes<256>>::init_with_checkpoint(
1247 context.with_label("init2"),
1248 cfg.clone(),
1249 Some(checkpoint),
1250 )
1251 .await
1252 .expect("Failed to initialize freezer");
1253
1254 for (key, value) in &pairs {
1256 let retrieved = freezer
1257 .get(Identifier::Key(key))
1258 .await
1259 .expect("Failed to get data")
1260 .expect("Data not found");
1261 assert_eq!(&retrieved, value);
1262 }
1263
1264 for _ in 0..20 {
1266 let mut key = [0u8; 96];
1267 context.fill_bytes(&mut key);
1268 let key = FixedBytes::<96>::new(key);
1269
1270 let mut value = [0u8; 256];
1271 context.fill_bytes(&mut value);
1272 let value = FixedBytes::<256>::new(value);
1273
1274 freezer.put(key, value).await.expect("Failed to put data");
1275 }
1276
1277 for _ in 0..3 {
1279 freezer.sync().await.expect("Failed to sync");
1280
1281 for _ in 0..5 {
1283 let mut key = [0u8; 96];
1284 context.fill_bytes(&mut key);
1285 let key = FixedBytes::<96>::new(key);
1286
1287 let mut value = [0u8; 256];
1288 context.fill_bytes(&mut value);
1289 let value = FixedBytes::<256>::new(value);
1290
1291 freezer.put(key, value).await.expect("Failed to put data");
1292 }
1293 }
1294
1295 freezer.sync().await.expect("Failed to sync");
1297
1298 context.auditor().state()
1300 })
1301 }
1302
1303 #[test_group("slow")]
1304 #[test_traced]
1305 fn test_determinism() {
1306 let state1 = test_operations_and_restart(1_000);
1307 let state2 = test_operations_and_restart(1_000);
1308 assert_eq!(state1, state2);
1309 }
1310
1311 #[test_traced]
1312 fn test_put_multiple_updates() {
1313 let executor = deterministic::Runner::default();
1315 executor.start(|context| async move {
1316 let cfg = Config {
1318 key_partition: "test-key-index".into(),
1319 key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1320 key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1321 value_partition: "test-value-journal".into(),
1322 value_compression: None,
1323 value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1324 value_target_size: DEFAULT_VALUE_TARGET_SIZE,
1325 table_partition: "test-table".into(),
1326 table_initial_size: DEFAULT_TABLE_INITIAL_SIZE,
1327 table_resize_frequency: DEFAULT_TABLE_RESIZE_FREQUENCY,
1328 table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
1329 table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
1330 codec_config: (),
1331 };
1332 let mut freezer = Freezer::<_, FixedBytes<64>, i32>::init(context.clone(), cfg.clone())
1333 .await
1334 .expect("Failed to initialize freezer");
1335
1336 let key = test_key("key1");
1337
1338 freezer
1339 .put(key.clone(), 1)
1340 .await
1341 .expect("Failed to put data");
1342 freezer
1343 .put(key.clone(), 2)
1344 .await
1345 .expect("Failed to put data");
1346 freezer.sync().await.expect("Failed to sync");
1347 assert_eq!(
1348 freezer
1349 .get(Identifier::Key(&key))
1350 .await
1351 .expect("Failed to get data")
1352 .unwrap(),
1353 2
1354 );
1355 });
1356 }
1357}