1mod storage;
206use commonware_runtime::buffer::PoolRef;
207use commonware_utils::Array;
208use std::num::NonZeroUsize;
209pub use storage::{Checkpoint, Cursor, Freezer};
210use thiserror::Error;
211
212pub enum Identifier<'a, K: Array> {
214 Cursor(Cursor),
215 Key(&'a K),
216}
217
218#[derive(Debug, Error)]
220pub enum Error {
221 #[error("runtime error: {0}")]
222 Runtime(#[from] commonware_runtime::Error),
223 #[error("journal error: {0}")]
224 Journal(#[from] crate::journal::Error),
225 #[error("codec error: {0}")]
226 Codec(#[from] commonware_codec::Error),
227}
228
229#[derive(Clone)]
231pub struct Config<C> {
232 pub key_partition: String,
234
235 pub key_write_buffer: NonZeroUsize,
237
238 pub key_buffer_pool: PoolRef,
240
241 pub value_partition: String,
243
244 pub value_compression: Option<u8>,
246
247 pub value_write_buffer: NonZeroUsize,
249
250 pub value_target_size: u64,
252
253 pub table_partition: String,
255
256 pub table_initial_size: u32,
258
259 pub table_resize_frequency: u8,
262
263 pub table_resize_chunk_size: u32,
265
266 pub table_replay_buffer: NonZeroUsize,
268
269 pub codec_config: C,
271}
272
273#[cfg(test)]
274mod tests {
275 use super::*;
276 use commonware_codec::DecodeExt;
277 use commonware_macros::{test_group, test_traced};
278 use commonware_runtime::{deterministic, Blob, Metrics, Runner, Storage};
279 use commonware_utils::{hex, sequence::FixedBytes, NZUsize, NZU16};
280 use rand::{Rng, RngCore};
281 use std::num::NonZeroU16;
282
283 const DEFAULT_WRITE_BUFFER: usize = 1024;
284 const DEFAULT_VALUE_TARGET_SIZE: u64 = 10 * 1024 * 1024;
285 const DEFAULT_TABLE_INITIAL_SIZE: u32 = 256;
286 const DEFAULT_TABLE_RESIZE_FREQUENCY: u8 = 4;
287 const DEFAULT_TABLE_RESIZE_CHUNK_SIZE: u32 = 128; const DEFAULT_TABLE_REPLAY_BUFFER: usize = 64 * 1024; const PAGE_SIZE: NonZeroU16 = NZU16!(1024);
290 const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);
291
292 fn test_key(key: &str) -> FixedBytes<64> {
293 let mut buf = [0u8; 64];
294 let key = key.as_bytes();
295 assert!(key.len() <= buf.len());
296 buf[..key.len()].copy_from_slice(key);
297 FixedBytes::decode(buf.as_ref()).unwrap()
298 }
299
300 fn test_put_get(compression: Option<u8>) {
301 let executor = deterministic::Runner::default();
303 executor.start(|context| async move {
304 let cfg = Config {
306 key_partition: "test_key_index".into(),
307 key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
308 key_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
309 value_partition: "test_value_journal".into(),
310 value_compression: compression,
311 value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
312 value_target_size: DEFAULT_VALUE_TARGET_SIZE,
313 table_partition: "test_table".into(),
314 table_initial_size: DEFAULT_TABLE_INITIAL_SIZE,
315 table_resize_frequency: DEFAULT_TABLE_RESIZE_FREQUENCY,
316 table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
317 table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
318 codec_config: (),
319 };
320 let mut freezer = Freezer::<_, FixedBytes<64>, i32>::init(context.clone(), cfg.clone())
321 .await
322 .expect("Failed to initialize freezer");
323
324 let key = test_key("testkey");
325 let data = 42;
326
327 let value = freezer
329 .get(Identifier::Key(&key))
330 .await
331 .expect("Failed to check key");
332 assert!(value.is_none());
333
334 freezer
336 .put(key.clone(), data)
337 .await
338 .expect("Failed to put data");
339
340 let value = freezer
342 .get(Identifier::Key(&key))
343 .await
344 .expect("Failed to get data")
345 .expect("Data not found");
346 assert_eq!(value, data);
347
348 let buffer = context.encode();
350 assert!(buffer.contains("gets_total 2"), "{}", buffer);
351 assert!(buffer.contains("puts_total 1"), "{}", buffer);
352 assert!(buffer.contains("unnecessary_reads_total 0"), "{}", buffer);
353
354 freezer.sync().await.expect("Failed to sync data");
356 });
357 }
358
359 #[test_traced]
360 fn test_put_get_no_compression() {
361 test_put_get(None);
362 }
363
364 #[test_traced]
365 fn test_put_get_compression() {
366 test_put_get(Some(3));
367 }
368
369 #[test_traced]
370 fn test_multiple_keys() {
371 let executor = deterministic::Runner::default();
373 executor.start(|context| async move {
374 let cfg = Config {
376 key_partition: "test_key_index".into(),
377 key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
378 key_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
379 value_partition: "test_value_journal".into(),
380 value_compression: None,
381 value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
382 value_target_size: DEFAULT_VALUE_TARGET_SIZE,
383 table_partition: "test_table".into(),
384 table_initial_size: DEFAULT_TABLE_INITIAL_SIZE,
385 table_resize_frequency: DEFAULT_TABLE_RESIZE_FREQUENCY,
386 table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
387 table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
388 codec_config: (),
389 };
390 let mut freezer = Freezer::<_, FixedBytes<64>, i32>::init(context.clone(), cfg.clone())
391 .await
392 .expect("Failed to initialize freezer");
393
394 let keys = vec![
396 (test_key("key1"), 1),
397 (test_key("key2"), 2),
398 (test_key("key3"), 3),
399 (test_key("key4"), 4),
400 (test_key("key5"), 5),
401 ];
402
403 for (key, data) in &keys {
404 freezer
405 .put(key.clone(), *data)
406 .await
407 .expect("Failed to put data");
408 }
409
410 for (key, data) in &keys {
412 let retrieved = freezer
413 .get(Identifier::Key(key))
414 .await
415 .expect("Failed to get data")
416 .expect("Data not found");
417 assert_eq!(retrieved, *data);
418 }
419 });
420 }
421
422 #[test_traced]
423 fn test_collision_handling() {
424 let executor = deterministic::Runner::default();
426 executor.start(|context| async move {
427 let cfg = Config {
429 key_partition: "test_key_index".into(),
430 key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
431 key_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
432 value_partition: "test_value_journal".into(),
433 value_compression: None,
434 value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
435 value_target_size: DEFAULT_VALUE_TARGET_SIZE,
436 table_partition: "test_table".into(),
437 table_initial_size: 4, table_resize_frequency: DEFAULT_TABLE_RESIZE_FREQUENCY,
439 table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
440 table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
441 codec_config: (),
442 };
443 let mut freezer = Freezer::<_, FixedBytes<64>, i32>::init(context.clone(), cfg.clone())
444 .await
445 .expect("Failed to initialize freezer");
446
447 let keys = vec![
449 (test_key("key1"), 1),
450 (test_key("key2"), 2),
451 (test_key("key3"), 3),
452 (test_key("key4"), 4),
453 (test_key("key5"), 5),
454 (test_key("key6"), 6),
455 (test_key("key7"), 7),
456 (test_key("key8"), 8),
457 ];
458
459 for (key, data) in &keys {
460 freezer
461 .put(key.clone(), *data)
462 .await
463 .expect("Failed to put data");
464 }
465
466 freezer.sync().await.expect("Failed to sync");
468
469 for (key, data) in &keys {
471 let retrieved = freezer
472 .get(Identifier::Key(key))
473 .await
474 .expect("Failed to get data")
475 .expect("Data not found");
476 assert_eq!(retrieved, *data);
477 }
478
479 let buffer = context.encode();
481 assert!(buffer.contains("gets_total 8"), "{}", buffer);
482 assert!(buffer.contains("unnecessary_reads_total 5"), "{}", buffer);
483 });
484 }
485
486 #[test_traced]
487 fn test_restart() {
488 let executor = deterministic::Runner::default();
490 executor.start(|context| async move {
491 let cfg = Config {
492 key_partition: "test_key_index".into(),
493 key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
494 key_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
495 value_partition: "test_value_journal".into(),
496 value_compression: None,
497 value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
498 value_target_size: DEFAULT_VALUE_TARGET_SIZE,
499 table_partition: "test_table".into(),
500 table_initial_size: DEFAULT_TABLE_INITIAL_SIZE,
501 table_resize_frequency: DEFAULT_TABLE_RESIZE_FREQUENCY,
502 table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
503 table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
504 codec_config: (),
505 };
506
507 let checkpoint = {
509 let mut freezer =
510 Freezer::<_, FixedBytes<64>, i32>::init(context.clone(), cfg.clone())
511 .await
512 .expect("Failed to initialize freezer");
513
514 let keys = vec![
515 (test_key("persist1"), 100),
516 (test_key("persist2"), 200),
517 (test_key("persist3"), 300),
518 ];
519
520 for (key, data) in &keys {
521 freezer
522 .put(key.clone(), *data)
523 .await
524 .expect("Failed to put data");
525 }
526
527 freezer.close().await.expect("Failed to close freezer")
528 };
529
530 {
532 let freezer = Freezer::<_, FixedBytes<64>, i32>::init_with_checkpoint(
533 context.clone(),
534 cfg.clone(),
535 Some(checkpoint),
536 )
537 .await
538 .expect("Failed to initialize freezer");
539
540 let keys = vec![
541 (test_key("persist1"), 100),
542 (test_key("persist2"), 200),
543 (test_key("persist3"), 300),
544 ];
545
546 for (key, data) in &keys {
547 let retrieved = freezer
548 .get(Identifier::Key(key))
549 .await
550 .expect("Failed to get data")
551 .expect("Data not found");
552 assert_eq!(retrieved, *data);
553 }
554 }
555 });
556 }
557
558 #[test_traced]
559 fn test_crash_consistency() {
560 let executor = deterministic::Runner::default();
562 executor.start(|context| async move {
563 let cfg = Config {
564 key_partition: "test_key_index".into(),
565 key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
566 key_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
567 value_partition: "test_value_journal".into(),
568 value_compression: None,
569 value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
570 value_target_size: DEFAULT_VALUE_TARGET_SIZE,
571 table_partition: "test_table".into(),
572 table_initial_size: DEFAULT_TABLE_INITIAL_SIZE,
573 table_resize_frequency: DEFAULT_TABLE_RESIZE_FREQUENCY,
574 table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
575 table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
576 codec_config: (),
577 };
578
579 let checkpoint = {
581 let mut freezer =
582 Freezer::<_, FixedBytes<64>, i32>::init(context.clone(), cfg.clone())
583 .await
584 .expect("Failed to initialize freezer");
585
586 freezer
587 .put(test_key("committed1"), 1)
588 .await
589 .expect("Failed to put data");
590 freezer
591 .put(test_key("committed2"), 2)
592 .await
593 .expect("Failed to put data");
594
595 freezer.sync().await.expect("Failed to sync");
597
598 freezer
600 .put(test_key("uncommitted1"), 3)
601 .await
602 .expect("Failed to put data");
603 freezer
604 .put(test_key("uncommitted2"), 4)
605 .await
606 .expect("Failed to put data");
607
608 freezer.close().await.expect("Failed to close")
610 };
611
612 {
614 let freezer = Freezer::<_, FixedBytes<64>, i32>::init_with_checkpoint(
615 context.clone(),
616 cfg.clone(),
617 Some(checkpoint),
618 )
619 .await
620 .expect("Failed to initialize freezer");
621
622 assert_eq!(
624 freezer
625 .get(Identifier::Key(&test_key("committed1")))
626 .await
627 .unwrap(),
628 Some(1)
629 );
630 assert_eq!(
631 freezer
632 .get(Identifier::Key(&test_key("committed2")))
633 .await
634 .unwrap(),
635 Some(2)
636 );
637
638 if let Some(val) = freezer
641 .get(Identifier::Key(&test_key("uncommitted1")))
642 .await
643 .unwrap()
644 {
645 assert_eq!(val, 3);
646 }
647 if let Some(val) = freezer
648 .get(Identifier::Key(&test_key("uncommitted2")))
649 .await
650 .unwrap()
651 {
652 assert_eq!(val, 4);
653 }
654 }
655 });
656 }
657
658 #[test_traced]
659 fn test_destroy() {
660 let executor = deterministic::Runner::default();
662 executor.start(|context| async move {
663 let cfg = Config {
665 key_partition: "test_key_index".into(),
666 key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
667 key_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
668 value_partition: "test_value_journal".into(),
669 value_compression: None,
670 value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
671 value_target_size: DEFAULT_VALUE_TARGET_SIZE,
672 table_partition: "test_table".into(),
673 table_initial_size: DEFAULT_TABLE_INITIAL_SIZE,
674 table_resize_frequency: DEFAULT_TABLE_RESIZE_FREQUENCY,
675 table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
676 table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
677 codec_config: (),
678 };
679 {
680 let mut freezer =
681 Freezer::<_, FixedBytes<64>, i32>::init(context.clone(), cfg.clone())
682 .await
683 .expect("Failed to initialize freezer");
684
685 freezer
686 .put(test_key("destroy1"), 1)
687 .await
688 .expect("Failed to put data");
689 freezer
690 .put(test_key("destroy2"), 2)
691 .await
692 .expect("Failed to put data");
693
694 freezer.destroy().await.expect("Failed to destroy freezer");
696 }
697
698 {
700 let freezer = Freezer::<_, FixedBytes<64>, i32>::init(context.clone(), cfg.clone())
701 .await
702 .expect("Failed to initialize freezer");
703
704 assert!(freezer
706 .get(Identifier::Key(&test_key("destroy1")))
707 .await
708 .unwrap()
709 .is_none());
710 assert!(freezer
711 .get(Identifier::Key(&test_key("destroy2")))
712 .await
713 .unwrap()
714 .is_none());
715 }
716 });
717 }
718
719 #[test_traced]
720 fn test_partial_table_entry_write() {
721 let executor = deterministic::Runner::default();
723 executor.start(|context| async move {
724 let cfg = Config {
726 key_partition: "test_key_index".into(),
727 key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
728 key_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
729 value_partition: "test_value_journal".into(),
730 value_compression: None,
731 value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
732 value_target_size: DEFAULT_VALUE_TARGET_SIZE,
733 table_partition: "test_table".into(),
734 table_initial_size: DEFAULT_TABLE_INITIAL_SIZE,
735 table_resize_frequency: DEFAULT_TABLE_RESIZE_FREQUENCY,
736 table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
737 table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
738 codec_config: (),
739 };
740 let checkpoint = {
741 let mut freezer =
742 Freezer::<_, FixedBytes<64>, i32>::init(context.clone(), cfg.clone())
743 .await
744 .expect("Failed to initialize freezer");
745
746 freezer.put(test_key("key1"), 42).await.unwrap();
747 freezer.sync().await.unwrap();
748 freezer.close().await.unwrap()
749 };
750
751 {
753 let (blob, _) = context.open(&cfg.table_partition, b"table").await.unwrap();
754 blob.write_at(vec![0xFF; 10], 0).await.unwrap();
756 blob.sync().await.unwrap();
757 }
758
759 {
761 let freezer = Freezer::<_, FixedBytes<64>, i32>::init_with_checkpoint(
762 context.clone(),
763 cfg.clone(),
764 Some(checkpoint),
765 )
766 .await
767 .expect("Failed to initialize freezer");
768
769 let result = freezer
772 .get(Identifier::Key(&test_key("key1")))
773 .await
774 .unwrap();
775 assert!(result.is_none() || result == Some(42));
776 }
777 });
778 }
779
780 #[test_traced]
781 fn test_table_entry_invalid_crc() {
782 let executor = deterministic::Runner::default();
784 executor.start(|context| async move {
785 let cfg = Config {
786 key_partition: "test_key_index".into(),
787 key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
788 key_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
789 value_partition: "test_value_journal".into(),
790 value_compression: None,
791 value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
792 value_target_size: DEFAULT_VALUE_TARGET_SIZE,
793 table_partition: "test_table".into(),
794 table_initial_size: DEFAULT_TABLE_INITIAL_SIZE,
795 table_resize_frequency: DEFAULT_TABLE_RESIZE_FREQUENCY,
796 table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
797 table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
798 codec_config: (),
799 };
800
801 let checkpoint = {
803 let mut freezer =
804 Freezer::<_, FixedBytes<64>, i32>::init(context.clone(), cfg.clone())
805 .await
806 .expect("Failed to initialize freezer");
807
808 freezer.put(test_key("key1"), 42).await.unwrap();
809 freezer.sync().await.unwrap();
810 freezer.close().await.unwrap()
811 };
812
813 {
815 let (blob, _) = context.open(&cfg.table_partition, b"table").await.unwrap();
816 let entry_data = blob.read_at(vec![0u8; 24], 0).await.unwrap();
818 let mut corrupted = entry_data.as_ref().to_vec();
819 corrupted[20] ^= 0xFF;
821 blob.write_at(corrupted, 0).await.unwrap();
822 blob.sync().await.unwrap();
823 }
824
825 {
827 let freezer = Freezer::<_, FixedBytes<64>, i32>::init_with_checkpoint(
828 context.clone(),
829 cfg.clone(),
830 Some(checkpoint),
831 )
832 .await
833 .expect("Failed to initialize freezer");
834
835 let result = freezer
837 .get(Identifier::Key(&test_key("key1")))
838 .await
839 .unwrap();
840 assert!(result.is_none() || result == Some(42));
842 }
843 });
844 }
845
846 #[test_traced]
847 fn test_table_extra_bytes() {
848 let executor = deterministic::Runner::default();
850 executor.start(|context| async move {
851 let cfg = Config {
852 key_partition: "test_key_index".into(),
853 key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
854 key_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
855 value_partition: "test_value_journal".into(),
856 value_compression: None,
857 value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
858 value_target_size: DEFAULT_VALUE_TARGET_SIZE,
859 table_partition: "test_table".into(),
860 table_initial_size: DEFAULT_TABLE_INITIAL_SIZE,
861 table_resize_frequency: DEFAULT_TABLE_RESIZE_FREQUENCY,
862 table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
863 table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
864 codec_config: (),
865 };
866
867 let checkpoint = {
869 let mut freezer =
870 Freezer::<_, FixedBytes<64>, i32>::init(context.clone(), cfg.clone())
871 .await
872 .expect("Failed to initialize freezer");
873
874 freezer.put(test_key("key1"), 42).await.unwrap();
875 freezer.sync().await.unwrap();
876 freezer.close().await.unwrap()
877 };
878
879 {
881 let (blob, size) = context.open(&cfg.table_partition, b"table").await.unwrap();
882 blob.write_at(hex!("0xdeadbeef").to_vec(), size)
884 .await
885 .unwrap();
886 blob.sync().await.unwrap();
887 }
888
889 {
891 let freezer = Freezer::<_, FixedBytes<64>, i32>::init_with_checkpoint(
892 context.clone(),
893 cfg.clone(),
894 Some(checkpoint),
895 )
896 .await
897 .expect("Failed to initialize freezer");
898
899 assert_eq!(
901 freezer
902 .get(Identifier::Key(&test_key("key1")))
903 .await
904 .unwrap(),
905 Some(42)
906 );
907
908 let mut freezer_mut = freezer;
910 freezer_mut.put(test_key("key2"), 43).await.unwrap();
911 assert_eq!(
912 freezer_mut
913 .get(Identifier::Key(&test_key("key2")))
914 .await
915 .unwrap(),
916 Some(43)
917 );
918 }
919 });
920 }
921
922 #[test_traced]
923 fn test_indexing_across_resizes() {
924 let executor = deterministic::Runner::default();
926 executor.start(|context| async move {
927 let cfg = Config {
929 key_partition: "test_key_index".into(),
930 key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
931 key_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
932 value_partition: "test_value_journal".into(),
933 value_compression: None,
934 value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
935 value_target_size: DEFAULT_VALUE_TARGET_SIZE,
936 table_partition: "test_table".into(),
937 table_initial_size: 2, table_resize_frequency: 2, table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
940 table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
941 codec_config: (),
942 };
943 let mut freezer = Freezer::<_, FixedBytes<64>, i32>::init(context.clone(), cfg.clone())
944 .await
945 .expect("Failed to initialize freezer");
946
947 let mut keys = Vec::new();
950 for i in 0..1000 {
951 let key = test_key(&format!("key{i}"));
952 keys.push((key.clone(), i));
953
954 freezer.put(key, i).await.expect("Failed to put data");
956 freezer.sync().await.expect("Failed to sync");
957 }
958
959 for (key, value) in &keys {
961 let retrieved = freezer
962 .get(Identifier::Key(key))
963 .await
964 .expect("Failed to get data")
965 .expect("Data not found");
966 assert_eq!(retrieved, *value, "Value mismatch for key after resizes");
967 }
968
969 let checkpoint = freezer.close().await.expect("Failed to close");
971 let freezer = Freezer::<_, FixedBytes<64>, i32>::init_with_checkpoint(
972 context.clone(),
973 cfg.clone(),
974 Some(checkpoint),
975 )
976 .await
977 .expect("Failed to reinitialize freezer");
978
979 for (key, value) in &keys {
981 let retrieved = freezer
982 .get(Identifier::Key(key))
983 .await
984 .expect("Failed to get data")
985 .expect("Data not found");
986 assert_eq!(retrieved, *value, "Value mismatch for key after restart");
987 }
988
989 let buffer = context.encode();
991 assert!(buffer.contains("resizes_total 8"), "{}", buffer);
992 });
993 }
994
995 #[test_traced]
996 fn test_insert_during_resize() {
997 let executor = deterministic::Runner::default();
998 executor.start(|context| async move {
999 let cfg = Config {
1000 key_partition: "test_key_index".into(),
1001 key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1002 key_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1003 value_partition: "test_value_journal".into(),
1004 value_compression: None,
1005 value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1006 value_target_size: DEFAULT_VALUE_TARGET_SIZE,
1007 table_partition: "test_table".into(),
1008 table_initial_size: 2,
1009 table_resize_frequency: 1,
1010 table_resize_chunk_size: 1, table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
1012 codec_config: (),
1013 };
1014 let mut freezer = Freezer::<_, FixedBytes<64>, i32>::init(context.clone(), cfg.clone())
1015 .await
1016 .unwrap();
1017
1018 freezer.put(test_key("key0"), 0).await.unwrap();
1021 freezer.put(test_key("key2"), 1).await.unwrap();
1022 freezer.sync().await.unwrap(); assert!(freezer.resizing().is_some());
1026
1027 freezer.put(test_key("key6"), 2).await.unwrap();
1030 assert!(context.encode().contains("unnecessary_writes_total 1"));
1031 assert_eq!(freezer.resizable(), 3);
1032
1033 freezer.put(test_key("key3"), 3).await.unwrap();
1036 assert!(context.encode().contains("unnecessary_writes_total 1"));
1037 assert_eq!(freezer.resizable(), 3);
1038
1039 freezer.sync().await.unwrap();
1041 assert!(freezer.resizing().is_none());
1042 assert_eq!(freezer.resizable(), 2);
1043
1044 freezer.put(test_key("key4"), 4).await.unwrap();
1047 freezer.put(test_key("key7"), 5).await.unwrap();
1048 freezer.sync().await.unwrap();
1049
1050 assert!(freezer.resizing().is_some());
1052
1053 let keys = ["key0", "key2", "key6", "key3", "key4", "key7"];
1055 for (i, k) in keys.iter().enumerate() {
1056 assert_eq!(
1057 freezer.get(Identifier::Key(&test_key(k))).await.unwrap(),
1058 Some(i as i32)
1059 );
1060 }
1061
1062 while freezer.resizing().is_some() {
1064 freezer.sync().await.unwrap();
1065 }
1066
1067 assert_eq!(freezer.resizable(), 0);
1069 });
1070 }
1071
1072 #[test_traced]
1073 fn test_resize_after_startup() {
1074 let executor = deterministic::Runner::default();
1075 executor.start(|context| async move {
1076 let cfg = Config {
1077 key_partition: "test_key_index".into(),
1078 key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1079 key_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1080 value_partition: "test_value_journal".into(),
1081 value_compression: None,
1082 value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1083 value_target_size: DEFAULT_VALUE_TARGET_SIZE,
1084 table_partition: "test_table".into(),
1085 table_initial_size: 2,
1086 table_resize_frequency: 1,
1087 table_resize_chunk_size: 1, table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
1089 codec_config: (),
1090 };
1091
1092 let checkpoint = {
1094 let mut freezer =
1095 Freezer::<_, FixedBytes<64>, i32>::init(context.clone(), cfg.clone())
1096 .await
1097 .unwrap();
1098
1099 freezer.put(test_key("key0"), 0).await.unwrap();
1102 freezer.put(test_key("key2"), 1).await.unwrap();
1103 let checkpoint = freezer.sync().await.unwrap();
1104
1105 assert!(freezer.resizing().is_some());
1107
1108 checkpoint
1109 };
1110
1111 let mut freezer = Freezer::<_, FixedBytes<64>, i32>::init_with_checkpoint(
1113 context.clone(),
1114 cfg.clone(),
1115 Some(checkpoint),
1116 )
1117 .await
1118 .unwrap();
1119 assert_eq!(freezer.resizable(), 1);
1120
1121 freezer.sync().await.unwrap();
1124 assert!(freezer.resizing().is_some());
1125
1126 while freezer.resizing().is_some() {
1128 freezer.sync().await.unwrap();
1129 }
1130
1131 assert_eq!(freezer.resizable(), 0);
1133 });
1134 }
1135
1136 fn test_operations_and_restart(num_keys: usize) -> String {
1137 let executor = deterministic::Runner::default();
1139 executor.start(|mut context| async move {
1140 let cfg = Config {
1142 key_partition: "test_key_index".into(),
1143 key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1144 key_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1145 value_partition: "test_value_journal".into(),
1146 value_compression: None,
1147 value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1148 value_target_size: 128, table_partition: "test_table".into(),
1150 table_initial_size: 8, table_resize_frequency: 2, table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
1153 table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
1154 codec_config: (),
1155 };
1156 let mut freezer =
1157 Freezer::<_, FixedBytes<96>, FixedBytes<256>>::init(context.clone(), cfg.clone())
1158 .await
1159 .expect("Failed to initialize freezer");
1160
1161 let mut pairs = Vec::new();
1163
1164 for _ in 0..num_keys {
1165 let mut key = [0u8; 96];
1167 context.fill_bytes(&mut key);
1168 let key = FixedBytes::<96>::new(key);
1169
1170 let mut value = [0u8; 256];
1172 context.fill_bytes(&mut value);
1173 let value = FixedBytes::<256>::new(value);
1174
1175 freezer
1177 .put(key.clone(), value.clone())
1178 .await
1179 .expect("Failed to put data");
1180 pairs.push((key, value));
1181
1182 if context.gen_bool(0.1) {
1184 freezer.sync().await.expect("Failed to sync");
1185 }
1186 }
1187
1188 freezer.sync().await.expect("Failed to sync");
1190
1191 for (key, value) in &pairs {
1193 let retrieved = freezer
1194 .get(Identifier::Key(key))
1195 .await
1196 .expect("Failed to get data")
1197 .expect("Data not found");
1198 assert_eq!(&retrieved, value);
1199 }
1200
1201 for (key, _) in &pairs {
1203 assert!(freezer
1204 .get(Identifier::Key(key))
1205 .await
1206 .expect("Failed to check key")
1207 .is_some());
1208 }
1209
1210 for _ in 0..10 {
1212 let mut key = [0u8; 96];
1213 context.fill_bytes(&mut key);
1214 let key = FixedBytes::<96>::new(key);
1215 assert!(freezer
1216 .get(Identifier::Key(&key))
1217 .await
1218 .expect("Failed to check key")
1219 .is_none());
1220 }
1221
1222 let checkpoint = freezer.close().await.expect("Failed to close freezer");
1224
1225 let mut freezer = Freezer::<_, FixedBytes<96>, FixedBytes<256>>::init_with_checkpoint(
1227 context.clone(),
1228 cfg.clone(),
1229 Some(checkpoint),
1230 )
1231 .await
1232 .expect("Failed to initialize freezer");
1233
1234 for (key, value) in &pairs {
1236 let retrieved = freezer
1237 .get(Identifier::Key(key))
1238 .await
1239 .expect("Failed to get data")
1240 .expect("Data not found");
1241 assert_eq!(&retrieved, value);
1242 }
1243
1244 for _ in 0..20 {
1246 let mut key = [0u8; 96];
1247 context.fill_bytes(&mut key);
1248 let key = FixedBytes::<96>::new(key);
1249
1250 let mut value = [0u8; 256];
1251 context.fill_bytes(&mut value);
1252 let value = FixedBytes::<256>::new(value);
1253
1254 freezer.put(key, value).await.expect("Failed to put data");
1255 }
1256
1257 for _ in 0..3 {
1259 freezer.sync().await.expect("Failed to sync");
1260
1261 for _ in 0..5 {
1263 let mut key = [0u8; 96];
1264 context.fill_bytes(&mut key);
1265 let key = FixedBytes::<96>::new(key);
1266
1267 let mut value = [0u8; 256];
1268 context.fill_bytes(&mut value);
1269 let value = FixedBytes::<256>::new(value);
1270
1271 freezer.put(key, value).await.expect("Failed to put data");
1272 }
1273 }
1274
1275 freezer.sync().await.expect("Failed to sync");
1277
1278 context.auditor().state()
1280 })
1281 }
1282
1283 #[test_group("slow")]
1284 #[test_traced]
1285 fn test_determinism() {
1286 let state1 = test_operations_and_restart(1_000);
1287 let state2 = test_operations_and_restart(1_000);
1288 assert_eq!(state1, state2);
1289 }
1290
1291 #[test_traced]
1292 fn test_put_multiple_updates() {
1293 let executor = deterministic::Runner::default();
1295 executor.start(|context| async move {
1296 let cfg = Config {
1298 key_partition: "test_key_index".into(),
1299 key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1300 key_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1301 value_partition: "test_value_journal".into(),
1302 value_compression: None,
1303 value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER),
1304 value_target_size: DEFAULT_VALUE_TARGET_SIZE,
1305 table_partition: "test_table".into(),
1306 table_initial_size: DEFAULT_TABLE_INITIAL_SIZE,
1307 table_resize_frequency: DEFAULT_TABLE_RESIZE_FREQUENCY,
1308 table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
1309 table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
1310 codec_config: (),
1311 };
1312 let mut freezer = Freezer::<_, FixedBytes<64>, i32>::init(context.clone(), cfg.clone())
1313 .await
1314 .expect("Failed to initialize freezer");
1315
1316 let key = test_key("key1");
1317
1318 freezer
1319 .put(key.clone(), 1)
1320 .await
1321 .expect("Failed to put data");
1322 freezer
1323 .put(key.clone(), 2)
1324 .await
1325 .expect("Failed to put data");
1326 freezer.sync().await.expect("Failed to sync");
1327 assert_eq!(
1328 freezer
1329 .get(Identifier::Key(&key))
1330 .await
1331 .expect("Failed to get data")
1332 .unwrap(),
1333 2
1334 );
1335 });
1336 }
1337}