1mod storage;
180use commonware_runtime::buffer::PoolRef;
181use commonware_utils::Array;
182use std::num::NonZeroUsize;
183pub use storage::{Checkpoint, Cursor, Freezer};
184use thiserror::Error;
185
/// Selects how an item is addressed when reading from a [Freezer].
///
/// The tests in this file look items up with `Identifier::Key(&key)`.
pub enum Identifier<'a, K: Array> {
    /// Address the item with a [Cursor] (re-exported from `storage`).
    ///
    /// NOTE(review): presumably a handle to a previously stored item — confirm in `storage`.
    Cursor(Cursor),
    /// Address the item by a borrowed key.
    Key(&'a K),
}
191
/// Errors reported by this module.
///
/// Each variant wraps an error from a dependency and is convertible from it via `From`
/// (generated by `#[from]`), so `?` can be used on those error types directly.
#[derive(Debug, Error)]
pub enum Error {
    /// An error produced by `commonware_runtime`.
    #[error("runtime error: {0}")]
    Runtime(#[from] commonware_runtime::Error),
    /// An error produced by this crate's `journal` module.
    #[error("journal error: {0}")]
    Journal(#[from] crate::journal::Error),
    /// An error produced by `commonware_codec`.
    #[error("codec error: {0}")]
    Codec(#[from] commonware_codec::Error),
}
202
/// Configuration passed to [Freezer] on initialization.
///
/// The `journal_*` fields configure the journal and the `table_*` fields configure the table.
/// NOTE(review): both are implemented in `storage`, which is not visible from this file; the
/// per-field notes below are inferred from field names and test usage — confirm against
/// `storage` before relying on them.
#[derive(Clone)]
pub struct Config<C> {
    /// Name of the storage partition used for the journal.
    pub journal_partition: String,

    /// Optional compression setting for the journal (the tests use `None` and `Some(3)`).
    ///
    /// NOTE(review): presumably a compression level, with `None` disabling compression — confirm.
    pub journal_compression: Option<u8>,

    /// Size of the journal write buffer (presumably in bytes — TODO confirm).
    pub journal_write_buffer: NonZeroUsize,

    /// Target size of the journal (presumably bytes per journal section — TODO confirm).
    pub journal_target_size: u64,

    /// Buffer pool handed to the journal.
    pub journal_buffer_pool: PoolRef,

    /// Name of the storage partition used for the table. The tests in this file open a blob
    /// named `table` inside this partition.
    pub table_partition: String,

    /// Initial size of the table (presumably a number of entries — TODO confirm).
    pub table_initial_size: u32,

    /// Controls how often the table is resized.
    ///
    /// NOTE(review): exact trigger condition is defined in `storage` — confirm there.
    pub table_resize_frequency: u8,

    /// Controls how much of a resize is performed at a time.
    ///
    /// NOTE(review): presumably the number of table entries processed per step — confirm.
    pub table_resize_chunk_size: u32,

    /// Size of the buffer used when replaying the table (presumably in bytes — TODO confirm).
    pub table_replay_buffer: NonZeroUsize,

    /// Codec configuration of type `C` (the tests use `()`).
    ///
    /// NOTE(review): presumably used to decode stored values — confirm in `storage`.
    pub codec_config: C,
}
240
241#[cfg(test)]
242mod tests {
243 use super::*;
244 use commonware_codec::DecodeExt;
245 use commonware_macros::test_traced;
246 use commonware_runtime::{deterministic, Blob, Metrics, Runner, Storage};
247 use commonware_utils::{hex, sequence::FixedBytes, NZUsize};
248 use rand::{Rng, RngCore};
249
    // Default `Config` values shared by the tests below; individual tests override some of
    // them inline (e.g. a smaller `table_initial_size`).
    const DEFAULT_JOURNAL_WRITE_BUFFER: usize = 1024;
    const DEFAULT_JOURNAL_TARGET_SIZE: u64 = 10 * 1024 * 1024;
    const DEFAULT_TABLE_INITIAL_SIZE: u32 = 256;
    const DEFAULT_TABLE_RESIZE_FREQUENCY: u8 = 4;
    const DEFAULT_TABLE_RESIZE_CHUNK_SIZE: u32 = 128;
    const DEFAULT_TABLE_REPLAY_BUFFER: usize = 64 * 1024;
    // Arguments passed to `PoolRef::new` when building the journal buffer pool.
    const PAGE_SIZE: NonZeroUsize = NZUsize!(1024);
    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);
258
259 fn test_key(key: &str) -> FixedBytes<64> {
260 let mut buf = [0u8; 64];
261 let key = key.as_bytes();
262 assert!(key.len() <= buf.len());
263 buf[..key.len()].copy_from_slice(key);
264 FixedBytes::decode(buf.as_ref()).unwrap()
265 }
266
267 fn test_put_get(compression: Option<u8>) {
268 let executor = deterministic::Runner::default();
270 executor.start(|context| async move {
271 let cfg = Config {
273 journal_partition: "test_journal".into(),
274 journal_compression: compression,
275 journal_write_buffer: NZUsize!(DEFAULT_JOURNAL_WRITE_BUFFER),
276 journal_target_size: DEFAULT_JOURNAL_TARGET_SIZE,
277 journal_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
278 table_partition: "test_table".into(),
279 table_initial_size: DEFAULT_TABLE_INITIAL_SIZE,
280 table_resize_frequency: DEFAULT_TABLE_RESIZE_FREQUENCY,
281 table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
282 table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
283 codec_config: (),
284 };
285 let mut freezer = Freezer::<_, FixedBytes<64>, i32>::init(context.clone(), cfg.clone())
286 .await
287 .expect("Failed to initialize freezer");
288
289 let key = test_key("testkey");
290 let data = 42;
291
292 let value = freezer
294 .get(Identifier::Key(&key))
295 .await
296 .expect("Failed to check key");
297 assert!(value.is_none());
298
299 freezer
301 .put(key.clone(), data)
302 .await
303 .expect("Failed to put data");
304
305 let value = freezer
307 .get(Identifier::Key(&key))
308 .await
309 .expect("Failed to get data")
310 .expect("Data not found");
311 assert_eq!(value, data);
312
313 let buffer = context.encode();
315 assert!(buffer.contains("gets_total 2"), "{}", buffer);
316 assert!(buffer.contains("puts_total 1"), "{}", buffer);
317 assert!(buffer.contains("unnecessary_reads_total 0"), "{}", buffer);
318
319 freezer.sync().await.expect("Failed to sync data");
321 });
322 }
323
    /// Runs the single-key put/get round-trip with `journal_compression` set to `None`.
    #[test_traced]
    fn test_put_get_no_compression() {
        test_put_get(None);
    }
328
    /// Runs the single-key put/get round-trip with `journal_compression` set to `Some(3)`.
    #[test_traced]
    fn test_put_get_compression() {
        test_put_get(Some(3));
    }
333
334 #[test_traced]
335 fn test_multiple_keys() {
336 let executor = deterministic::Runner::default();
338 executor.start(|context| async move {
339 let cfg = Config {
341 journal_partition: "test_journal".into(),
342 journal_compression: None,
343 journal_write_buffer: NZUsize!(DEFAULT_JOURNAL_WRITE_BUFFER),
344 journal_target_size: DEFAULT_JOURNAL_TARGET_SIZE,
345 journal_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
346 table_partition: "test_table".into(),
347 table_initial_size: DEFAULT_TABLE_INITIAL_SIZE,
348 table_resize_frequency: DEFAULT_TABLE_RESIZE_FREQUENCY,
349 table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
350 table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
351 codec_config: (),
352 };
353 let mut freezer = Freezer::<_, FixedBytes<64>, i32>::init(context.clone(), cfg.clone())
354 .await
355 .expect("Failed to initialize freezer");
356
357 let keys = vec![
359 (test_key("key1"), 1),
360 (test_key("key2"), 2),
361 (test_key("key3"), 3),
362 (test_key("key4"), 4),
363 (test_key("key5"), 5),
364 ];
365
366 for (key, data) in &keys {
367 freezer
368 .put(key.clone(), *data)
369 .await
370 .expect("Failed to put data");
371 }
372
373 for (key, data) in &keys {
375 let retrieved = freezer
376 .get(Identifier::Key(key))
377 .await
378 .expect("Failed to get data")
379 .expect("Data not found");
380 assert_eq!(retrieved, *data);
381 }
382 });
383 }
384
385 #[test_traced]
386 fn test_collision_handling() {
387 let executor = deterministic::Runner::default();
389 executor.start(|context| async move {
390 let cfg = Config {
392 journal_partition: "test_journal".into(),
393 journal_compression: None,
394 journal_write_buffer: NZUsize!(DEFAULT_JOURNAL_WRITE_BUFFER),
395 journal_target_size: DEFAULT_JOURNAL_TARGET_SIZE,
396 journal_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
397 table_partition: "test_table".into(),
398 table_initial_size: 4, table_resize_frequency: DEFAULT_TABLE_RESIZE_FREQUENCY,
400 table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
401 table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
402 codec_config: (),
403 };
404 let mut freezer = Freezer::<_, FixedBytes<64>, i32>::init(context.clone(), cfg.clone())
405 .await
406 .expect("Failed to initialize freezer");
407
408 let keys = vec![
410 (test_key("key1"), 1),
411 (test_key("key2"), 2),
412 (test_key("key3"), 3),
413 (test_key("key4"), 4),
414 (test_key("key5"), 5),
415 (test_key("key6"), 6),
416 (test_key("key7"), 7),
417 (test_key("key8"), 8),
418 ];
419
420 for (key, data) in &keys {
421 freezer
422 .put(key.clone(), *data)
423 .await
424 .expect("Failed to put data");
425 }
426
427 freezer.sync().await.expect("Failed to sync");
429
430 for (key, data) in &keys {
432 let retrieved = freezer
433 .get(Identifier::Key(key))
434 .await
435 .expect("Failed to get data")
436 .expect("Data not found");
437 assert_eq!(retrieved, *data);
438 }
439
440 let buffer = context.encode();
442 assert!(buffer.contains("gets_total 8"), "{}", buffer);
443 assert!(buffer.contains("unnecessary_reads_total 5"), "{}", buffer);
444 });
445 }
446
447 #[test_traced]
448 fn test_restart() {
449 let executor = deterministic::Runner::default();
451 executor.start(|context| async move {
452 let cfg = Config {
453 journal_partition: "test_journal".into(),
454 journal_compression: None,
455 journal_write_buffer: NZUsize!(DEFAULT_JOURNAL_WRITE_BUFFER),
456 journal_target_size: DEFAULT_JOURNAL_TARGET_SIZE,
457 journal_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
458 table_partition: "test_table".into(),
459 table_initial_size: DEFAULT_TABLE_INITIAL_SIZE,
460 table_resize_frequency: DEFAULT_TABLE_RESIZE_FREQUENCY,
461 table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
462 table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
463 codec_config: (),
464 };
465
466 let checkpoint = {
468 let mut freezer =
469 Freezer::<_, FixedBytes<64>, i32>::init(context.clone(), cfg.clone())
470 .await
471 .expect("Failed to initialize freezer");
472
473 let keys = vec![
474 (test_key("persist1"), 100),
475 (test_key("persist2"), 200),
476 (test_key("persist3"), 300),
477 ];
478
479 for (key, data) in &keys {
480 freezer
481 .put(key.clone(), *data)
482 .await
483 .expect("Failed to put data");
484 }
485
486 freezer.close().await.expect("Failed to close freezer")
487 };
488
489 {
491 let freezer = Freezer::<_, FixedBytes<64>, i32>::init_with_checkpoint(
492 context.clone(),
493 cfg.clone(),
494 Some(checkpoint),
495 )
496 .await
497 .expect("Failed to initialize freezer");
498
499 let keys = vec![
500 (test_key("persist1"), 100),
501 (test_key("persist2"), 200),
502 (test_key("persist3"), 300),
503 ];
504
505 for (key, data) in &keys {
506 let retrieved = freezer
507 .get(Identifier::Key(key))
508 .await
509 .expect("Failed to get data")
510 .expect("Data not found");
511 assert_eq!(retrieved, *data);
512 }
513 }
514 });
515 }
516
517 #[test_traced]
518 fn test_crash_consistency() {
519 let executor = deterministic::Runner::default();
521 executor.start(|context| async move {
522 let cfg = Config {
523 journal_partition: "test_journal".into(),
524 journal_compression: None,
525 journal_write_buffer: NZUsize!(DEFAULT_JOURNAL_WRITE_BUFFER),
526 journal_target_size: DEFAULT_JOURNAL_TARGET_SIZE,
527 journal_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
528 table_partition: "test_table".into(),
529 table_initial_size: DEFAULT_TABLE_INITIAL_SIZE,
530 table_resize_frequency: DEFAULT_TABLE_RESIZE_FREQUENCY,
531 table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
532 table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
533 codec_config: (),
534 };
535
536 let checkpoint = {
538 let mut freezer =
539 Freezer::<_, FixedBytes<64>, i32>::init(context.clone(), cfg.clone())
540 .await
541 .expect("Failed to initialize freezer");
542
543 freezer
544 .put(test_key("committed1"), 1)
545 .await
546 .expect("Failed to put data");
547 freezer
548 .put(test_key("committed2"), 2)
549 .await
550 .expect("Failed to put data");
551
552 freezer.sync().await.expect("Failed to sync");
554
555 freezer
557 .put(test_key("uncommitted1"), 3)
558 .await
559 .expect("Failed to put data");
560 freezer
561 .put(test_key("uncommitted2"), 4)
562 .await
563 .expect("Failed to put data");
564
565 freezer.close().await.expect("Failed to close")
567 };
568
569 {
571 let freezer = Freezer::<_, FixedBytes<64>, i32>::init_with_checkpoint(
572 context.clone(),
573 cfg.clone(),
574 Some(checkpoint),
575 )
576 .await
577 .expect("Failed to initialize freezer");
578
579 assert_eq!(
581 freezer
582 .get(Identifier::Key(&test_key("committed1")))
583 .await
584 .unwrap(),
585 Some(1)
586 );
587 assert_eq!(
588 freezer
589 .get(Identifier::Key(&test_key("committed2")))
590 .await
591 .unwrap(),
592 Some(2)
593 );
594
595 if let Some(val) = freezer
598 .get(Identifier::Key(&test_key("uncommitted1")))
599 .await
600 .unwrap()
601 {
602 assert_eq!(val, 3);
603 }
604 if let Some(val) = freezer
605 .get(Identifier::Key(&test_key("uncommitted2")))
606 .await
607 .unwrap()
608 {
609 assert_eq!(val, 4);
610 }
611 }
612 });
613 }
614
615 #[test_traced]
616 fn test_destroy() {
617 let executor = deterministic::Runner::default();
619 executor.start(|context| async move {
620 let cfg = Config {
622 journal_partition: "test_journal".into(),
623 journal_compression: None,
624 journal_write_buffer: NZUsize!(DEFAULT_JOURNAL_WRITE_BUFFER),
625 journal_target_size: DEFAULT_JOURNAL_TARGET_SIZE,
626 journal_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
627 table_partition: "test_table".into(),
628 table_initial_size: DEFAULT_TABLE_INITIAL_SIZE,
629 table_resize_frequency: DEFAULT_TABLE_RESIZE_FREQUENCY,
630 table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
631 table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
632 codec_config: (),
633 };
634 {
635 let mut freezer =
636 Freezer::<_, FixedBytes<64>, i32>::init(context.clone(), cfg.clone())
637 .await
638 .expect("Failed to initialize freezer");
639
640 freezer
641 .put(test_key("destroy1"), 1)
642 .await
643 .expect("Failed to put data");
644 freezer
645 .put(test_key("destroy2"), 2)
646 .await
647 .expect("Failed to put data");
648
649 freezer.destroy().await.expect("Failed to destroy freezer");
651 }
652
653 {
655 let freezer = Freezer::<_, FixedBytes<64>, i32>::init(context.clone(), cfg.clone())
656 .await
657 .expect("Failed to initialize freezer");
658
659 assert!(freezer
661 .get(Identifier::Key(&test_key("destroy1")))
662 .await
663 .unwrap()
664 .is_none());
665 assert!(freezer
666 .get(Identifier::Key(&test_key("destroy2")))
667 .await
668 .unwrap()
669 .is_none());
670 }
671 });
672 }
673
674 #[test_traced]
675 fn test_partial_table_entry_write() {
676 let executor = deterministic::Runner::default();
678 executor.start(|context| async move {
679 let cfg = Config {
681 journal_partition: "test_journal".into(),
682 journal_compression: None,
683 journal_write_buffer: NZUsize!(DEFAULT_JOURNAL_WRITE_BUFFER),
684 journal_target_size: DEFAULT_JOURNAL_TARGET_SIZE,
685 journal_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
686 table_partition: "test_table".into(),
687 table_initial_size: DEFAULT_TABLE_INITIAL_SIZE,
688 table_resize_frequency: DEFAULT_TABLE_RESIZE_FREQUENCY,
689 table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
690 table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
691 codec_config: (),
692 };
693 let checkpoint = {
694 let mut freezer =
695 Freezer::<_, FixedBytes<64>, i32>::init(context.clone(), cfg.clone())
696 .await
697 .expect("Failed to initialize freezer");
698
699 freezer.put(test_key("key1"), 42).await.unwrap();
700 freezer.sync().await.unwrap();
701 freezer.close().await.unwrap()
702 };
703
704 {
706 let (blob, _) = context.open(&cfg.table_partition, b"table").await.unwrap();
707 blob.write_at(vec![0xFF; 10], 0).await.unwrap();
709 blob.sync().await.unwrap();
710 }
711
712 {
714 let freezer = Freezer::<_, FixedBytes<64>, i32>::init_with_checkpoint(
715 context.clone(),
716 cfg.clone(),
717 Some(checkpoint),
718 )
719 .await
720 .expect("Failed to initialize freezer");
721
722 let result = freezer
725 .get(Identifier::Key(&test_key("key1")))
726 .await
727 .unwrap();
728 assert!(result.is_none() || result == Some(42));
729 }
730 });
731 }
732
733 #[test_traced]
734 fn test_table_entry_invalid_crc() {
735 let executor = deterministic::Runner::default();
737 executor.start(|context| async move {
738 let cfg = Config {
739 journal_partition: "test_journal".into(),
740 journal_compression: None,
741 journal_write_buffer: NZUsize!(DEFAULT_JOURNAL_WRITE_BUFFER),
742 journal_target_size: DEFAULT_JOURNAL_TARGET_SIZE,
743 journal_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
744 table_partition: "test_table".into(),
745 table_initial_size: DEFAULT_TABLE_INITIAL_SIZE,
746 table_resize_frequency: DEFAULT_TABLE_RESIZE_FREQUENCY,
747 table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
748 table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
749 codec_config: (),
750 };
751
752 let checkpoint = {
754 let mut freezer =
755 Freezer::<_, FixedBytes<64>, i32>::init(context.clone(), cfg.clone())
756 .await
757 .expect("Failed to initialize freezer");
758
759 freezer.put(test_key("key1"), 42).await.unwrap();
760 freezer.sync().await.unwrap();
761 freezer.close().await.unwrap()
762 };
763
764 {
766 let (blob, _) = context.open(&cfg.table_partition, b"table").await.unwrap();
767 let entry_data = blob.read_at(vec![0u8; 24], 0).await.unwrap();
769 let mut corrupted = entry_data.as_ref().to_vec();
770 corrupted[20] ^= 0xFF;
772 blob.write_at(corrupted, 0).await.unwrap();
773 blob.sync().await.unwrap();
774 }
775
776 {
778 let freezer = Freezer::<_, FixedBytes<64>, i32>::init_with_checkpoint(
779 context.clone(),
780 cfg.clone(),
781 Some(checkpoint),
782 )
783 .await
784 .expect("Failed to initialize freezer");
785
786 let result = freezer
788 .get(Identifier::Key(&test_key("key1")))
789 .await
790 .unwrap();
791 assert!(result.is_none() || result == Some(42));
793 }
794 });
795 }
796
797 #[test_traced]
798 fn test_table_extra_bytes() {
799 let executor = deterministic::Runner::default();
801 executor.start(|context| async move {
802 let cfg = Config {
803 journal_partition: "test_journal".into(),
804 journal_compression: None,
805 journal_write_buffer: NZUsize!(DEFAULT_JOURNAL_WRITE_BUFFER),
806 journal_target_size: DEFAULT_JOURNAL_TARGET_SIZE,
807 journal_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
808 table_partition: "test_table".into(),
809 table_initial_size: DEFAULT_TABLE_INITIAL_SIZE,
810 table_resize_frequency: DEFAULT_TABLE_RESIZE_FREQUENCY,
811 table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
812 table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
813 codec_config: (),
814 };
815
816 let checkpoint = {
818 let mut freezer =
819 Freezer::<_, FixedBytes<64>, i32>::init(context.clone(), cfg.clone())
820 .await
821 .expect("Failed to initialize freezer");
822
823 freezer.put(test_key("key1"), 42).await.unwrap();
824 freezer.sync().await.unwrap();
825 freezer.close().await.unwrap()
826 };
827
828 {
830 let (blob, size) = context.open(&cfg.table_partition, b"table").await.unwrap();
831 blob.write_at(hex!("0xdeadbeef").to_vec(), size)
833 .await
834 .unwrap();
835 blob.sync().await.unwrap();
836 }
837
838 {
840 let freezer = Freezer::<_, FixedBytes<64>, i32>::init_with_checkpoint(
841 context.clone(),
842 cfg.clone(),
843 Some(checkpoint),
844 )
845 .await
846 .expect("Failed to initialize freezer");
847
848 assert_eq!(
850 freezer
851 .get(Identifier::Key(&test_key("key1")))
852 .await
853 .unwrap(),
854 Some(42)
855 );
856
857 let mut freezer_mut = freezer;
859 freezer_mut.put(test_key("key2"), 43).await.unwrap();
860 assert_eq!(
861 freezer_mut
862 .get(Identifier::Key(&test_key("key2")))
863 .await
864 .unwrap(),
865 Some(43)
866 );
867 }
868 });
869 }
870
871 #[test_traced]
872 fn test_indexing_across_resizes() {
873 let executor = deterministic::Runner::default();
875 executor.start(|context| async move {
876 let cfg = Config {
878 journal_partition: "test_journal".into(),
879 journal_compression: None,
880 journal_write_buffer: NZUsize!(DEFAULT_JOURNAL_WRITE_BUFFER),
881 journal_target_size: DEFAULT_JOURNAL_TARGET_SIZE,
882 journal_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
883 table_partition: "test_table".into(),
884 table_initial_size: 2, table_resize_frequency: 2, table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
887 table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
888 codec_config: (),
889 };
890 let mut freezer = Freezer::<_, FixedBytes<64>, i32>::init(context.clone(), cfg.clone())
891 .await
892 .expect("Failed to initialize freezer");
893
894 let mut keys = Vec::new();
897 for i in 0..1000 {
898 let key = test_key(&format!("key{i}"));
899 keys.push((key.clone(), i));
900
901 freezer.put(key, i).await.expect("Failed to put data");
903 freezer.sync().await.expect("Failed to sync");
904 }
905
906 for (key, value) in &keys {
908 let retrieved = freezer
909 .get(Identifier::Key(key))
910 .await
911 .expect("Failed to get data")
912 .expect("Data not found");
913 assert_eq!(retrieved, *value, "Value mismatch for key after resizes");
914 }
915
916 let checkpoint = freezer.close().await.expect("Failed to close");
918 let freezer = Freezer::<_, FixedBytes<64>, i32>::init_with_checkpoint(
919 context.clone(),
920 cfg.clone(),
921 Some(checkpoint),
922 )
923 .await
924 .expect("Failed to reinitialize freezer");
925
926 for (key, value) in &keys {
928 let retrieved = freezer
929 .get(Identifier::Key(key))
930 .await
931 .expect("Failed to get data")
932 .expect("Data not found");
933 assert_eq!(retrieved, *value, "Value mismatch for key after restart");
934 }
935
936 let buffer = context.encode();
938 assert!(buffer.contains("resizes_total 8"), "{}", buffer);
939 });
940 }
941
    /// Interleaves insertions with an in-progress resize and pins the values reported by
    /// `resizing()`, `resizable()`, and the `unnecessary_writes_total` metric at each step.
    ///
    /// The configuration (initial size 2, resize frequency 1, resize chunk size 1) is chosen so
    /// that a resize starts after the first two insertions and is still in progress after one
    /// sync. The assertions depend on the exact order of `put` and `sync` calls below.
    #[test_traced]
    fn test_insert_during_resize() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                journal_partition: "test_journal".into(),
                journal_compression: None,
                journal_write_buffer: NZUsize!(DEFAULT_JOURNAL_WRITE_BUFFER),
                journal_target_size: DEFAULT_JOURNAL_TARGET_SIZE,
                journal_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                table_partition: "test_table".into(),
                table_initial_size: 2,
                table_resize_frequency: 1,
                table_resize_chunk_size: 1,
                table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
                codec_config: (),
            };
            let mut freezer = Freezer::<_, FixedBytes<64>, i32>::init(context.clone(), cfg.clone())
                .await
                .unwrap();

            // Two insertions followed by a sync: a resize is expected to be in progress.
            freezer.put(test_key("key0"), 0).await.unwrap();
            freezer.put(test_key("key1"), 1).await.unwrap();
            freezer.sync().await.unwrap();
            assert!(freezer.resizing().is_some());

            // Insert while the resize is in progress: one unnecessary write is recorded and
            // `resizable()` reports 3.
            freezer.put(test_key("key2"), 2).await.unwrap();
            assert!(context.encode().contains("unnecessary_writes_total 1"));
            assert_eq!(freezer.resizable(), 3);

            // A second insertion during the resize changes neither value.
            freezer.put(test_key("key3"), 3).await.unwrap();
            assert!(context.encode().contains("unnecessary_writes_total 1"));
            assert_eq!(freezer.resizable(), 3);

            // The next sync is expected to finish the resize; `resizable()` drops to 2.
            freezer.sync().await.unwrap();
            assert!(freezer.resizing().is_none());
            assert_eq!(freezer.resizable(), 2);

            // Two more insertions and a sync are expected to start another resize.
            freezer.put(test_key("key4"), 4).await.unwrap();
            freezer.put(test_key("key5"), 5).await.unwrap();
            freezer.sync().await.unwrap();

            assert!(freezer.resizing().is_some());

            // All six keys must be readable while the second resize is still in progress.
            for i in 0..6 {
                let key = test_key(&format!("key{i}"));
                assert_eq!(freezer.get(Identifier::Key(&key)).await.unwrap(), Some(i));
            }

            // Keep syncing until the resize completes.
            while freezer.resizing().is_some() {
                freezer.sync().await.unwrap();
            }

            // Once the resize is done, `resizable()` is expected to be 0.
            assert_eq!(freezer.resizable(), 0);
        });
    }
