use crate::{
    journal::{
        contiguous::{fixed, Contiguous},
        segmented::variable,
        Error,
    },
    mmr::Location,
};
use commonware_codec::Codec;
use commonware_runtime::{buffer::PoolRef, Metrics, Storage};
use commonware_utils::NZUsize;
use core::ops::Range;
use futures::{stream, Stream, StreamExt as _};
use std::{
    num::{NonZeroU64, NonZeroUsize},
    pin::Pin,
};
use tracing::{debug, info};

/// Buffer size used when replaying journal data during initialization.
const REPLAY_BUFFER_SIZE: NonZeroUsize = NZUsize!(1024);

/// Suffix appended to the configured partition name for the data journal.
const DATA_SUFFIX: &str = "_data";

/// Suffix appended to the configured partition name for the offsets journal.
const OFFSETS_SUFFIX: &str = "_offsets";

/// Returns the section that contains the item at `position`.
const fn position_to_section(position: u64, items_per_section: u64) -> u64 {
    position / items_per_section
}

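/// Configuration for initializing a [`Journal`].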
#[derive(Clone)]
pub struct Config<C> {
    /// Partition name prefix; the data and offsets journals are stored under
    /// partitions derived with the `_data` and `_offsets` suffixes.
    pub partition: String,

    /// Number of items stored per section.
    pub items_per_section: NonZeroU64,

    /// Optional compression level for the data journal.
    pub compression: Option<u8>,

    /// Codec configuration used to encode and decode items.
    pub codec_config: C,

    /// Buffer pool shared by the underlying journals.
    pub buffer_pool: PoolRef,

    /// Size of the write buffer used by the underlying journals.
    pub write_buffer: NonZeroUsize,
}

impl<C> Config<C> {
    fn data_partition(&self) -> String {
        format!("{}{}", self.partition, DATA_SUFFIX)
    }

    fn offsets_partition(&self) -> String {
        format!("{}{}", self.partition, OFFSETS_SUFFIX)
    }
}

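/// An append-only journal of variable-size items addressed by contiguous
/// positions.
///
/// Items are stored in a section-based [`variable::Journal`], while a parallel
/// [`fixed::Journal`] records each item's byte offset within its section so
/// items can be located by position.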
pub struct Journal<E: Storage + Metrics, V: Codec> {
    /// Stores the items themselves, keyed by (section, offset).
    data: variable::Journal<E, V>,

    /// Maps each item's position to its byte offset within its section of `data`.
    offsets: fixed::Journal<E, u32>,

    /// Number of items per section.
    items_per_section: u64,

    /// Number of items in the journal, including pruned items (i.e., the
    /// position assigned to the next appended item).
    size: u64,

    /// Position of the oldest item that has not been pruned; equal to `size`
    /// when no items are retained.
    oldest_retained_pos: u64,
}

impl<E: Storage + Metrics, V: Codec> Journal<E, V> {
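    /// Initializes the journal from persisted state, repairing any misalignment
    /// between the data and offsets journals left behind by an unclean shutdown.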
    pub async fn init(context: E, cfg: Config<V::Cfg>) -> Result<Self, Error> {
        let items_per_section = cfg.items_per_section.get();
        let data_partition = cfg.data_partition();
        let offsets_partition = cfg.offsets_partition();

        let mut data = variable::Journal::init(
            context.clone(),
            variable::Config {
                partition: data_partition,
                compression: cfg.compression,
                codec_config: cfg.codec_config,
                buffer_pool: cfg.buffer_pool.clone(),
                write_buffer: cfg.write_buffer,
            },
        )
        .await?;

        let mut offsets = fixed::Journal::init(
            context,
            fixed::Config {
                partition: offsets_partition,
                items_per_blob: cfg.items_per_section,
                buffer_pool: cfg.buffer_pool,
                write_buffer: cfg.write_buffer,
            },
        )
        .await?;

        let (oldest_retained_pos, size) =
            Self::align_journals(&mut data, &mut offsets, items_per_section).await?;
        assert!(
            oldest_retained_pos.is_multiple_of(items_per_section),
            "oldest_retained_pos is not section-aligned"
        );

        Ok(Self {
            data,
            offsets,
            items_per_section,
            size,
            oldest_retained_pos,
        })
    }

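    /// Initializes an empty journal whose logical size is `size`; all positions
    /// below `size` are treated as pruned.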
    pub async fn init_at_size(context: E, cfg: Config<V::Cfg>, size: u64) -> Result<Self, Error> {
        let data = variable::Journal::init(
            context.clone(),
            variable::Config {
                partition: cfg.data_partition(),
                compression: cfg.compression,
                codec_config: cfg.codec_config.clone(),
                buffer_pool: cfg.buffer_pool.clone(),
                write_buffer: cfg.write_buffer,
            },
        )
        .await?;

        let offsets = crate::adb::any::fixed::sync::init_journal_at_size(
            context,
            fixed::Config {
                partition: cfg.offsets_partition(),
                items_per_blob: cfg.items_per_section,
                buffer_pool: cfg.buffer_pool,
                write_buffer: cfg.write_buffer,
            },
            size,
        )
        .await?;

        Ok(Self {
            data,
            offsets,
            items_per_section: cfg.items_per_section.get(),
            size,
            oldest_retained_pos: size,
        })
    }

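    /// Initializes the journal for syncing over the position range `range`.
    ///
    /// Existing data overlapping the range is kept (and pruned up to
    /// `range.start`); data that is missing or entirely stale causes the
    /// journal to be re-initialized at `range.start`. Existing data extending
    /// beyond `range.end` is an error.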
    pub(crate) async fn init_sync(
        context: E,
        cfg: Config<V::Cfg>,
        range: Range<u64>,
    ) -> Result<Journal<E, V>, crate::adb::Error> {
        assert!(!range.is_empty(), "range must not be empty");

        debug!(
            range.start,
            range.end,
            items_per_section = cfg.items_per_section.get(),
            "initializing contiguous variable journal for sync"
        );

        let mut journal = Journal::init(context.with_label("journal"), cfg.clone()).await?;

        let size = journal.size();

        if size == 0 {
            if range.start == 0 {
                debug!("no existing journal data, returning empty journal");
                return Ok(journal);
            } else {
                debug!(
                    range.start,
                    "no existing journal data, initializing at sync range start"
                );
                journal.destroy().await?;
                return Ok(Journal::init_at_size(context, cfg, range.start).await?);
            }
        }

        if size > range.end {
            return Err(crate::adb::Error::UnexpectedData(Location::new_unchecked(
                size,
            )));
        }

        if size <= range.start {
            debug!(
                size,
                range.start, "existing journal data is stale, re-initializing at start position"
            );
            journal.destroy().await?;
            return Ok(Journal::init_at_size(context, cfg, range.start).await?);
        }

        let oldest = journal.oldest_retained_pos();
        if let Some(oldest_pos) = oldest {
            if oldest_pos < range.start {
                debug!(
                    oldest_pos,
                    range.start, "pruning journal to sync range start"
                );
                journal.prune(range.start).await?;
            }
        }

        Ok(journal)
    }

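    /// Rewinds the journal to `size`, discarding all items at positions `size`
    /// and above. Fails if `size` is greater than the current size or below the
    /// oldest retained position.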
    pub async fn rewind(&mut self, size: u64) -> Result<(), Error> {
        match size.cmp(&self.size) {
            std::cmp::Ordering::Greater => return Err(Error::InvalidRewind(size)),
            std::cmp::Ordering::Equal => return Ok(()),
            std::cmp::Ordering::Less => {}
        }

        if size < self.oldest_retained_pos {
            return Err(Error::ItemPruned(size));
        }

        let discard_offset = self.offsets.read(size).await?;
        let discard_section = position_to_section(size, self.items_per_section);

        self.data
            .rewind_to_offset(discard_section, discard_offset)
            .await?;
        self.offsets.rewind(size).await?;

        self.size = size;

        Ok(())
    }

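    /// Appends `item` and returns its position. Both underlying journals are
    /// synced whenever an append fills a section.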
    pub async fn append(&mut self, item: V) -> Result<u64, Error> {
        let section = self.current_section();

        let (offset, _size) = self.data.append(section, item).await?;

        let offsets_pos = self.offsets.append(offset).await?;
        assert_eq!(offsets_pos, self.size);

        let position = self.size;
        self.size += 1;

        if self.size.is_multiple_of(self.items_per_section) {
            futures::try_join!(self.data.sync(section), self.offsets.sync())?;
        }

        Ok(position)
    }

    /// Returns the number of items in the journal, including pruned items.
    pub fn size(&self) -> u64 {
        self.size
    }

    /// Returns the position of the oldest retained item, or `None` if no items
    /// are retained.
    pub fn oldest_retained_pos(&self) -> Option<u64> {
        if self.size == self.oldest_retained_pos {
            None
        } else {
            Some(self.oldest_retained_pos)
        }
    }

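    /// Prunes all items before `min_position`, rounded down to the nearest
    /// section boundary. Returns `true` if any data was pruned.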
    pub async fn prune(&mut self, min_position: u64) -> Result<bool, Error> {
        if min_position <= self.oldest_retained_pos {
            return Ok(false);
        }

        let min_position = min_position.min(self.size);

        let min_section = position_to_section(min_position, self.items_per_section);

        let pruned = self.data.prune(min_section).await?;
        if pruned {
            self.oldest_retained_pos = min_section * self.items_per_section;
            self.offsets.prune(self.oldest_retained_pos).await?;
        }
        Ok(pruned)
    }

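    /// Reads the item at `position`, returning an error if the position is past
    /// the end of the journal or has already been pruned.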
    pub async fn read(&self, position: u64) -> Result<V, Error> {
        if position >= self.size {
            return Err(Error::ItemOutOfRange(position));
        }

        if position < self.oldest_retained_pos {
            return Err(Error::ItemPruned(position));
        }

        let offset = self.offsets.read(position).await?;
        let section = position_to_section(position, self.items_per_section);

        self.data.get(section, offset).await
    }

    /// Syncs only the data journal's current section.
    pub async fn sync_data(&mut self) -> Result<(), Error> {
        let section = self.current_section();
        self.data.sync(section).await
    }

    /// Syncs both the data and offsets journals.
    pub async fn sync(&mut self) -> Result<(), Error> {
        let section = self.current_section();

        futures::try_join!(self.data.sync(section), self.offsets.sync())?;

        Ok(())
    }

    /// Syncs and closes both underlying journals.
    pub async fn close(mut self) -> Result<(), Error> {
        self.sync().await?;
        self.data.close().await?;
        self.offsets.close().await
    }

    /// Destroys both underlying journals, removing all persisted data.
    pub async fn destroy(self) -> Result<(), Error> {
        self.data.destroy().await?;
        self.offsets.destroy().await
    }

    /// Returns the section that the next appended item will be written to.
    const fn current_section(&self) -> u64 {
        position_to_section(self.size, self.items_per_section)
    }

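    /// Reconciles the offsets journal with the data journal after startup,
    /// pruning, rewinding, or rebuilding offsets as needed so both journals
    /// describe the same range of items. Returns `(oldest_retained_pos, size)`.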
    async fn align_journals(
        data: &mut variable::Journal<E, V>,
        offsets: &mut fixed::Journal<E, u32>,
        items_per_section: u64,
    ) -> Result<(u64, u64), Error> {
        // Count the items in the data journal's last section (if any).
        let items_in_last_section = match data.blobs.last_key_value() {
            Some((last_section, _)) => {
                let stream = data.replay(*last_section, 0, REPLAY_BUFFER_SIZE).await?;
                futures::pin_mut!(stream);
                let mut count = 0u64;
                while let Some(result) = stream.next().await {
                    result?;
                    count += 1;
                }
                count
            }
            None => 0,
        };

        let data_empty =
            data.blobs.is_empty() || (data.blobs.len() == 1 && items_in_last_section == 0);
        if data_empty {
            let size = offsets.size().await;

            if !data.blobs.is_empty() {
                // The data journal has a single, empty section. A crash may have
                // left offsets for items that were never written, so rewind the
                // offsets journal to the start of that section.
                let first_section = *data.blobs.first_key_value().unwrap().0;
                let target_pos = first_section * items_per_section;

                info!("crash repair: rewinding offsets from {size} to {target_pos}");
                offsets.rewind(target_pos).await?;
                offsets.sync().await?;
                return Ok((target_pos, target_pos));
            }

            if let Some(oldest) = offsets.oldest_retained_pos().await? {
                if oldest < size {
                    info!("crash repair: pruning offsets to {size} (prune-all crash)");
                    offsets.prune(size).await?;
                    offsets.sync().await?;
                }
            }

            return Ok((size, size));
        }

        // Derive the item range implied by the data journal's sections.
        let (data_oldest_pos, data_size) = {
            let first_section = *data.blobs.first_key_value().unwrap().0;
            let last_section = *data.blobs.last_key_value().unwrap().0;

            let oldest_pos = first_section * items_per_section;

            let size = (last_section * items_per_section) + items_in_last_section;
            (oldest_pos, size)
        };
        assert_ne!(
            data_oldest_pos, data_size,
            "data journal expected to be non-empty"
        );

        match offsets.oldest_retained_pos().await? {
            Some(oldest_retained_pos) if oldest_retained_pos < data_oldest_pos => {
                // The data journal was pruned but the offsets journal was not.
                info!("crash repair: pruning offsets journal to {data_oldest_pos}");
                offsets.prune(data_oldest_pos).await?;
            }
            Some(oldest_retained_pos) if oldest_retained_pos > data_oldest_pos => {
                return Err(Error::Corruption(format!(
                    "offsets oldest pos ({oldest_retained_pos}) > data oldest pos ({data_oldest_pos})"
                )));
            }
            Some(_) => {
                // The journals agree on the oldest retained position.
            }
            None if data_oldest_pos > 0 => {
                // The offsets journal is fully pruned; its size must match the
                // data journal's oldest position.
                let offsets_size = offsets.size().await;
                if offsets_size != data_oldest_pos {
                    return Err(Error::Corruption(format!(
                        "offsets journal empty: size ({offsets_size}) != data oldest pos ({data_oldest_pos})"
                    )));
                }
                info!("crash repair: offsets journal empty at {data_oldest_pos}");
            }
            None => {
                // The data journal has not been pruned; any missing offsets are
                // rebuilt below.
            }
        }

        let offsets_size = offsets.size().await;
        if offsets_size > data_size {
            info!("crash repair: rewinding offsets from {offsets_size} to {data_size}");
            offsets.rewind(data_size).await?;
        } else if offsets_size < data_size {
            Self::add_missing_offsets(data, offsets, offsets_size, items_per_section).await?;
        }

        assert_eq!(offsets.size().await, data_size);
        assert_eq!(offsets.oldest_retained_pos().await?, Some(data_oldest_pos));

        offsets.sync().await?;
        Ok((data_oldest_pos, data_size))
    }

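    /// Appends offsets for any items present in the data journal but missing
    /// from the offsets journal, by replaying the data journal from the last
    /// indexed item (or from its first section if no offsets are retained).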
    async fn add_missing_offsets(
        data: &variable::Journal<E, V>,
        offsets: &mut fixed::Journal<E, u32>,
        offsets_size: u64,
        items_per_section: u64,
    ) -> Result<(), Error> {
        assert!(
            !data.blobs.is_empty(),
            "rebuild_offsets called with empty data journal"
        );

        // Determine where to resume replaying the data journal: from the last
        // indexed item if any offsets are retained, otherwise from the start of
        // the data journal's first section.
        let (start_section, resume_offset, skip_first) =
            if let Some(oldest) = offsets.oldest_retained_pos().await? {
                if oldest < offsets_size {
                    let last_offset = offsets.read(offsets_size - 1).await?;
                    let last_section = position_to_section(offsets_size - 1, items_per_section);
                    (last_section, last_offset, true)
                } else {
                    let first_section = *data.blobs.first_key_value().unwrap().0;
                    (first_section, 0, false)
                }
            } else {
                let first_section = *data.blobs.first_key_value().unwrap().0;
                (first_section, 0, false)
            };

        let stream = data
            .replay(start_section, resume_offset, REPLAY_BUFFER_SIZE)
            .await?;
        futures::pin_mut!(stream);

        let mut skipped_first = false;
        while let Some(result) = stream.next().await {
            let (_section, offset, _size, _item) = result?;

            // When resuming from the last indexed item, skip it: its offset is
            // already recorded.
            if skip_first && !skipped_first {
                skipped_first = true;
                continue;
            }

            offsets.append(offset).await?;
        }

        Ok(())
    }
}

impl<E: Storage + Metrics, V: Codec + Send> Journal<E, V> {
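    /// Returns a stream of `(position, item)` pairs for all retained items at
    /// positions `start_pos` and above. Fails if `start_pos` has been pruned or
    /// is greater than the journal size.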
    pub async fn replay(
        &self,
        start_pos: u64,
        buffer_size: NonZeroUsize,
    ) -> Result<Pin<Box<dyn Stream<Item = Result<(u64, V), Error>> + '_>>, Error> {
        if start_pos < self.oldest_retained_pos {
            return Err(Error::ItemPruned(start_pos));
        }
        if start_pos > self.size {
            return Err(Error::ItemOutOfRange(start_pos));
        }

        if start_pos == self.size {
            return Ok(Box::pin(stream::empty()));
        }

        let start_offset = self.offsets.read(start_pos).await?;
        let start_section = position_to_section(start_pos, self.items_per_section);
        let data_stream = self
            .data
            .replay(start_section, start_offset, buffer_size)
            .await?;

        let transformed = data_stream.enumerate().map(move |(idx, result)| {
            result.map(|(_section, _offset, _size, item)| {
                let pos = start_pos + idx as u64;
                (pos, item)
            })
        });

        Ok(Box::pin(transformed))
    }
}

impl<E: Storage + Metrics, V: Codec + Send> Contiguous for Journal<E, V> {
    type Item = V;

    async fn append(&mut self, item: Self::Item) -> Result<u64, Error> {
        Journal::append(self, item).await
    }

    async fn size(&self) -> u64 {
        Journal::size(self)
    }

    async fn oldest_retained_pos(&self) -> Result<Option<u64>, Error> {
        Ok(Journal::oldest_retained_pos(self))
    }

    async fn prune(&mut self, min_position: u64) -> Result<bool, Error> {
        Journal::prune(self, min_position).await
    }

    async fn replay(
        &self,
        start_pos: u64,
        buffer: NonZeroUsize,
    ) -> Result<impl Stream<Item = Result<(u64, Self::Item), Error>> + '_, Error> {
        Journal::replay(self, start_pos, buffer).await
    }

    async fn read(&self, position: u64) -> Result<Self::Item, Error> {
        Journal::read(self, position).await
    }

    async fn sync(&mut self) -> Result<(), Error> {
        Journal::sync(self).await
    }

    async fn close(self) -> Result<(), Error> {
        Journal::close(self).await
    }

    async fn destroy(self) -> Result<(), Error> {
        Journal::destroy(self).await
    }

    async fn rewind(&mut self, size: u64) -> Result<(), Error> {
        Journal::rewind(self, size).await
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::journal::contiguous::tests::run_contiguous_tests;
    use commonware_macros::test_traced;
    use commonware_runtime::{buffer::PoolRef, deterministic, Runner};
    use commonware_utils::{NZUsize, NZU64};
    use futures::FutureExt as _;

    const PAGE_SIZE: usize = 101;
    const PAGE_CACHE_SIZE: usize = 2;

    #[test_traced]
    fn test_variable_offsets_partition_loss_after_prune_unrecoverable() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "offsets_loss_after_prune".to_string(),
                items_per_section: NZU64!(10),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(NZUsize!(1024), NZUsize!(10)),
                write_buffer: NZUsize!(1024),
            };

            let mut journal = Journal::<_, u64>::init(context.clone(), cfg.clone())
                .await
                .unwrap();

            // Append 40 items across 4 sections.
            for i in 0..40u64 {
                journal.append(i * 100).await.unwrap();
            }

            // Prune the first two sections.
            journal.prune(20).await.unwrap();
            assert_eq!(journal.oldest_retained_pos(), Some(20));
            assert_eq!(journal.size(), 40);

            journal.close().await.unwrap();

            // Simulate losing the offsets partition.
            context
                .remove(&cfg.offsets_partition(), None)
                .await
                .expect("Failed to remove offsets partition");

            // Re-initialization cannot reconcile the journals and must fail.
            let result = Journal::<_, u64>::init(context.clone(), cfg.clone()).await;
            assert!(matches!(result, Err(Error::Corruption(_))));
        });
    }

    #[test_traced]
    fn test_variable_align_data_offsets_mismatch() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "data_loss_test".to_string(),
                items_per_section: NZU64!(10),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(NZUsize!(1024), NZUsize!(10)),
                write_buffer: NZUsize!(1024),
            };

            let mut variable = Journal::<_, u64>::init(context.clone(), cfg.clone())
                .await
                .unwrap();

            for i in 0..20u64 {
                variable.append(i * 100).await.unwrap();
            }

            variable.close().await.unwrap();

            // Simulate losing the data partition.
            context
                .remove(&cfg.data_partition(), None)
                .await
                .expect("Failed to remove data partition");

            let mut journal = Journal::<_, u64>::init(context.clone(), cfg.clone())
                .await
                .expect("Should align offsets to match empty data");

            // The size is preserved, but every position is treated as pruned.
            assert_eq!(journal.size(), 20);

            assert_eq!(journal.oldest_retained_pos(), None);

            for i in 0..20 {
                assert!(matches!(
                    journal.read(i).await,
                    Err(crate::journal::Error::ItemPruned(_))
                ));
            }

            // New appends continue from the preserved size.
            let pos = journal.append(999).await.unwrap();
            assert_eq!(pos, 20);
            assert_eq!(journal.read(20).await.unwrap(), 999);

            journal.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_variable_contiguous() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            run_contiguous_tests(move |test_name: String| {
                let context = context.clone();
                async move {
                    Journal::<_, u64>::init(
                        context,
                        Config {
                            partition: format!("generic_test_{test_name}"),
                            items_per_section: NZU64!(10),
                            compression: None,
                            codec_config: (),
                            buffer_pool: PoolRef::new(NZUsize!(1024), NZUsize!(10)),
                            write_buffer: NZUsize!(1024),
                        },
                    )
                    .await
                }
                .boxed()
            })
            .await;
        });
    }

    #[test_traced]
    fn test_variable_multiple_sequential_prunes() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "sequential_prunes".to_string(),
                items_per_section: NZU64!(10),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(NZUsize!(1024), NZUsize!(10)),
                write_buffer: NZUsize!(1024),
            };

            let mut journal = Journal::<_, u64>::init(context, cfg).await.unwrap();

            for i in 0..40u64 {
                journal.append(i * 100).await.unwrap();
            }

            assert_eq!(journal.oldest_retained_pos(), Some(0));
            assert_eq!(journal.size(), 40);

            let pruned = journal.prune(10).await.unwrap();
            assert!(pruned);

            let oldest = journal.oldest_retained_pos().unwrap();
            assert_eq!(oldest, 10);

            assert!(matches!(
                journal.read(0).await,
                Err(crate::journal::Error::ItemPruned(_))
            ));
            assert_eq!(journal.read(10).await.unwrap(), 1000);
            assert_eq!(journal.read(19).await.unwrap(), 1900);

            let pruned = journal.prune(20).await.unwrap();
            assert!(pruned);

            let oldest = journal.oldest_retained_pos().unwrap();
            assert_eq!(oldest, 20);

            assert!(matches!(
                journal.read(10).await,
                Err(crate::journal::Error::ItemPruned(_))
            ));
            assert!(matches!(
                journal.read(19).await,
                Err(crate::journal::Error::ItemPruned(_))
            ));
            assert_eq!(journal.read(20).await.unwrap(), 2000);
            assert_eq!(journal.read(29).await.unwrap(), 2900);

            let pruned = journal.prune(30).await.unwrap();
            assert!(pruned);

            let oldest = journal.oldest_retained_pos().unwrap();
            assert_eq!(oldest, 30);

            assert!(matches!(
                journal.read(20).await,
                Err(crate::journal::Error::ItemPruned(_))
            ));
            assert!(matches!(
                journal.read(29).await,
                Err(crate::journal::Error::ItemPruned(_))
            ));
            assert_eq!(journal.read(30).await.unwrap(), 3000);
            assert_eq!(journal.read(39).await.unwrap(), 3900);

            assert_eq!(journal.size(), 40);

            journal.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_variable_prune_all_then_reinit() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "prune_all_reinit".to_string(),
                items_per_section: NZU64!(10),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(NZUsize!(1024), NZUsize!(10)),
                write_buffer: NZUsize!(1024),
            };

            let mut journal = Journal::<_, u64>::init(context.clone(), cfg.clone())
                .await
                .unwrap();

            for i in 0..100u64 {
                journal.append(i * 100).await.unwrap();
            }

            assert_eq!(journal.size(), 100);
            assert_eq!(journal.oldest_retained_pos(), Some(0));

            let pruned = journal.prune(100).await.unwrap();
            assert!(pruned);

            assert_eq!(journal.size(), 100);
            assert_eq!(journal.oldest_retained_pos(), None);

            for i in 0..100 {
                assert!(matches!(
                    journal.read(i).await,
                    Err(crate::journal::Error::ItemPruned(_))
                ));
            }

            journal.close().await.unwrap();

            let mut journal = Journal::<_, u64>::init(context.clone(), cfg.clone())
                .await
                .unwrap();

            assert_eq!(journal.size(), 100);
            assert_eq!(journal.oldest_retained_pos(), None);

            for i in 0..100 {
                assert!(matches!(
                    journal.read(i).await,
                    Err(crate::journal::Error::ItemPruned(_))
                ));
            }

            journal.append(10000).await.unwrap();
            assert_eq!(journal.size(), 101);
            assert_eq!(journal.oldest_retained_pos(), Some(100));

            assert_eq!(journal.read(100).await.unwrap(), 10000);

            assert!(matches!(
                journal.read(99).await,
                Err(crate::journal::Error::ItemPruned(_))
            ));

            journal.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_variable_recovery_prune_crash_offsets_behind() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "recovery_prune_crash".to_string(),
                items_per_section: NZU64!(10),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(NZUsize!(1024), NZUsize!(10)),
                write_buffer: NZUsize!(1024),
            };

            let mut variable = Journal::<_, u64>::init(context.clone(), cfg.clone())
                .await
                .unwrap();

            for i in 0..40u64 {
                variable.append(i * 100).await.unwrap();
            }

            variable.prune(10).await.unwrap();
            assert_eq!(variable.oldest_retained_pos(), Some(10));

            // Simulate a crash mid-prune: the data journal was pruned to section 2
            // but the offsets journal was not updated.
            variable.data.prune(2).await.unwrap();
            variable.close().await.unwrap();

            let variable = Journal::<_, u64>::init(context.clone(), cfg.clone())
                .await
                .unwrap();

            // Recovery aligns the offsets journal to the data journal's prune point.
            assert_eq!(variable.oldest_retained_pos(), Some(20));
            assert_eq!(variable.size(), 40);

            assert!(matches!(
                variable.read(10).await,
                Err(crate::journal::Error::ItemPruned(_))
            ));

            assert_eq!(variable.read(20).await.unwrap(), 2000);
            assert_eq!(variable.read(39).await.unwrap(), 3900);

            variable.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_variable_recovery_offsets_ahead_corruption() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "recovery_offsets_ahead".to_string(),
                items_per_section: NZU64!(10),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(NZUsize!(1024), NZUsize!(10)),
                write_buffer: NZUsize!(1024),
            };

            let mut variable = Journal::<_, u64>::init(context.clone(), cfg.clone())
                .await
                .unwrap();

            for i in 0..40u64 {
                variable.append(i * 100).await.unwrap();
            }

            // Prune the offsets journal further than the data journal, a state
            // that normal operation never produces.
            variable.offsets.prune(20).await.unwrap();
            variable.data.prune(1).await.unwrap();
            variable.close().await.unwrap();

            // Recovery cannot reconcile this state and must report corruption.
            let result = Journal::<_, u64>::init(context.clone(), cfg.clone()).await;
            assert!(matches!(result, Err(Error::Corruption(_))));
        });
    }

    #[test_traced]
    fn test_variable_recovery_append_crash_offsets_behind() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "recovery_append_crash".to_string(),
                items_per_section: NZU64!(10),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(NZUsize!(1024), NZUsize!(10)),
                write_buffer: NZUsize!(1024),
            };

            let mut variable = Journal::<_, u64>::init(context.clone(), cfg.clone())
                .await
                .unwrap();

            for i in 0..15u64 {
                variable.append(i * 100).await.unwrap();
            }

            assert_eq!(variable.size(), 15);

            // Simulate a crash during appends: items reach the data journal but
            // their offsets are never recorded.
            for i in 15..20u64 {
                variable.data.append(1, i * 100).await.unwrap();
            }
            variable.close().await.unwrap();

            let variable = Journal::<_, u64>::init(context.clone(), cfg.clone())
                .await
                .unwrap();

            // Recovery rebuilds the missing offsets from the data journal.
            assert_eq!(variable.size(), 20);
            assert_eq!(variable.oldest_retained_pos(), Some(0));

            for i in 0..20u64 {
                assert_eq!(variable.read(i).await.unwrap(), i * 100);
            }

            assert_eq!(variable.offsets.size().await, 20);

            variable.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_variable_recovery_multiple_prunes_crash() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "recovery_multiple_prunes".to_string(),
                items_per_section: NZU64!(10),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(NZUsize!(1024), NZUsize!(10)),
                write_buffer: NZUsize!(1024),
            };

            let mut variable = Journal::<_, u64>::init(context.clone(), cfg.clone())
                .await
                .unwrap();

            for i in 0..50u64 {
                variable.append(i * 100).await.unwrap();
            }

            variable.prune(10).await.unwrap();
            assert_eq!(variable.oldest_retained_pos(), Some(10));

            // Simulate a crash during a later prune: the data journal reaches
            // section 3 while the offsets journal is left behind.
            variable.data.prune(3).await.unwrap();
            variable.close().await.unwrap();

            let variable = Journal::<_, u64>::init(context.clone(), cfg.clone())
                .await
                .unwrap();

            assert_eq!(variable.oldest_retained_pos(), Some(30));
            assert_eq!(variable.size(), 50);

            assert!(matches!(
                variable.read(10).await,
                Err(crate::journal::Error::ItemPruned(_))
            ));
            assert!(matches!(
                variable.read(20).await,
                Err(crate::journal::Error::ItemPruned(_))
            ));

            assert_eq!(variable.read(30).await.unwrap(), 3000);
            assert_eq!(variable.read(49).await.unwrap(), 4900);

            variable.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_variable_recovery_rewind_crash_multi_section() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "recovery_rewind_crash".to_string(),
                items_per_section: NZU64!(10),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(NZUsize!(1024), NZUsize!(10)),
                write_buffer: NZUsize!(1024),
            };

            let mut variable = Journal::<_, u64>::init(context.clone(), cfg.clone())
                .await
                .unwrap();

            for i in 0..25u64 {
                variable.append(i * 100).await.unwrap();
            }

            assert_eq!(variable.size(), 25);

            // Rewind only the offsets journal, simulating a crash that lost its
            // recent writes while the data journal kept all 25 items.
            variable.offsets.rewind(5).await.unwrap();
            variable.close().await.unwrap();

            let mut variable = Journal::<_, u64>::init(context.clone(), cfg.clone())
                .await
                .unwrap();

            // Recovery rebuilds offsets spanning multiple sections.
            assert_eq!(variable.size(), 25);
            assert_eq!(variable.oldest_retained_pos(), Some(0));

            for i in 0..25u64 {
                assert_eq!(variable.read(i).await.unwrap(), i * 100);
            }

            assert_eq!(variable.offsets.size().await, 25);

            let pos = variable.append(2500).await.unwrap();
            assert_eq!(pos, 25);
            assert_eq!(variable.read(25).await.unwrap(), 2500);

            variable.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_variable_recovery_empty_offsets_after_prune_and_append() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "recovery_empty_after_prune".to_string(),
                items_per_section: NZU64!(10),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(NZUsize!(1024), NZUsize!(10)),
                write_buffer: NZUsize!(1024),
            };

            let mut journal = Journal::<_, u64>::init(context.clone(), cfg.clone())
                .await
                .unwrap();

            for i in 0..10u64 {
                journal.append(i * 100).await.unwrap();
            }
            assert_eq!(journal.size(), 10);
            assert_eq!(journal.oldest_retained_pos(), Some(0));

            journal.prune(10).await.unwrap();
            assert_eq!(journal.size(), 10);
            assert_eq!(journal.oldest_retained_pos(), None);

            // Append directly to the data journal and sync it, then drop without
            // closing: the offsets journal never sees these items.
            for i in 10..20u64 {
                journal.data.append(1, i * 100).await.unwrap();
            }
            journal.data.sync(1).await.unwrap();
            drop(journal);

            let journal = Journal::<_, u64>::init(context.clone(), cfg.clone())
                .await
                .expect("Should recover from crash after data sync but before offsets sync");

            assert_eq!(journal.size(), 20);
            assert_eq!(journal.oldest_retained_pos(), Some(10));

            for i in 10..20u64 {
                assert_eq!(journal.read(i).await.unwrap(), i * 100);
            }

            for i in 0..10 {
                assert!(matches!(journal.read(i).await, Err(Error::ItemPruned(_))));
            }

            journal.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_variable_concurrent_sync_recovery() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "concurrent_sync_recovery".to_string(),
                items_per_section: NZU64!(10),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(NZUsize!(1024), NZUsize!(10)),
                write_buffer: NZUsize!(1024),
            };

            let mut journal = Journal::<_, u64>::init(context.clone(), cfg.clone())
                .await
                .unwrap();

            for i in 0..15u64 {
                journal.append(i * 100).await.unwrap();
            }

            journal.sync_data().await.unwrap();

            drop(journal);

            let journal = Journal::<_, u64>::init(context.clone(), cfg.clone())
                .await
                .unwrap();

            assert_eq!(journal.size(), 15);
            for i in 0..15u64 {
                assert_eq!(journal.read(i).await.unwrap(), i * 100);
            }

            journal.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_init_at_size_zero() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "init_at_size_zero".to_string(),
                items_per_section: NZU64!(5),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(NZUsize!(512), NZUsize!(2)),
                write_buffer: NZUsize!(1024),
            };

            let mut journal = Journal::<_, u64>::init_at_size(context.clone(), cfg.clone(), 0)
                .await
                .unwrap();

            assert_eq!(journal.size(), 0);

            assert_eq!(journal.oldest_retained_pos(), None);

            let pos = journal.append(100).await.unwrap();
            assert_eq!(pos, 0);
            assert_eq!(journal.size(), 1);
            assert_eq!(journal.read(0).await.unwrap(), 100);

            journal.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_init_at_size_section_boundary() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "init_at_size_boundary".to_string(),
                items_per_section: NZU64!(5),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(NZUsize!(512), NZUsize!(2)),
                write_buffer: NZUsize!(1024),
            };

            let mut journal = Journal::<_, u64>::init_at_size(context.clone(), cfg.clone(), 10)
                .await
                .unwrap();

            assert_eq!(journal.size(), 10);

            assert_eq!(journal.oldest_retained_pos(), None);

            let pos = journal.append(1000).await.unwrap();
            assert_eq!(pos, 10);
            assert_eq!(journal.size(), 11);
            assert_eq!(journal.read(10).await.unwrap(), 1000);

            let pos = journal.append(1001).await.unwrap();
            assert_eq!(pos, 11);
            assert_eq!(journal.read(11).await.unwrap(), 1001);

            journal.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_init_at_size_mid_section() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "init_at_size_mid".to_string(),
                items_per_section: NZU64!(5),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(NZUsize!(512), NZUsize!(2)),
                write_buffer: NZUsize!(1024),
            };

            let mut journal = Journal::<_, u64>::init_at_size(context.clone(), cfg.clone(), 7)
                .await
                .unwrap();

            assert_eq!(journal.size(), 7);

            assert_eq!(journal.oldest_retained_pos(), None);

            let pos = journal.append(700).await.unwrap();
            assert_eq!(pos, 7);
            assert_eq!(journal.size(), 8);
            assert_eq!(journal.read(7).await.unwrap(), 700);

            journal.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_init_at_size_persistence() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "init_at_size_persist".to_string(),
                items_per_section: NZU64!(5),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(NZUsize!(512), NZUsize!(2)),
                write_buffer: NZUsize!(1024),
            };

            let mut journal = Journal::<_, u64>::init_at_size(context.clone(), cfg.clone(), 15)
                .await
                .unwrap();

            for i in 0..5u64 {
                let pos = journal.append(1500 + i).await.unwrap();
                assert_eq!(pos, 15 + i);
            }

            assert_eq!(journal.size(), 20);

            journal.close().await.unwrap();

            let mut journal = Journal::<_, u64>::init(context.clone(), cfg.clone())
                .await
                .unwrap();

            assert_eq!(journal.size(), 20);
            assert_eq!(journal.oldest_retained_pos(), Some(15));

            for i in 0..5u64 {
                assert_eq!(journal.read(15 + i).await.unwrap(), 1500 + i);
            }

            let pos = journal.append(9999).await.unwrap();
            assert_eq!(pos, 20);
            assert_eq!(journal.read(20).await.unwrap(), 9999);

            journal.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_init_at_size_large_offset() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "init_at_size_large".to_string(),
                items_per_section: NZU64!(5),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(NZUsize!(512), NZUsize!(2)),
                write_buffer: NZUsize!(1024),
            };

            let mut journal = Journal::<_, u64>::init_at_size(context.clone(), cfg.clone(), 1000)
                .await
                .unwrap();

            assert_eq!(journal.size(), 1000);
            assert_eq!(journal.oldest_retained_pos(), None);

            let pos = journal.append(100000).await.unwrap();
            assert_eq!(pos, 1000);
            assert_eq!(journal.read(1000).await.unwrap(), 100000);

            journal.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_init_at_size_prune_and_append() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "init_at_size_prune".to_string(),
                items_per_section: NZU64!(5),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(NZUsize!(512), NZUsize!(2)),
                write_buffer: NZUsize!(1024),
            };

            let mut journal = Journal::<_, u64>::init_at_size(context.clone(), cfg.clone(), 20)
                .await
                .unwrap();

            for i in 0..10u64 {
                journal.append(2000 + i).await.unwrap();
            }

            assert_eq!(journal.size(), 30);

            journal.prune(25).await.unwrap();

            assert_eq!(journal.size(), 30);
            assert_eq!(journal.oldest_retained_pos(), Some(25));

            for i in 25..30u64 {
                assert_eq!(journal.read(i).await.unwrap(), 2000 + (i - 20));
            }

            let pos = journal.append(3000).await.unwrap();
            assert_eq!(pos, 30);

            journal.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_init_sync_no_existing_data() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_fresh_start".into(),
                items_per_section: NZU64!(5),
                compression: None,
                codec_config: (),
                write_buffer: NZUsize!(1024),
                buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
            };

            let lower_bound = 10;
            let upper_bound = 26;
            let mut journal =
                Journal::init_sync(context.clone(), cfg.clone(), lower_bound..upper_bound)
                    .await
                    .expect("Failed to initialize journal with sync boundaries");

            assert_eq!(journal.size(), lower_bound);
            assert_eq!(journal.oldest_retained_pos(), None);

            let pos1 = journal.append(42u64).await.unwrap();
            assert_eq!(pos1, lower_bound);
            assert_eq!(journal.read(pos1).await.unwrap(), 42u64);

            let pos2 = journal.append(43u64).await.unwrap();
            assert_eq!(pos2, lower_bound + 1);
            assert_eq!(journal.read(pos2).await.unwrap(), 43u64);

            journal.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_init_sync_existing_data_overlap() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_overlap".into(),
                items_per_section: NZU64!(5),
                compression: None,
                codec_config: (),
                write_buffer: NZUsize!(1024),
                buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
            };

            let mut journal =
                Journal::<deterministic::Context, u64>::init(context.clone(), cfg.clone())
                    .await
                    .expect("Failed to create initial journal");

            for i in 0..20u64 {
                journal.append(i * 100).await.unwrap();
            }
            journal.close().await.unwrap();

            // The sync range overlaps the existing data (positions 0..20).
            let lower_bound = 8;
            let upper_bound = 31;
            let mut journal = Journal::<deterministic::Context, u64>::init_sync(
                context.clone(),
                cfg.clone(),
                lower_bound..upper_bound,
            )
            .await
            .expect("Failed to initialize journal with overlap");

            assert_eq!(journal.size(), 20);

            // Pruning to lower_bound (8) rounds down to the section boundary at 5.
            let oldest = journal.oldest_retained_pos();
            assert_eq!(oldest, Some(5));

            assert!(matches!(journal.read(0).await, Err(Error::ItemPruned(_))));
            assert!(matches!(journal.read(4).await, Err(Error::ItemPruned(_))));

            assert_eq!(journal.read(5).await.unwrap(), 500);
            assert_eq!(journal.read(8).await.unwrap(), 800);
            assert_eq!(journal.read(19).await.unwrap(), 1900);

            assert!(matches!(
                journal.read(20).await,
                Err(Error::ItemOutOfRange(_))
            ));

            let pos = journal.append(999).await.unwrap();
            assert_eq!(pos, 20);
            assert_eq!(journal.read(20).await.unwrap(), 999);

            journal.destroy().await.unwrap();
        });
    }

    #[should_panic]
    #[test_traced]
    fn test_init_sync_invalid_parameters() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_invalid".into(),
                items_per_section: NZU64!(5),
                compression: None,
                codec_config: (),
                write_buffer: NZUsize!(1024),
                buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
            };

            // An empty range (start > end) must panic.
            #[allow(clippy::reversed_empty_ranges)]
            let _result = Journal::<deterministic::Context, u64>::init_sync(
                context.clone(),
                cfg,
                10..5,
            )
            .await;
        });
    }

    #[test_traced]
    fn test_init_sync_existing_data_exact_match() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let items_per_section = NZU64!(5);
            let cfg = Config {
                partition: "test_exact_match".to_string(),
                items_per_section,
                compression: None,
                codec_config: (),
                write_buffer: NZUsize!(1024),
                buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
            };

            let mut journal =
                Journal::<deterministic::Context, u64>::init(context.clone(), cfg.clone())
                    .await
                    .expect("Failed to create initial journal");

            for i in 0..20u64 {
                journal.append(i * 100).await.unwrap();
            }
            journal.close().await.unwrap();

            // The sync range ends exactly at the existing size.
            let lower_bound = 5;
            let upper_bound = 20;
            let mut journal = Journal::<deterministic::Context, u64>::init_sync(
                context.clone(),
                cfg.clone(),
                lower_bound..upper_bound,
            )
            .await
            .expect("Failed to initialize journal with exact match");

            assert_eq!(journal.size(), 20);

            // lower_bound (5) is already a section boundary, so pruning stops there.
            let oldest = journal.oldest_retained_pos();
            assert_eq!(oldest, Some(5));

            assert!(matches!(journal.read(0).await, Err(Error::ItemPruned(_))));
            assert!(matches!(journal.read(4).await, Err(Error::ItemPruned(_))));

            assert_eq!(journal.read(5).await.unwrap(), 500);
            assert_eq!(journal.read(10).await.unwrap(), 1000);
            assert_eq!(journal.read(19).await.unwrap(), 1900);

            assert!(matches!(
                journal.read(20).await,
                Err(Error::ItemOutOfRange(_))
            ));

            let pos = journal.append(999).await.unwrap();
            assert_eq!(pos, 20);
            assert_eq!(journal.read(20).await.unwrap(), 999);

            journal.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_init_sync_existing_data_exceeds_upper_bound() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let items_per_section = NZU64!(5);
            let cfg = Config {
                partition: "test_unexpected_data".into(),
                items_per_section,
                compression: None,
                codec_config: (),
                write_buffer: NZUsize!(1024),
                buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
            };

            let mut journal =
                Journal::<deterministic::Context, u64>::init(context.clone(), cfg.clone())
                    .await
                    .expect("Failed to create initial journal");

            for i in 0..30u64 {
                journal.append(i * 1000).await.unwrap();
            }
            journal.close().await.unwrap();

            // Any upper bound below the existing size (30) must be rejected.
            let lower_bound = 8;
            for upper_bound in 9..29 {
                let result = Journal::<deterministic::Context, u64>::init_sync(
                    context.clone(),
                    cfg.clone(),
                    lower_bound..upper_bound,
                )
                .await;

                assert!(matches!(result, Err(crate::adb::Error::UnexpectedData(_))));
            }
        });
    }

    #[test_traced]
    fn test_init_sync_existing_data_stale() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let items_per_section = NZU64!(5);
            let cfg = Config {
                partition: "test_stale".into(),
                items_per_section,
                compression: None,
                codec_config: (),
                write_buffer: NZUsize!(1024),
                buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
            };

            let mut journal =
                Journal::<deterministic::Context, u64>::init(context.clone(), cfg.clone())
                    .await
                    .expect("Failed to create initial journal");

            for i in 0..10u64 {
                journal.append(i * 100).await.unwrap();
            }
            journal.close().await.unwrap();

            // The sync range starts beyond the existing data, so the existing
            // data is stale and the journal is re-initialized at lower_bound.
            let lower_bound = 15;
            let upper_bound = 26;
            let journal = Journal::<deterministic::Context, u64>::init_sync(
                context.clone(),
                cfg.clone(),
                lower_bound..upper_bound,
            )
            .await
            .expect("Failed to initialize journal with stale data");

            assert_eq!(journal.size(), 15);

            assert_eq!(journal.oldest_retained_pos(), None);

            assert!(matches!(journal.read(0).await, Err(Error::ItemPruned(_))));
            assert!(matches!(journal.read(9).await, Err(Error::ItemPruned(_))));
            assert!(matches!(journal.read(14).await, Err(Error::ItemPruned(_))));

            journal.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_init_sync_section_boundaries() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let items_per_section = NZU64!(5);
            let cfg = Config {
                partition: "test_boundaries".into(),
                items_per_section,
                compression: None,
                codec_config: (),
                write_buffer: NZUsize!(1024),
                buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
            };

            let mut journal =
                Journal::<deterministic::Context, u64>::init(context.clone(), cfg.clone())
                    .await
                    .expect("Failed to create initial journal");

            for i in 0..25u64 {
                journal.append(i * 100).await.unwrap();
            }
            journal.close().await.unwrap();

            // Both bounds fall exactly on section boundaries.
            let lower_bound = 15;
            let upper_bound = 25;
            let mut journal = Journal::<deterministic::Context, u64>::init_sync(
                context.clone(),
                cfg.clone(),
                lower_bound..upper_bound,
            )
            .await
            .expect("Failed to initialize journal at boundaries");

            assert_eq!(journal.size(), 25);

            let oldest = journal.oldest_retained_pos();
            assert_eq!(oldest, Some(15));

            assert!(matches!(journal.read(0).await, Err(Error::ItemPruned(_))));
            assert!(matches!(journal.read(14).await, Err(Error::ItemPruned(_))));

            assert_eq!(journal.read(15).await.unwrap(), 1500);
            assert_eq!(journal.read(20).await.unwrap(), 2000);
            assert_eq!(journal.read(24).await.unwrap(), 2400);

            assert!(matches!(
                journal.read(25).await,
                Err(Error::ItemOutOfRange(_))
            ));

            let pos = journal.append(999).await.unwrap();
            assert_eq!(pos, 25);
            assert_eq!(journal.read(25).await.unwrap(), 999);

            journal.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_init_sync_same_section_bounds() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let items_per_section = NZU64!(5);
            let cfg = Config {
                partition: "test_same_section".into(),
                items_per_section,
                compression: None,
                codec_config: (),
                write_buffer: NZUsize!(1024),
                buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
            };

            let mut journal =
                Journal::<deterministic::Context, u64>::init(context.clone(), cfg.clone())
                    .await
                    .expect("Failed to create initial journal");

            for i in 0..15u64 {
                journal.append(i * 100).await.unwrap();
            }
            journal.close().await.unwrap();

            // The entire sync range falls within a single section.
            let lower_bound = 10;
            let upper_bound = 15;
            let mut journal = Journal::<deterministic::Context, u64>::init_sync(
                context.clone(),
                cfg.clone(),
                lower_bound..upper_bound,
            )
            .await
            .expect("Failed to initialize journal with same-section bounds");

            assert_eq!(journal.size(), 15);

            let oldest = journal.oldest_retained_pos();
            assert_eq!(oldest, Some(10));

            assert!(matches!(journal.read(0).await, Err(Error::ItemPruned(_))));
            assert!(matches!(journal.read(9).await, Err(Error::ItemPruned(_))));

            assert_eq!(journal.read(10).await.unwrap(), 1000);
            assert_eq!(journal.read(11).await.unwrap(), 1100);
            assert_eq!(journal.read(14).await.unwrap(), 1400);

            assert!(matches!(
                journal.read(15).await,
                Err(Error::ItemOutOfRange(_))
            ));

            let pos = journal.append(999).await.unwrap();
            assert_eq!(pos, 15);
            assert_eq!(journal.read(15).await.unwrap(), 999);

            journal.destroy().await.unwrap();
        });
    }
}