1mod storage;
149pub use storage::{Archive, Identifier};
150pub mod translator;
151
152use prometheus_client::registry::Registry;
153use std::{
154 hash::Hash,
155 sync::{Arc, Mutex},
156};
157use thiserror::Error;
158
159#[derive(Debug, Error)]
161pub enum Error {
162 #[error("journal error: {0}")]
163 Journal(#[from] crate::journal::Error),
164 #[error("record corrupted")]
165 RecordCorrupted,
166 #[error("duplicate index")]
167 DuplicateIndex,
168 #[error("already pruned to: {0}")]
169 AlreadyPrunedTo(u64),
170 #[error("invalid key length")]
171 InvalidKeyLength,
172 #[error("record too large")]
173 RecordTooLarge,
174 #[error("compression failed")]
175 CompressionFailed,
176 #[error("decompression failed")]
177 DecompressionFailed,
178}
179
180pub trait Translator: Clone {
186 type Key: Eq + Hash + Send + Sync + Clone;
187
188 fn transform(&self, key: &[u8]) -> Self::Key;
190}
191
192#[derive(Clone)]
194pub struct Config<T: Translator> {
195 pub registry: Arc<Mutex<Registry>>,
197
198 pub section_mask: u64,
202
203 pub key_len: u32,
210
211 pub translator: T,
216
217 pub pending_writes: usize,
221
222 pub replay_concurrency: usize,
224
225 pub compression: Option<u8>,
227}
228
229#[cfg(test)]
230mod tests {
231 use super::*;
232 use crate::journal::variable::{Config as JConfig, Journal};
233 use crate::journal::Error as JournalError;
234 use bytes::Bytes;
235 use commonware_macros::test_traced;
236 use commonware_runtime::{deterministic::Executor, Blob, Runner, Storage};
237 use prometheus_client::{encoding::text::encode, registry::Registry};
238 use rand::Rng;
239 use std::{
240 collections::BTreeMap,
241 sync::{Arc, Mutex},
242 };
243 use translator::{FourCap, TwoCap};
244
245 const DEFAULT_SECTION_MASK: u64 = 0xffff_ffff_ffff_0000u64;
246
247 fn test_archive_put_get(compression: Option<u8>) {
248 let (executor, context, _) = Executor::default();
250 executor.start(async move {
251 let registry = Arc::new(Mutex::new(Registry::default()));
253
254 let journal = Journal::init(
256 context,
257 JConfig {
258 registry: registry.clone(),
259 partition: "test_partition".into(),
260 },
261 )
262 .await
263 .expect("Failed to initialize journal");
264
265 let cfg = Config {
267 registry,
268 key_len: 7,
269 translator: FourCap,
270 pending_writes: 10,
271 replay_concurrency: 4,
272 compression,
273 section_mask: DEFAULT_SECTION_MASK,
274 };
275 let mut archive = Archive::init(journal, cfg.clone())
276 .await
277 .expect("Failed to initialize archive");
278
279 let index = 1u64;
280 let key = b"testkey";
281 let data = Bytes::from("testdata");
282
283 let has = archive
285 .has(Identifier::Index(index))
286 .await
287 .expect("Failed to check key");
288 assert!(!has);
289 let has = archive
290 .has(Identifier::Key(key))
291 .await
292 .expect("Failed to check key");
293 assert!(!has);
294
295 archive
297 .put(index, key, data.clone())
298 .await
299 .expect("Failed to put data");
300
301 let has = archive
303 .has(Identifier::Index(index))
304 .await
305 .expect("Failed to check key");
306 assert!(has);
307 let has = archive
308 .has(Identifier::Key(key))
309 .await
310 .expect("Failed to check key");
311 assert!(has);
312
313 let retrieved = archive
315 .get(Identifier::Index(index))
316 .await
317 .expect("Failed to get data")
318 .expect("Data not found");
319 assert_eq!(retrieved, data);
320 let retrieved = archive
321 .get(Identifier::Key(key))
322 .await
323 .expect("Failed to get data")
324 .expect("Data not found");
325 assert_eq!(retrieved, data);
326
327 let mut buffer = String::new();
329 encode(&mut buffer, &cfg.registry.lock().unwrap()).unwrap();
330 assert!(buffer.contains("items_tracked 1"));
331 assert!(buffer.contains("unnecessary_reads_total 0"));
332 assert!(buffer.contains("gets_total 2"));
333 assert!(buffer.contains("has_total 4"));
334 assert!(buffer.contains("syncs_total 0"));
335
336 archive.sync().await.expect("Failed to sync data");
338
339 let mut buffer = String::new();
341 encode(&mut buffer, &cfg.registry.lock().unwrap()).unwrap();
342 assert!(buffer.contains("items_tracked 1"));
343 assert!(buffer.contains("unnecessary_reads_total 0"));
344 assert!(buffer.contains("gets_total 2"));
345 assert!(buffer.contains("has_total 4"));
346 assert!(buffer.contains("syncs_total 1"));
347 });
348 }
349
350 #[test_traced]
351 fn test_archive_put_get_no_compression() {
352 test_archive_put_get(None);
353 }
354
355 #[test_traced]
356 fn test_archive_put_get_compression() {
357 test_archive_put_get(Some(3));
358 }
359
360 #[test_traced]
361 fn test_archive_compression_then_none() {
362 let (executor, context, _) = Executor::default();
364 executor.start(async move {
365 let journal = Journal::init(
367 context.clone(),
368 JConfig {
369 registry: Arc::new(Mutex::new(Registry::default())),
370 partition: "test_partition".into(),
371 },
372 )
373 .await
374 .expect("Failed to initialize journal");
375
376 let cfg = Config {
378 registry: Arc::new(Mutex::new(Registry::default())),
379 key_len: 7,
380 translator: FourCap,
381 pending_writes: 10,
382 replay_concurrency: 4,
383 compression: Some(3),
384 section_mask: DEFAULT_SECTION_MASK,
385 };
386 let mut archive = Archive::init(journal, cfg.clone())
387 .await
388 .expect("Failed to initialize archive");
389
390 let index = 1u64;
392 let key = b"testkey";
393 let data = Bytes::from("testdata");
394 archive
395 .put(index, key, data.clone())
396 .await
397 .expect("Failed to put data");
398
399 archive.close().await.expect("Failed to close archive");
401
402 let journal = Journal::init(
404 context,
405 JConfig {
406 registry: Arc::new(Mutex::new(Registry::default())),
407 partition: "test_partition".into(),
408 },
409 )
410 .await
411 .expect("Failed to initialize journal");
412 let cfg = Config {
413 registry: Arc::new(Mutex::new(Registry::default())),
414 key_len: 7,
415 translator: FourCap,
416 pending_writes: 10,
417 replay_concurrency: 4,
418 compression: None,
419 section_mask: DEFAULT_SECTION_MASK,
420 };
421 let archive = Archive::init(journal, cfg.clone())
422 .await
423 .expect("Failed to initialize archive");
424
425 let retrieved = archive
427 .get(Identifier::Index(index))
428 .await
429 .expect("Failed to get data")
430 .expect("Data not found");
431 assert_ne!(retrieved, data);
432 let retrieved = archive
433 .get(Identifier::Key(key))
434 .await
435 .expect("Failed to get data")
436 .expect("Data not found");
437 assert_ne!(retrieved, data);
438 });
439 }
440
441 #[test_traced]
442 fn test_archive_invalid_key_length() {
443 let (executor, context, _) = Executor::default();
445 executor.start(async move {
446 let registry = Arc::new(Mutex::new(Registry::default()));
448
449 let journal = Journal::init(
451 context,
452 JConfig {
453 registry: registry.clone(),
454 partition: "test_partition".into(),
455 },
456 )
457 .await
458 .expect("Failed to initialize journal");
459
460 let cfg = Config {
462 registry,
463 key_len: 8,
464 translator: FourCap,
465 pending_writes: 10,
466 replay_concurrency: 4,
467 compression: None,
468 section_mask: DEFAULT_SECTION_MASK,
469 };
470 let mut archive = Archive::init(journal, cfg.clone())
471 .await
472 .expect("Failed to initialize archive");
473
474 let index = 1u64;
475 let key = b"invalidkey";
476 let data = Bytes::from("invaliddata");
477
478 let result = archive.put(index, key, data).await;
480 assert!(matches!(result, Err(Error::InvalidKeyLength)));
481
482 let result = archive.get(Identifier::Key(key)).await;
484 assert!(matches!(result, Err(Error::InvalidKeyLength)));
485
486 let result = archive.has(Identifier::Key(key)).await;
488 assert!(matches!(result, Err(Error::InvalidKeyLength)));
489
490 let mut buffer = String::new();
492 encode(&mut buffer, &cfg.registry.lock().unwrap()).unwrap();
493 assert!(buffer.contains("items_tracked 0"));
494 assert!(buffer.contains("unnecessary_reads_total 0"));
495 assert!(buffer.contains("gets_total 0"));
496 });
497 }
498
499 #[test_traced]
500 fn test_archive_record_corruption() {
501 let (executor, context, _) = Executor::default();
503 executor.start(async move {
504 let journal = Journal::init(
506 context.clone(),
507 JConfig {
508 registry: Arc::new(Mutex::new(Registry::default())),
509 partition: "test_partition".into(),
510 },
511 )
512 .await
513 .expect("Failed to initialize journal");
514
515 let cfg = Config {
517 registry: Arc::new(Mutex::new(Registry::default())),
518 key_len: 7,
519 translator: FourCap,
520 pending_writes: 10,
521 replay_concurrency: 4,
522 compression: None,
523 section_mask: DEFAULT_SECTION_MASK,
524 };
525 let mut archive = Archive::init(journal, cfg.clone())
526 .await
527 .expect("Failed to initialize archive");
528
529 let index = 1u64;
530 let key = b"testkey";
531 let data = Bytes::from("testdata");
532
533 archive
535 .put(index, key, data.clone())
536 .await
537 .expect("Failed to put data");
538
539 archive.close().await.expect("Failed to close archive");
541
542 let section = index & DEFAULT_SECTION_MASK;
544 let blob = context
545 .open("test_partition", §ion.to_be_bytes())
546 .await
547 .unwrap();
548 let value_location = 4 + 8 + cfg.key_len as u64 + 4;
549 blob.write_at(b"testdaty", value_location).await.unwrap();
550 blob.close().await.unwrap();
551
552 let journal = Journal::init(
554 context,
555 JConfig {
556 registry: Arc::new(Mutex::new(Registry::default())),
557 partition: "test_partition".into(),
558 },
559 )
560 .await
561 .expect("Failed to initialize journal");
562 let archive = Archive::init(
563 journal,
564 Config {
565 registry: Arc::new(Mutex::new(Registry::default())),
566 key_len: 7,
567 translator: FourCap,
568 pending_writes: 10,
569 replay_concurrency: 4,
570 compression: None,
571 section_mask: DEFAULT_SECTION_MASK,
572 },
573 )
574 .await
575 .expect("Failed to initialize archive");
576
577 let result = archive.get(Identifier::Key(key)).await;
579 assert!(matches!(
580 result,
581 Err(Error::Journal(JournalError::ChecksumMismatch(_, _)))
582 ));
583 });
584 }
585
586 #[test_traced]
587 fn test_archive_duplicate_key() {
588 let (executor, context, _) = Executor::default();
590 executor.start(async move {
591 let registry = Arc::new(Mutex::new(Registry::default()));
593
594 let journal = Journal::init(
596 context,
597 JConfig {
598 registry: registry.clone(),
599 partition: "test_partition".into(),
600 },
601 )
602 .await
603 .expect("Failed to initialize journal");
604
605 let cfg = Config {
607 registry,
608 key_len: 9,
609 translator: FourCap,
610 pending_writes: 10,
611 replay_concurrency: 4,
612 compression: None,
613 section_mask: DEFAULT_SECTION_MASK,
614 };
615 let mut archive = Archive::init(journal, cfg.clone())
616 .await
617 .expect("Failed to initialize archive");
618
619 let index = 1u64;
620 let key = b"duplicate";
621 let data1 = Bytes::from("data1");
622 let data2 = Bytes::from("data2");
623
624 archive
626 .put(index, key, data1.clone())
627 .await
628 .expect("Failed to put data");
629
630 let result = archive.put(index, key, data2.clone()).await;
632 assert!(matches!(result, Err(Error::DuplicateIndex)));
633
634 let retrieved = archive
636 .get(Identifier::Index(index))
637 .await
638 .expect("Failed to get data")
639 .expect("Data not found");
640 assert_eq!(retrieved, data1);
641 let retrieved = archive
642 .get(Identifier::Key(key))
643 .await
644 .expect("Failed to get data")
645 .expect("Data not found");
646 assert_eq!(retrieved, data1);
647
648 let mut buffer = String::new();
650 encode(&mut buffer, &cfg.registry.lock().unwrap()).unwrap();
651 assert!(buffer.contains("items_tracked 1"));
652 assert!(buffer.contains("unnecessary_reads_total 0"));
653 assert!(buffer.contains("gets_total 2"));
654 });
655 }
656
657 #[test_traced]
658 fn test_archive_get_nonexistent() {
659 let (executor, context, _) = Executor::default();
661 executor.start(async move {
662 let registry = Arc::new(Mutex::new(Registry::default()));
664
665 let journal = Journal::init(
667 context,
668 JConfig {
669 registry: registry.clone(),
670 partition: "test_partition".into(),
671 },
672 )
673 .await
674 .expect("Failed to initialize journal");
675
676 let cfg = Config {
678 registry,
679 key_len: 11,
680 translator: FourCap,
681 pending_writes: 10,
682 replay_concurrency: 4,
683 compression: None,
684 section_mask: DEFAULT_SECTION_MASK,
685 };
686 let archive = Archive::init(journal, cfg.clone())
687 .await
688 .expect("Failed to initialize archive");
689
690 let index = 1u64;
692 let retrieved = archive
693 .get(Identifier::Index(index))
694 .await
695 .expect("Failed to get data");
696 assert!(retrieved.is_none());
697
698 let key = b"nonexistent";
700 let retrieved = archive
701 .get(Identifier::Key(key))
702 .await
703 .expect("Failed to get data");
704 assert!(retrieved.is_none());
705
706 let mut buffer = String::new();
708 encode(&mut buffer, &cfg.registry.lock().unwrap()).unwrap();
709 assert!(buffer.contains("items_tracked 0"));
710 assert!(buffer.contains("unnecessary_reads_total 0"));
711 assert!(buffer.contains("gets_total 2"));
712 });
713 }
714
715 #[test_traced]
716 fn test_archive_overlapping_key() {
717 let (executor, context, _) = Executor::default();
719 executor.start(async move {
720 let registry = Arc::new(Mutex::new(Registry::default()));
722
723 let journal = Journal::init(
725 context,
726 JConfig {
727 registry: registry.clone(),
728 partition: "test_partition".into(),
729 },
730 )
731 .await
732 .expect("Failed to initialize journal");
733
734 let cfg = Config {
736 registry,
737 key_len: 5,
738 translator: FourCap,
739 pending_writes: 10,
740 replay_concurrency: 4,
741 compression: None,
742 section_mask: DEFAULT_SECTION_MASK,
743 };
744 let mut archive = Archive::init(journal, cfg.clone())
745 .await
746 .expect("Failed to initialize archive");
747
748 let index1 = 1u64;
749 let key1 = b"keys1";
750 let data1 = Bytes::from("data1");
751 let index2 = 2u64;
752 let key2 = b"keys2";
753 let data2 = Bytes::from("data2");
754
755 archive
757 .put(index1, key1, data1.clone())
758 .await
759 .expect("Failed to put data");
760
761 archive
763 .put(index2, key2, data2.clone())
764 .await
765 .expect("Failed to put data");
766
767 let retrieved = archive
769 .get(Identifier::Key(key1))
770 .await
771 .expect("Failed to get data")
772 .expect("Data not found");
773 assert_eq!(retrieved, data1);
774
775 let retrieved = archive
777 .get(Identifier::Key(key2))
778 .await
779 .expect("Failed to get data")
780 .expect("Data not found");
781 assert_eq!(retrieved, data2);
782
783 let mut buffer = String::new();
785 encode(&mut buffer, &cfg.registry.lock().unwrap()).unwrap();
786 assert!(buffer.contains("items_tracked 2"));
787 assert!(buffer.contains("unnecessary_reads_total 1"));
788 assert!(buffer.contains("gets_total 2"));
789 });
790 }
791
792 #[test_traced]
793 fn test_archive_overlapping_key_multiple_sections() {
794 let (executor, context, _) = Executor::default();
796 executor.start(async move {
797 let registry = Arc::new(Mutex::new(Registry::default()));
799
800 let journal = Journal::init(
802 context,
803 JConfig {
804 registry: registry.clone(),
805 partition: "test_partition".into(),
806 },
807 )
808 .await
809 .expect("Failed to initialize journal");
810
811 let cfg = Config {
813 registry,
814 key_len: 5,
815 translator: FourCap,
816 pending_writes: 10,
817 replay_concurrency: 4,
818 compression: None,
819 section_mask: DEFAULT_SECTION_MASK,
820 };
821 let mut archive = Archive::init(journal, cfg.clone())
822 .await
823 .expect("Failed to initialize archive");
824
825 let index1 = 1u64;
826 let key1 = b"keys1";
827 let data1 = Bytes::from("data1");
828 let index2 = 2_000_000u64;
829 let key2 = b"keys2";
830 let data2 = Bytes::from("data2");
831
832 archive
834 .put(index1, key1, data1.clone())
835 .await
836 .expect("Failed to put data");
837
838 archive
840 .put(index2, key2, data2.clone())
841 .await
842 .expect("Failed to put data");
843
844 let retrieved = archive
846 .get(Identifier::Key(key1))
847 .await
848 .expect("Failed to get data")
849 .expect("Data not found");
850 assert_eq!(retrieved, data1);
851
852 let retrieved = archive
854 .get(Identifier::Key(key2))
855 .await
856 .expect("Failed to get data")
857 .expect("Data not found");
858 assert_eq!(retrieved, data2);
859 });
860 }
861
862 #[test_traced]
863 fn test_archive_prune_keys() {
864 let (executor, context, _) = Executor::default();
866 executor.start(async move {
867 let registry = Arc::new(Mutex::new(Registry::default()));
869
870 let journal = Journal::init(
872 context.clone(),
873 JConfig {
874 registry: registry.clone(),
875 partition: "test_partition".into(),
876 },
877 )
878 .await
879 .expect("Failed to initialize journal");
880
881 let cfg = Config {
883 registry: registry.clone(),
884 key_len: 9,
885 translator: FourCap,
886 pending_writes: 10,
887 replay_concurrency: 4,
888 compression: None,
889 section_mask: 0xffff_ffff_ffff_ffffu64, };
891 let mut archive = Archive::init(journal, cfg.clone())
892 .await
893 .expect("Failed to initialize archive");
894
895 let keys = vec![
897 (1u64, "key1-blah", Bytes::from("data1")),
898 (2u64, "key2-blah", Bytes::from("data2")),
899 (3u64, "key3-blah", Bytes::from("data3")),
900 (4u64, "key3-bleh", Bytes::from("data3-again")),
901 (5u64, "key4-blah", Bytes::from("data4")),
902 ];
903
904 for (index, key, data) in &keys {
905 archive
906 .put(*index, key.as_bytes(), data.clone())
907 .await
908 .expect("Failed to put data");
909 }
910
911 let mut buffer = String::new();
913 encode(&mut buffer, &cfg.registry.lock().unwrap()).unwrap();
914 assert!(buffer.contains("items_tracked 5"));
915
916 archive.prune(3).await.expect("Failed to prune");
918
919 for (index, key, data) in keys {
921 let retrieved = archive
922 .get(Identifier::Key(key.as_bytes()))
923 .await
924 .expect("Failed to get data");
925 if index < 3 {
926 assert!(retrieved.is_none());
927 } else {
928 assert_eq!(retrieved.expect("Data not found"), data);
929 }
930 }
931
932 let mut buffer = String::new();
934 encode(&mut buffer, &cfg.registry.lock().unwrap()).unwrap();
935 assert!(buffer.contains("items_tracked 3"));
936 assert!(buffer.contains("indices_pruned_total 2"));
937 assert!(buffer.contains("keys_pruned_total 0")); archive.prune(2).await.expect("Failed to prune");
941
942 archive.prune(3).await.expect("Failed to prune");
944
945 let result = archive
947 .put(1, "key1-blah".as_bytes(), Bytes::from("data1"))
948 .await;
949 assert!(matches!(result, Err(Error::AlreadyPrunedTo(3))));
950
951 archive
953 .put(6, "key2-blfh".as_bytes(), Bytes::from("data2-2"))
954 .await
955 .expect("Failed to put data");
956
957 let mut buffer = String::new();
959 encode(&mut buffer, &cfg.registry.lock().unwrap()).unwrap();
960 assert!(buffer.contains("items_tracked 4")); assert!(buffer.contains("indices_pruned_total 2"));
962 assert!(buffer.contains("keys_pruned_total 1"));
963 });
964 }
965
966 fn test_archive_keys_and_restart(num_keys: usize) -> String {
967 let (executor, mut context, auditor) = Executor::default();
969 executor.start(async move {
970 let registry = Arc::new(Mutex::new(Registry::default()));
972
973 let journal = Journal::init(
975 context.clone(),
976 JConfig {
977 registry: registry.clone(),
978 partition: "test_partition".into(),
979 },
980 )
981 .await
982 .expect("Failed to initialize journal");
983
984 let section_mask = 0xffff_ffff_ffff_ff00u64;
986 let cfg = Config {
987 registry: registry.clone(),
988 key_len: 32,
989 translator: TwoCap,
990 pending_writes: 10,
991 replay_concurrency: 4,
992 compression: None,
993 section_mask,
994 };
995 let mut archive = Archive::init(journal, cfg.clone())
996 .await
997 .expect("Failed to initialize archive");
998
999 let mut keys = BTreeMap::new();
1001 while keys.len() < num_keys {
1002 let index = keys.len() as u64;
1003 let mut key = [0u8; 32];
1004 context.fill(&mut key);
1005 let mut data = [0u8; 1024];
1006 context.fill(&mut data);
1007 let data = Bytes::from(data.to_vec());
1008 archive
1009 .put(index, &key, data.clone())
1010 .await
1011 .expect("Failed to put data");
1012 keys.insert(key, (index, data));
1013 }
1014
1015 for (key, (index, data)) in &keys {
1017 let retrieved = archive
1018 .get(Identifier::Index(*index))
1019 .await
1020 .expect("Failed to get data")
1021 .expect("Data not found");
1022 assert_eq!(retrieved, data);
1023 let retrieved = archive
1024 .get(Identifier::Key(key))
1025 .await
1026 .expect("Failed to get data")
1027 .expect("Data not found");
1028 assert_eq!(retrieved, data);
1029 }
1030
1031 let mut buffer = String::new();
1033 encode(&mut buffer, &cfg.registry.lock().unwrap()).unwrap();
1034 let tracked = format!("items_tracked {:?}", num_keys);
1035 assert!(buffer.contains(&tracked));
1036 assert!(buffer.contains("keys_pruned_total 0"));
1037
1038 archive.close().await.expect("Failed to close archive");
1040
1041 let registry = Arc::new(Mutex::new(Registry::default()));
1043 let journal = Journal::init(
1044 context.clone(),
1045 JConfig {
1046 registry: registry.clone(),
1047 partition: "test_partition".into(),
1048 },
1049 )
1050 .await
1051 .expect("Failed to initialize journal");
1052 let cfg = Config {
1053 registry: registry.clone(),
1054 key_len: 32,
1055 translator: TwoCap,
1056 pending_writes: 10,
1057 replay_concurrency: 4,
1058 compression: None,
1059 section_mask,
1060 };
1061 let mut archive = Archive::init(journal, cfg.clone())
1062 .await
1063 .expect("Failed to initialize archive");
1064
1065 for (key, (index, data)) in &keys {
1067 let retrieved = archive
1068 .get(Identifier::Index(*index))
1069 .await
1070 .expect("Failed to get data")
1071 .expect("Data not found");
1072 assert_eq!(retrieved, data);
1073 let retrieved = archive
1074 .get(Identifier::Key(key))
1075 .await
1076 .expect("Failed to get data")
1077 .expect("Data not found");
1078 assert_eq!(retrieved, data);
1079 }
1080
1081 let min = (keys.len() / 2) as u64;
1083 archive.prune(min).await.expect("Failed to prune");
1084
1085 let min = min & section_mask;
1087 let mut removed = 0;
1088 for (key, (index, data)) in keys {
1089 if index >= min {
1090 let retrieved = archive
1091 .get(Identifier::Key(&key))
1092 .await
1093 .expect("Failed to get data")
1094 .expect("Data not found");
1095 assert_eq!(retrieved, data);
1096
1097 let (current_end, start_next) = archive.next_gap(index);
1099 assert_eq!(current_end.unwrap(), num_keys as u64 - 1);
1100 assert!(start_next.is_none());
1101 } else {
1102 let retrieved = archive
1103 .get(Identifier::Key(&key))
1104 .await
1105 .expect("Failed to get data");
1106 assert!(retrieved.is_none());
1107 removed += 1;
1108
1109 let (current_end, start_next) = archive.next_gap(index);
1111 assert!(current_end.is_none());
1112 assert_eq!(start_next.unwrap(), min);
1113 }
1114 }
1115
1116 let mut buffer = String::new();
1118 encode(&mut buffer, &cfg.registry.lock().unwrap()).unwrap();
1119 let tracked = format!("items_tracked {:?}", num_keys - removed);
1120 assert!(buffer.contains(&tracked));
1121 let pruned = format!("indices_pruned_total {}", removed);
1122 assert!(buffer.contains(&pruned));
1123 assert!(buffer.contains("keys_pruned_total 0")); });
1125 auditor.state()
1126 }
1127
1128 #[test_traced]
1129 fn test_archive_many_keys_and_restart() {
1130 test_archive_keys_and_restart(100_000); }
1132
1133 #[test_traced]
1134 fn test_determinism() {
1135 let state1 = test_archive_keys_and_restart(5_000); let state2 = test_archive_keys_and_restart(5_000);
1137 assert_eq!(state1, state2);
1138 }
1139
1140 #[test_traced]
1141 fn test_ranges() {
1142 let (executor, context, _) = Executor::default();
1144 executor.start(async move {
1145 let registry = Arc::new(Mutex::new(Registry::default()));
1147
1148 let journal = Journal::init(
1150 context.clone(),
1151 JConfig {
1152 registry: registry.clone(),
1153 partition: "test_partition".into(),
1154 },
1155 )
1156 .await
1157 .expect("Failed to initialize journal");
1158
1159 let cfg = Config {
1161 registry,
1162 key_len: 9,
1163 translator: FourCap,
1164 pending_writes: 10,
1165 replay_concurrency: 4,
1166 compression: None,
1167 section_mask: DEFAULT_SECTION_MASK,
1168 };
1169 let mut archive = Archive::init(journal, cfg.clone())
1170 .await
1171 .expect("Failed to initialize archive");
1172
1173 let keys = vec![
1175 (1u64, "key1-blah", Bytes::from("data1")),
1176 (10u64, "key2-blah", Bytes::from("data2")),
1177 (11u64, "key3-blah", Bytes::from("data3")),
1178 (14u64, "key3-bleh", Bytes::from("data3-again")),
1179 ];
1180 for (index, key, data) in &keys {
1181 archive
1182 .put(*index, key.as_bytes(), data.clone())
1183 .await
1184 .expect("Failed to put data");
1185 }
1186
1187 let (current_end, start_next) = archive.next_gap(0);
1189 assert!(current_end.is_none());
1190 assert_eq!(start_next.unwrap(), 1);
1191
1192 let (current_end, start_next) = archive.next_gap(1);
1193 assert_eq!(current_end.unwrap(), 1);
1194 assert_eq!(start_next.unwrap(), 10);
1195
1196 let (current_end, start_next) = archive.next_gap(10);
1197 assert_eq!(current_end.unwrap(), 11);
1198 assert_eq!(start_next.unwrap(), 14);
1199
1200 let (current_end, start_next) = archive.next_gap(11);
1201 assert_eq!(current_end.unwrap(), 11);
1202 assert_eq!(start_next.unwrap(), 14);
1203
1204 let (current_end, start_next) = archive.next_gap(12);
1205 assert!(current_end.is_none());
1206 assert_eq!(start_next.unwrap(), 14);
1207
1208 let (current_end, start_next) = archive.next_gap(14);
1209 assert_eq!(current_end.unwrap(), 14);
1210 assert!(start_next.is_none());
1211
1212 archive.close().await.expect("Failed to close archive");
1214
1215 let journal = Journal::init(
1216 context,
1217 JConfig {
1218 registry: Arc::new(Mutex::new(Registry::default())),
1219 partition: "test_partition".into(),
1220 },
1221 )
1222 .await
1223 .expect("Failed to initialize journal");
1224 let archive = Archive::init(journal, cfg.clone())
1225 .await
1226 .expect("Failed to initialize archive");
1227
1228 let (current_end, start_next) = archive.next_gap(0);
1230 assert!(current_end.is_none());
1231 assert_eq!(start_next.unwrap(), 1);
1232
1233 let (current_end, start_next) = archive.next_gap(1);
1234 assert_eq!(current_end.unwrap(), 1);
1235 assert_eq!(start_next.unwrap(), 10);
1236
1237 let (current_end, start_next) = archive.next_gap(10);
1238 assert_eq!(current_end.unwrap(), 11);
1239 assert_eq!(start_next.unwrap(), 14);
1240
1241 let (current_end, start_next) = archive.next_gap(11);
1242 assert_eq!(current_end.unwrap(), 11);
1243 assert_eq!(start_next.unwrap(), 14);
1244
1245 let (current_end, start_next) = archive.next_gap(12);
1246 assert!(current_end.is_none());
1247 assert_eq!(start_next.unwrap(), 14);
1248
1249 let (current_end, start_next) = archive.next_gap(14);
1250 assert_eq!(current_end.unwrap(), 14);
1251 assert!(start_next.is_none());
1252 });
1253 }
1254}