1009
    /// Checks resize behavior across a restart: a freezer is reopened from a checkpoint that
    /// was taken while a resize was in progress, and the test pins `resizable()` and
    /// `resizing()` after the restart and after subsequent syncs.
    ///
    /// Note that the first freezer is dropped without calling `close`; the checkpoint used for
    /// the restart is the one returned by `sync`.
    #[test_traced]
    fn test_resize_after_startup() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                journal_partition: "test_journal".into(),
                journal_compression: None,
                journal_write_buffer: NZUsize!(DEFAULT_JOURNAL_WRITE_BUFFER),
                journal_target_size: DEFAULT_JOURNAL_TARGET_SIZE,
                journal_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                table_partition: "test_table".into(),
                table_initial_size: 2,
                table_resize_frequency: 1,
                table_resize_chunk_size: 1,
                table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
                codec_config: (),
            };

            let checkpoint = {
                let mut freezer =
                    Freezer::<_, FixedBytes<64>, i32>::init(context.clone(), cfg.clone())
                        .await
                        .unwrap();

                // Two insertions followed by a sync; keep the checkpoint that `sync` returns.
                freezer.put(test_key("key0"), 0).await.unwrap();
                freezer.put(test_key("key1"), 1).await.unwrap();
                let checkpoint = freezer.sync().await.unwrap();

                // A resize is expected to be in progress when the checkpoint is taken.
                assert!(freezer.resizing().is_some());

                checkpoint
            };

            // Reopen from the mid-resize checkpoint; `resizable()` is expected to be 1.
            let mut freezer = Freezer::<_, FixedBytes<64>, i32>::init_with_checkpoint(
                context.clone(),
                cfg.clone(),
                Some(checkpoint),
            )
            .await
            .unwrap();
            assert_eq!(freezer.resizable(), 1);

            // After the first sync following the restart, a resize is expected to be in progress.
            freezer.sync().await.unwrap();
            assert!(freezer.resizing().is_some());

            // Keep syncing until the resize completes.
            while freezer.resizing().is_some() {
                freezer.sync().await.unwrap();
            }

            // Once the resize is done, `resizable()` is expected to be 0.
            assert_eq!(freezer.resizable(), 0);
        });
    }
1070
1071 fn test_operations_and_restart(num_keys: usize) -> String {
1072 let executor = deterministic::Runner::default();
1074 executor.start(|mut context| async move {
1075 let cfg = Config {
1077 journal_partition: "test_journal".into(),
1078 journal_compression: None,
1079 journal_write_buffer: NZUsize!(DEFAULT_JOURNAL_WRITE_BUFFER),
1080 journal_target_size: 128, journal_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1082 table_partition: "test_table".into(),
1083 table_initial_size: 8, table_resize_frequency: 2, table_resize_chunk_size: DEFAULT_TABLE_RESIZE_CHUNK_SIZE,
1086 table_replay_buffer: NZUsize!(DEFAULT_TABLE_REPLAY_BUFFER),
1087 codec_config: (),
1088 };
1089 let mut freezer =
1090 Freezer::<_, FixedBytes<96>, FixedBytes<256>>::init(context.clone(), cfg.clone())
1091 .await
1092 .expect("Failed to initialize freezer");
1093
1094 let mut pairs = Vec::new();
1096
1097 for _ in 0..num_keys {
1098 let mut key = [0u8; 96];
1100 context.fill_bytes(&mut key);
1101 let key = FixedBytes::<96>::new(key);
1102
1103 let mut value = [0u8; 256];
1105 context.fill_bytes(&mut value);
1106 let value = FixedBytes::<256>::new(value);
1107
1108 freezer
1110 .put(key.clone(), value.clone())
1111 .await
1112 .expect("Failed to put data");
1113 pairs.push((key, value));
1114
1115 if context.gen_bool(0.1) {
1117 freezer.sync().await.expect("Failed to sync");
1118 }
1119 }
1120
1121 freezer.sync().await.expect("Failed to sync");
1123
1124 for (key, value) in &pairs {
1126 let retrieved = freezer
1127 .get(Identifier::Key(key))
1128 .await
1129 .expect("Failed to get data")
1130 .expect("Data not found");
1131 assert_eq!(&retrieved, value);
1132 }
1133
1134 for (key, _) in &pairs {
1136 assert!(freezer
1137 .get(Identifier::Key(key))
1138 .await
1139 .expect("Failed to check key")
1140 .is_some());
1141 }
1142
1143 for _ in 0..10 {
1145 let mut key = [0u8; 96];
1146 context.fill_bytes(&mut key);
1147 let key = FixedBytes::<96>::new(key);
1148 assert!(freezer
1149 .get(Identifier::Key(&key))
1150 .await
1151 .expect("Failed to check key")
1152 .is_none());
1153 }
1154
1155 let checkpoint = freezer.close().await.expect("Failed to close freezer");
1157
1158 let mut freezer = Freezer::<_, FixedBytes<96>, FixedBytes<256>>::init_with_checkpoint(
1160 context.clone(),
1161 cfg.clone(),
1162 Some(checkpoint),
1163 )
1164 .await
1165 .expect("Failed to initialize freezer");
1166
1167 for (key, value) in &pairs {
1169 let retrieved = freezer
1170 .get(Identifier::Key(key))
1171 .await
1172 .expect("Failed to get data")
1173 .expect("Data not found");
1174 assert_eq!(&retrieved, value);
1175 }
1176
1177 for _ in 0..20 {
1179 let mut key = [0u8; 96];
1180 context.fill_bytes(&mut key);
1181 let key = FixedBytes::<96>::new(key);
1182
1183 let mut value = [0u8; 256];
1184 context.fill_bytes(&mut value);
1185 let value = FixedBytes::<256>::new(value);
1186
1187 freezer.put(key, value).await.expect("Failed to put data");
1188 }
1189
1190 for _ in 0..3 {
1192 freezer.sync().await.expect("Failed to sync");
1193
1194 for _ in 0..5 {
1196 let mut key = [0u8; 96];
1197 context.fill_bytes(&mut key);
1198 let key = FixedBytes::<96>::new(key);
1199
1200 let mut value = [0u8; 256];
1201 context.fill_bytes(&mut value);
1202 let value = FixedBytes::<256>::new(value);
1203
1204 freezer.put(key, value).await.expect("Failed to put data");
1205 }
1206 }
1207
1208 freezer.sync().await.expect("Failed to sync");
1210
1211 context.auditor().state()
1213 })
1214 }
1215
1216 #[test_traced]
1217 #[ignore]
1218 fn test_determinism() {
1219 let state1 = test_operations_and_restart(1_000);
1220 let state2 = test_operations_and_restart(1_000);
1221 assert_eq!(state1, state2);
1222 }
1223}