1use crate::{
7 journal::{
8 contiguous::{fixed, Contiguous, MutableContiguous},
9 segmented::variable,
10 Error,
11 },
12 Persistable,
13};
14use commonware_codec::{Codec, CodecShared};
15use commonware_runtime::{buffer::paged::CacheRef, Clock, Metrics, Storage};
16use commonware_utils::NZUsize;
17#[commonware_macros::stability(ALPHA)]
18use core::ops::Range;
19use futures::{future::Either, stream, Stream, StreamExt as _};
20use std::num::{NonZeroU64, NonZeroUsize};
21#[commonware_macros::stability(ALPHA)]
22use tracing::debug;
23use tracing::warn;
24
25const REPLAY_BUFFER_SIZE: NonZeroUsize = NZUsize!(1024);
26
27const DATA_SUFFIX: &str = "_data";
29
30const OFFSETS_SUFFIX: &str = "_offsets";
32
/// Maps an absolute item `position` to the index of the section that holds it.
///
/// Sections each hold `items_per_section` items, so this is plain integer
/// division. Callers always source `items_per_section` from a `NonZeroU64`,
/// so division by zero cannot occur.
const fn position_to_section(position: u64, items_per_section: u64) -> u64 {
    position.div_euclid(items_per_section)
}
57
/// Configuration for a contiguous [`Journal`] backed by a variable-width data
/// journal and a fixed-width offsets journal.
#[derive(Clone)]
pub struct Config<C> {
    /// Base partition name; the underlying journals live under
    /// `<partition>_data` and `<partition>_offsets`.
    pub partition: String,

    /// Number of items stored per section/blob in both underlying journals.
    pub items_per_section: NonZeroU64,

    /// Optional compression setting forwarded to the data journal
    /// (`None` disables compression; semantics defined by the data journal).
    pub compression: Option<u8>,

    /// Codec configuration used to decode stored items.
    pub codec_config: C,

    /// Page cache shared by both underlying journals.
    pub page_cache: CacheRef,

    /// Write buffer size used by both underlying journals.
    pub write_buffer: NonZeroUsize,
}
82
83impl<C> Config<C> {
84 fn data_partition(&self) -> String {
86 format!("{}{}", self.partition, DATA_SUFFIX)
87 }
88
89 fn offsets_partition(&self) -> String {
91 format!("{}{}", self.partition, OFFSETS_SUFFIX)
92 }
93}
94
/// An append-only journal of variable-width items addressed by contiguous
/// positions. Items are stored in a [`variable::Journal`]; a parallel
/// [`fixed::Journal`] maps each position to the item's byte offset.
pub struct Journal<E: Clock + Storage + Metrics, V: Codec> {
    // Variable-width journal holding the encoded items, keyed by section.
    data: variable::Journal<E, V>,

    // Fixed-width journal mapping each position to the byte offset of the
    // corresponding item within its data section.
    offsets: fixed::Journal<E, u64>,

    // Items per section (copied out of the config's `NonZeroU64`).
    items_per_section: u64,

    // Total number of items ever appended; the next position to assign.
    size: u64,

    // Position of the oldest item not yet pruned. Equals `size` when the
    // journal is empty or fully pruned.
    oldest_retained_pos: u64,
}
165
166impl<E: Clock + Storage + Metrics, V: CodecShared> Journal<E, V> {
167 pub async fn init(context: E, cfg: Config<V::Cfg>) -> Result<Self, Error> {
174 let items_per_section = cfg.items_per_section.get();
175 let data_partition = cfg.data_partition();
176 let offsets_partition = cfg.offsets_partition();
177
178 let mut data = variable::Journal::init(
180 context.with_label("data"),
181 variable::Config {
182 partition: data_partition,
183 compression: cfg.compression,
184 codec_config: cfg.codec_config,
185 page_cache: cfg.page_cache.clone(),
186 write_buffer: cfg.write_buffer,
187 },
188 )
189 .await?;
190
191 let mut offsets = fixed::Journal::init(
193 context.with_label("offsets"),
194 fixed::Config {
195 partition: offsets_partition,
196 items_per_blob: cfg.items_per_section,
197 page_cache: cfg.page_cache,
198 write_buffer: cfg.write_buffer,
199 },
200 )
201 .await?;
202
203 let (oldest_retained_pos, size) =
205 Self::align_journals(&mut data, &mut offsets, items_per_section).await?;
206
207 Ok(Self {
208 data,
209 offsets,
210 items_per_section,
211 size,
212 oldest_retained_pos,
213 })
214 }
215
216 #[commonware_macros::stability(ALPHA)]
221 pub async fn init_at_size(context: E, cfg: Config<V::Cfg>, size: u64) -> Result<Self, Error> {
222 let data = variable::Journal::init(
224 context.with_label("data"),
225 variable::Config {
226 partition: cfg.data_partition(),
227 compression: cfg.compression,
228 codec_config: cfg.codec_config.clone(),
229 page_cache: cfg.page_cache.clone(),
230 write_buffer: cfg.write_buffer,
231 },
232 )
233 .await?;
234
235 let offsets = fixed::Journal::init_at_size(
237 context.with_label("offsets"),
238 fixed::Config {
239 partition: cfg.offsets_partition(),
240 items_per_blob: cfg.items_per_section,
241 page_cache: cfg.page_cache,
242 write_buffer: cfg.write_buffer,
243 },
244 size,
245 )
246 .await?;
247
248 Ok(Self {
249 data,
250 offsets,
251 items_per_section: cfg.items_per_section.get(),
252 size,
253 oldest_retained_pos: size,
254 })
255 }
256
    /// Initializes a journal for state sync over the position range `range`.
    ///
    /// Existing on-disk data is reused when it overlaps the range, pruned up
    /// to `range.start` when it extends below it, and destroyed (with the
    /// journal re-created at `range.start`) when it is entirely stale.
    ///
    /// # Panics
    ///
    /// Panics if `range` is empty.
    #[commonware_macros::stability(ALPHA)]
    pub(crate) async fn init_sync(
        context: E,
        cfg: Config<V::Cfg>,
        range: Range<u64>,
    ) -> Result<Self, Error> {
        assert!(!range.is_empty(), "range must not be empty");

        debug!(
            range.start,
            range.end,
            items_per_section = cfg.items_per_section.get(),
            "initializing contiguous variable journal for sync"
        );

        // Open (and crash-repair) whatever is already on disk.
        let mut journal = Self::init(context.with_label("journal"), cfg.clone()).await?;

        let size = journal.size();

        if size == 0 {
            if range.start == 0 {
                debug!("no existing journal data, returning empty journal");
                return Ok(journal);
            } else {
                // Empty journal but a nonzero start: re-create at the start.
                debug!(
                    range.start,
                    "no existing journal data, initializing at sync range start"
                );
                journal.destroy().await?;
                return Self::init_at_size(context, cfg, range.start).await;
            }
        }

        // Existing data extends beyond the requested sync range; refuse.
        if size > range.end {
            return Err(Error::ItemOutOfRange(size));
        }

        // Everything on disk precedes the range: discard and restart at the
        // range start.
        if size <= range.start {
            debug!(
                size,
                range.start, "existing journal data is stale, re-initializing at start position"
            );
            journal.destroy().await?;
            return Self::init_at_size(context, cfg, range.start).await;
        }

        // Partial overlap: drop retained items below the range start.
        let bounds = journal.bounds();
        if !bounds.is_empty() && bounds.start < range.start {
            debug!(
                oldest_pos = bounds.start,
                range.start, "pruning journal to sync range start"
            );
            journal.prune(range.start).await?;
        }

        Ok(journal)
    }
345
346 pub async fn rewind(&mut self, size: u64) -> Result<(), Error> {
359 match size.cmp(&self.size) {
361 std::cmp::Ordering::Greater => return Err(Error::InvalidRewind(size)),
362 std::cmp::Ordering::Equal => return Ok(()), std::cmp::Ordering::Less => {}
364 }
365
366 if size < self.oldest_retained_pos {
368 return Err(Error::ItemPruned(size));
369 }
370
371 let discard_offset = self.offsets.read(size).await?;
373 let discard_section = position_to_section(size, self.items_per_section);
374
375 self.data
376 .rewind_to_offset(discard_section, discard_offset)
377 .await?;
378 self.offsets.rewind(size).await?;
379
380 self.size = size;
382
383 Ok(())
384 }
385
386 pub async fn append(&mut self, item: V) -> Result<u64, Error> {
402 let section = self.current_section();
404
405 let (offset, _size) = self.data.append(section, item).await?;
407
408 let offsets_pos = self.offsets.append(offset).await?;
410 assert_eq!(offsets_pos, self.size);
411
412 let position = self.size;
414 self.size += 1;
415
416 if self.size.is_multiple_of(self.items_per_section) {
418 futures::try_join!(self.data.sync(section), self.offsets.sync())?;
419 }
420
421 Ok(position)
422 }
423
424 pub const fn bounds(&self) -> std::ops::Range<u64> {
427 self.oldest_retained_pos..self.size
428 }
429
    /// Returns the total number of items ever appended (the next position to
    /// be assigned), regardless of how many have been pruned.
    pub const fn size(&self) -> u64 {
        self.size
    }
437
    /// Prunes all items at positions below `min_position`, rounded down to a
    /// section boundary. Returns whether anything was actually pruned.
    pub async fn prune(&mut self, min_position: u64) -> Result<bool, Error> {
        // Already pruned at least this far.
        if min_position <= self.oldest_retained_pos {
            return Ok(false);
        }

        // Never prune past the end of the journal.
        let min_position = min_position.min(self.size);

        let min_section = position_to_section(min_position, self.items_per_section);

        // Prune data before offsets: a crash in between leaves the offsets
        // journal behind, which `align_journals` repairs on the next init.
        let pruned = self.data.prune(min_section).await?;
        if pruned {
            // Only whole sections are pruned, so round down to the section
            // start (never moving the boundary backwards).
            let new_oldest = (min_section * self.items_per_section).max(self.oldest_retained_pos);
            self.oldest_retained_pos = new_oldest;
            self.offsets.prune(new_oldest).await?;
        }
        Ok(pruned)
    }
467
    /// Streams `(position, item)` pairs for every item from `start_pos` to the
    /// end of the journal.
    ///
    /// Returns [`Error::ItemPruned`] if `start_pos` has been pruned and
    /// [`Error::ItemOutOfRange`] if it lies past the end. An empty stream is
    /// returned when `start_pos` equals the current size.
    pub async fn replay(
        &self,
        start_pos: u64,
        buffer_size: NonZeroUsize,
    ) -> Result<impl Stream<Item = Result<(u64, V), Error>> + '_, Error> {
        let bounds = self.bounds();
        if start_pos < bounds.start {
            return Err(Error::ItemPruned(start_pos));
        }
        if start_pos > bounds.end {
            return Err(Error::ItemOutOfRange(start_pos));
        }

        // Nothing to replay.
        if start_pos == bounds.end {
            return Ok(Either::Left(stream::empty()));
        }

        // Start the data replay at the stored offset of `start_pos`.
        let start_offset = self.offsets.read(start_pos).await?;
        let start_section = position_to_section(start_pos, self.items_per_section);
        let data_stream = self
            .data
            .replay(start_section, start_offset, buffer_size)
            .await?;

        // Number the yielded items: the i-th streamed item is at position
        // `start_pos + i`.
        let transformed = data_stream.enumerate().map(move |(idx, result)| {
            result.map(|(_section, _offset, _size, item)| {
                let pos = start_pos + idx as u64;
                (pos, item)
            })
        });

        Ok(Either::Right(transformed))
    }
515
516 pub async fn read(&self, position: u64) -> Result<V, Error> {
524 let bounds = self.bounds();
526 if position >= bounds.end {
527 return Err(Error::ItemOutOfRange(position));
528 }
529
530 if position < bounds.start {
531 return Err(Error::ItemPruned(position));
532 }
533
534 let offset = self.offsets.read(position).await?;
536 let section = position_to_section(position, self.items_per_section);
537
538 self.data.get(section, offset).await
540 }
541
542 pub async fn commit(&mut self) -> Result<(), Error> {
547 let section = self.current_section();
548 self.data.sync(section).await
549 }
550
551 pub async fn sync(&mut self) -> Result<(), Error> {
555 let section = self.current_section();
558
559 futures::try_join!(self.data.sync(section), self.offsets.sync())?;
561
562 Ok(())
563 }
564
565 pub async fn destroy(self) -> Result<(), Error> {
569 self.data.destroy().await?;
570 self.offsets.destroy().await
571 }
572
    /// Clears all stored data and resets the journal as if `new_size` items
    /// had been appended and then pruned (the post-state matches
    /// [`Self::init_at_size`]).
    #[commonware_macros::stability(ALPHA)]
    pub(crate) async fn clear_to_size(&mut self, new_size: u64) -> Result<(), Error> {
        // Drop all item data, then fast-forward the offsets journal and the
        // in-memory counters to the requested size.
        self.data.clear().await?;

        self.offsets.clear_to_size(new_size).await?;
        self.size = new_size;
        self.oldest_retained_pos = new_size;
        Ok(())
    }
586
    /// Returns the data-journal section that the next append will write to.
    const fn current_section(&self) -> u64 {
        position_to_section(self.size, self.items_per_section)
    }
591
    /// Reconciles the data and offsets journals after (re)opening them,
    /// repairing any divergence left by a crash between writes to the two.
    ///
    /// Returns `(oldest_retained_pos, size)` for the aligned journal, or
    /// [`Error::Corruption`] when the journals disagree in a way that cannot
    /// be explained by a crash (e.g. lost offsets partition).
    async fn align_journals(
        data: &mut variable::Journal<E, V>,
        offsets: &mut fixed::Journal<E, u64>,
        items_per_section: u64,
    ) -> Result<(u64, u64), Error> {
        // Count the items in the newest data section by replaying it; all
        // earlier sections are full (see the size computation below).
        let items_in_last_section = match data.newest_section() {
            Some(last_section) => {
                let stream = data.replay(last_section, 0, REPLAY_BUFFER_SIZE).await?;
                futures::pin_mut!(stream);
                let mut count = 0u64;
                while let Some(result) = stream.next().await {
                    result?; count += 1;
                }
                count
            }
            None => 0,
        };

        // "Empty" also covers a single section with zero items (a section
        // created just before a crash).
        let data_empty =
            data.is_empty() || (data.num_sections() == 1 && items_in_last_section == 0);
        if data_empty {
            let size = offsets.size();

            if !data.is_empty() {
                // A lone, empty data section exists: pick the larger of its
                // start position and the offsets' oldest position as the new
                // (empty) journal boundary.
                let data_first_section = data.oldest_section().unwrap();
                let data_section_start = data_first_section * items_per_section;
                let target_pos = data_section_start.max(offsets.bounds().start);

                warn!("crash repair: clearing offsets to {target_pos} (empty section crash)");
                offsets.clear_to_size(target_pos).await?;
                return Ok((target_pos, target_pos));
            }

            // Data is fully gone but offsets still retain entries: a crash
            // during a prune-everything; clear offsets up to their size.
            let offsets_bounds = offsets.bounds();
            if !offsets_bounds.is_empty() && offsets_bounds.start < size {
                warn!("crash repair: clearing offsets to {size} (prune-all crash)");
                offsets.clear_to_size(size).await?;
            }

            return Ok((size, size));
        }

        let data_first_section = data.oldest_section().unwrap();
        let data_last_section = data.newest_section().unwrap();

        // First position that could be stored in the oldest data section.
        let data_oldest_pos = data_first_section * items_per_section;

        // Align the *start* (pruning boundary) of the two journals.
        let offsets_bounds = offsets.bounds();
        match (
            offsets_bounds.is_empty(),
            offsets_bounds.start.cmp(&data_oldest_pos),
        ) {
            (true, _) => {
                // Offsets retain nothing: only recoverable if their boundary
                // sits in the same section as the oldest data; otherwise the
                // offsets partition was lost or corrupted.
                let offsets_first_section = offsets_bounds.start / items_per_section;
                if offsets_first_section != data_first_section {
                    return Err(Error::Corruption(format!(
                        "offsets journal empty at section {offsets_first_section} != data section {data_first_section}"
                    )));
                }
                warn!(
                    "crash repair: offsets journal empty at {}, will rebuild from data",
                    offsets_bounds.start
                );
            }
            (false, std::cmp::Ordering::Less) => {
                // Data was pruned further than offsets (crash mid-prune):
                // catch the offsets journal up.
                warn!("crash repair: pruning offsets journal to {data_oldest_pos}");
                offsets.prune(data_oldest_pos).await?;
            }
            (false, std::cmp::Ordering::Greater) => {
                // Offsets pruned ahead of data is tolerable only within the
                // same section (offsets prune to exact positions, data prunes
                // whole sections); a full-section gap is corruption.
                if offsets_bounds.start / items_per_section > data_first_section {
                    return Err(Error::Corruption(format!(
                        "offsets oldest pos ({}) > data oldest pos ({data_oldest_pos})",
                        offsets_bounds.start
                    )));
                }
            }
            (false, std::cmp::Ordering::Equal) => {
                // Starts already aligned; nothing to repair.
            }
        }

        // Align the *end*: compute the data journal's item count from full
        // sections plus the replayed count in the newest section.
        let offsets_bounds = offsets.bounds();
        let data_size = if data_first_section == data_last_section {
            offsets_bounds.start + items_in_last_section
        } else {
            let oldest_items = (data_first_section + 1) * items_per_section - offsets_bounds.start;
            let middle_items = (data_last_section - data_first_section - 1) * items_per_section;
            offsets_bounds.start + oldest_items + middle_items + items_in_last_section
        };

        let offsets_size = offsets_bounds.end;
        if offsets_size > data_size {
            // Offsets ran ahead of data (crash mid-rewind): drop the extras.
            warn!("crash repair: rewinding offsets from {offsets_size} to {data_size}");
            offsets.rewind(data_size).await?;
        } else if offsets_size < data_size {
            // Offsets fell behind data (crash mid-append): rebuild them by
            // replaying the data journal.
            Self::add_missing_offsets(data, offsets, offsets_size, items_per_section).await?;
        }

        assert_eq!(offsets.bounds().end, data_size);

        let offsets_bounds = offsets.bounds();
        assert!(
            !offsets_bounds.is_empty(),
            "offsets should have data after alignment"
        );
        assert_eq!(
            offsets_bounds.start / items_per_section,
            data_first_section,
            "offsets and data should be in same oldest section"
        );

        offsets.sync().await?;
        Ok((offsets_bounds.start, data_size))
    }
754
    /// Appends offsets for items that exist in the data journal but are
    /// missing from the offsets journal (which currently has `offsets_size`
    /// entries), bringing the offsets journal up to the data journal's size.
    async fn add_missing_offsets(
        data: &variable::Journal<E, V>,
        offsets: &mut fixed::Journal<E, u64>,
        offsets_size: u64,
        items_per_section: u64,
    ) -> Result<(), Error> {
        assert!(
            !data.is_empty(),
            "rebuild_offsets called with empty data journal"
        );

        // Pick where to resume the data replay: from the last recorded
        // offset (skipping that already-recorded item) when the offsets
        // journal has unpruned entries, otherwise from the start of the
        // oldest data section.
        let offsets_bounds = offsets.bounds();
        let (start_section, resume_offset, skip_first) = if offsets_bounds.is_empty() {
            let first_section = data.oldest_section().unwrap();
            (first_section, 0, false)
        } else if offsets_bounds.start < offsets_size {
            let last_offset = offsets.read(offsets_size - 1).await?;
            let last_section = position_to_section(offsets_size - 1, items_per_section);
            (last_section, last_offset, true)
        } else {
            // Fully-pruned offsets journal: replay from the oldest data.
            let first_section = data.oldest_section().unwrap();
            (first_section, 0, false)
        };

        let stream = data
            .replay(start_section, resume_offset, REPLAY_BUFFER_SIZE)
            .await?;
        futures::pin_mut!(stream);

        // Record the offset of every replayed item, skipping the first when
        // it is the item the offsets journal already knows about.
        let mut skipped_first = false;
        while let Some(result) = stream.next().await {
            let (_section, offset, _size, _item) = result?;

            if skip_first && !skipped_first {
                skipped_first = true;
                continue;
            }

            offsets.append(offset).await?;
        }

        Ok(())
    }
818}
819
// Read-only contiguous-journal trait; every method delegates to the
// identically-named inherent implementation above.
impl<E: Clock + Storage + Metrics, V: CodecShared> Contiguous for Journal<E, V> {
    type Item = V;

    fn bounds(&self) -> std::ops::Range<u64> {
        Self::bounds(self)
    }

    async fn replay(
        &self,
        start_pos: u64,
        buffer: NonZeroUsize,
    ) -> Result<impl Stream<Item = Result<(u64, Self::Item), Error>> + '_, Error> {
        Self::replay(self, start_pos, buffer).await
    }

    async fn read(&self, position: u64) -> Result<Self::Item, Error> {
        Self::read(self, position).await
    }
}
840
// Mutating contiguous-journal trait; thin delegation to the inherent methods.
impl<E: Clock + Storage + Metrics, V: CodecShared> MutableContiguous for Journal<E, V> {
    async fn append(&mut self, item: Self::Item) -> Result<u64, Error> {
        Self::append(self, item).await
    }

    async fn prune(&mut self, min_position: u64) -> Result<bool, Error> {
        Self::prune(self, min_position).await
    }

    async fn rewind(&mut self, size: u64) -> Result<(), Error> {
        Self::rewind(self, size).await
    }
}
854
// Persistence lifecycle trait; thin delegation to the inherent methods.
impl<E: Clock + Storage + Metrics, V: CodecShared> Persistable for Journal<E, V> {
    type Error = Error;

    async fn commit(&mut self) -> Result<(), Error> {
        Self::commit(self).await
    }

    async fn sync(&mut self) -> Result<(), Error> {
        Self::sync(self).await
    }

    async fn destroy(self) -> Result<(), Error> {
        Self::destroy(self).await
    }
}
870#[cfg(test)]
871mod tests {
872 use super::*;
873 use crate::journal::contiguous::tests::run_contiguous_tests;
874 use commonware_macros::test_traced;
875 use commonware_runtime::{buffer::paged::CacheRef, deterministic, Metrics, Runner};
876 use commonware_utils::{NZUsize, NZU16, NZU64};
877 use futures::FutureExt as _;
878 use std::num::NonZeroU16;
879
880 const PAGE_SIZE: NonZeroU16 = NZU16!(101);
882 const PAGE_CACHE_SIZE: usize = 2;
883 const LARGE_PAGE_SIZE: NonZeroU16 = NZU16!(1024);
885 const SMALL_PAGE_SIZE: NonZeroU16 = NZU16!(512);
886
    // Losing the offsets partition entirely after a prune must surface as
    // corruption on re-init (the prune boundary cannot be reconstructed).
    #[test_traced]
    fn test_variable_offsets_partition_loss_after_prune_unrecoverable() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "offsets_loss_after_prune".to_string(),
                items_per_section: NZU64!(10),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::new(LARGE_PAGE_SIZE, NZUsize!(10)),
                write_buffer: NZUsize!(1024),
            };

            let mut journal = Journal::<_, u64>::init(context.with_label("first"), cfg.clone())
                .await
                .unwrap();

            // Fill four sections, then prune the first two.
            for i in 0..40u64 {
                journal.append(i * 100).await.unwrap();
            }

            journal.prune(20).await.unwrap();
            assert_eq!(journal.bounds().start, 20);
            assert_eq!(journal.bounds().end, 40);

            journal.sync().await.unwrap();
            drop(journal);

            // Simulate total loss of the offsets journal's storage.
            context
                .remove(&format!("{}-blobs", cfg.offsets_partition()), None)
                .await
                .expect("Failed to remove offsets blobs partition");
            context
                .remove(&format!("{}-metadata", cfg.offsets_partition()), None)
                .await
                .expect("Failed to remove offsets metadata partition");

            let result = Journal::<_, u64>::init(context.with_label("second"), cfg.clone()).await;
            assert!(matches!(result, Err(Error::Corruption(_))));
        });
    }
939
    // Losing the data partition while offsets survive: init should align the
    // offsets to the (now empty) data, preserving the size counter so all
    // prior positions read as pruned and new appends continue from there.
    #[test_traced]
    fn test_variable_align_data_offsets_mismatch() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "data_loss_test".to_string(),
                items_per_section: NZU64!(10),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::new(LARGE_PAGE_SIZE, NZUsize!(10)),
                write_buffer: NZUsize!(1024),
            };

            let mut variable = Journal::<_, u64>::init(context.with_label("first"), cfg.clone())
                .await
                .unwrap();

            for i in 0..20u64 {
                variable.append(i * 100).await.unwrap();
            }

            variable.sync().await.unwrap();
            drop(variable);

            // Simulate loss of the data journal's storage.
            context
                .remove(&cfg.data_partition(), None)
                .await
                .expect("Failed to remove data partition");

            let mut journal = Journal::<_, u64>::init(context.with_label("second"), cfg.clone())
                .await
                .expect("Should align offsets to match empty data");

            // Size preserved, but nothing is retained.
            assert_eq!(journal.size(), 20);

            assert!(journal.bounds().is_empty());

            for i in 0..20 {
                assert!(matches!(
                    journal.read(i).await,
                    Err(crate::journal::Error::ItemPruned(_))
                ));
            }

            // Appends resume at the preserved size.
            let pos = journal.append(999).await.unwrap();
            assert_eq!(pos, 20);
            assert_eq!(journal.read(20).await.unwrap(), 999);

            journal.destroy().await.unwrap();
        });
    }
1006
    // Runs the shared contiguous-journal conformance suite against this
    // implementation, creating a fresh journal per sub-test.
    #[test_traced]
    fn test_variable_contiguous() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            run_contiguous_tests(move |test_name: String, idx: usize| {
                let context = context.with_label(&format!("{test_name}_{idx}"));
                async move {
                    Journal::<_, u64>::init(
                        context,
                        Config {
                            partition: format!("generic_test_{test_name}"),
                            items_per_section: NZU64!(10),
                            compression: None,
                            codec_config: (),
                            page_cache: CacheRef::new(LARGE_PAGE_SIZE, NZUsize!(10)),
                            write_buffer: NZUsize!(1024),
                        },
                    )
                    .await
                }
                .boxed()
            })
            .await;
        });
    }
1032
    // Successive prunes advance the lower bound one section at a time while
    // reads below the bound fail with ItemPruned and reads above succeed.
    #[test_traced]
    fn test_variable_multiple_sequential_prunes() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "sequential_prunes".to_string(),
                items_per_section: NZU64!(10),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::new(LARGE_PAGE_SIZE, NZUsize!(10)),
                write_buffer: NZUsize!(1024),
            };

            let mut journal = Journal::<_, u64>::init(context, cfg).await.unwrap();

            // Four full sections of ten items each.
            for i in 0..40u64 {
                journal.append(i * 100).await.unwrap();
            }

            assert_eq!(journal.bounds().start, 0);
            assert_eq!(journal.bounds().end, 40);

            let pruned = journal.prune(10).await.unwrap();
            assert!(pruned);

            assert_eq!(journal.bounds().start, 10);

            assert!(matches!(
                journal.read(0).await,
                Err(crate::journal::Error::ItemPruned(_))
            ));
            assert_eq!(journal.read(10).await.unwrap(), 1000);
            assert_eq!(journal.read(19).await.unwrap(), 1900);

            let pruned = journal.prune(20).await.unwrap();
            assert!(pruned);

            assert_eq!(journal.bounds().start, 20);

            assert!(matches!(
                journal.read(10).await,
                Err(crate::journal::Error::ItemPruned(_))
            ));
            assert!(matches!(
                journal.read(19).await,
                Err(crate::journal::Error::ItemPruned(_))
            ));
            assert_eq!(journal.read(20).await.unwrap(), 2000);
            assert_eq!(journal.read(29).await.unwrap(), 2900);

            let pruned = journal.prune(30).await.unwrap();
            assert!(pruned);

            assert_eq!(journal.bounds().start, 30);

            assert!(matches!(
                journal.read(20).await,
                Err(crate::journal::Error::ItemPruned(_))
            ));
            assert!(matches!(
                journal.read(29).await,
                Err(crate::journal::Error::ItemPruned(_))
            ));
            assert_eq!(journal.read(30).await.unwrap(), 3000);
            assert_eq!(journal.read(39).await.unwrap(), 3900);

            // Pruning never shrinks the size counter.
            assert_eq!(journal.size(), 40);

            journal.destroy().await.unwrap();
        });
    }
1117
    // Pruning everything leaves an empty journal that keeps its size across a
    // restart; the next append continues from the preserved position.
    #[test_traced]
    fn test_variable_prune_all_then_reinit() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "prune_all_reinit".to_string(),
                items_per_section: NZU64!(10),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::new(LARGE_PAGE_SIZE, NZUsize!(10)),
                write_buffer: NZUsize!(1024),
            };

            let mut journal = Journal::<_, u64>::init(context.with_label("first"), cfg.clone())
                .await
                .unwrap();

            for i in 0..100u64 {
                journal.append(i * 100).await.unwrap();
            }

            assert_eq!(journal.bounds().end, 100);
            assert_eq!(journal.bounds().start, 0);

            // Prune every item.
            let pruned = journal.prune(100).await.unwrap();
            assert!(pruned);

            assert_eq!(journal.bounds().end, 100);
            assert!(journal.bounds().is_empty());

            for i in 0..100 {
                assert!(matches!(
                    journal.read(i).await,
                    Err(crate::journal::Error::ItemPruned(_))
                ));
            }

            journal.sync().await.unwrap();
            drop(journal);

            // Restart: the empty-but-advanced state must survive.
            let mut journal = Journal::<_, u64>::init(context.with_label("second"), cfg.clone())
                .await
                .unwrap();

            assert_eq!(journal.bounds().end, 100);
            assert!(journal.bounds().is_empty());

            for i in 0..100 {
                assert!(matches!(
                    journal.read(i).await,
                    Err(crate::journal::Error::ItemPruned(_))
                ));
            }

            // Appends resume at position 100.
            journal.append(10000).await.unwrap();
            assert_eq!(journal.bounds().end, 101);
            assert_eq!(journal.bounds().start, 100);

            assert_eq!(journal.read(100).await.unwrap(), 10000);

            assert!(matches!(
                journal.read(99).await,
                Err(crate::journal::Error::ItemPruned(_))
            ));

            journal.destroy().await.unwrap();
        });
    }
1199
    // Crash mid-prune: the data journal was pruned further (to section 2 =
    // position 20) than the offsets journal (position 10). Recovery must
    // advance the offsets to match the data.
    #[test_traced]
    fn test_variable_recovery_prune_crash_offsets_behind() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "recovery_prune_crash".to_string(),
                items_per_section: NZU64!(10),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::new(LARGE_PAGE_SIZE, NZUsize!(10)),
                write_buffer: NZUsize!(1024),
            };

            let mut variable = Journal::<_, u64>::init(context.with_label("first"), cfg.clone())
                .await
                .unwrap();

            for i in 0..40u64 {
                variable.append(i * 100).await.unwrap();
            }

            variable.prune(10).await.unwrap();
            assert_eq!(variable.bounds().start, 10);

            // Simulate the crash by pruning the data journal directly, one
            // section beyond what the offsets journal recorded.
            variable.data.prune(2).await.unwrap();
            variable.sync().await.unwrap();
            drop(variable);

            let variable = Journal::<_, u64>::init(context.with_label("second"), cfg.clone())
                .await
                .unwrap();

            // Recovery aligned the lower bound to the data journal.
            assert_eq!(variable.bounds().start, 20);
            assert_eq!(variable.bounds().end, 40);

            assert!(matches!(
                variable.read(10).await,
                Err(crate::journal::Error::ItemPruned(_))
            ));

            assert_eq!(variable.read(20).await.unwrap(), 2000);
            assert_eq!(variable.read(39).await.unwrap(), 3900);

            variable.destroy().await.unwrap();
        });
    }
1258
    // Offsets pruned a full section ahead of the data journal cannot be
    // explained by any crash ordering and must be reported as corruption.
    #[test_traced]
    fn test_variable_recovery_offsets_ahead_corruption() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "recovery_offsets_ahead".to_string(),
                items_per_section: NZU64!(10),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::new(LARGE_PAGE_SIZE, NZUsize!(10)),
                write_buffer: NZUsize!(1024),
            };

            let mut variable = Journal::<_, u64>::init(context.with_label("first"), cfg.clone())
                .await
                .unwrap();

            for i in 0..40u64 {
                variable.append(i * 100).await.unwrap();
            }

            // Offsets pruned to position 20 but data only to section 1
            // (position 10): offsets are a whole section ahead.
            variable.offsets.prune(20).await.unwrap(); variable.data.prune(1).await.unwrap(); variable.sync().await.unwrap();
            drop(variable);

            let result = Journal::<_, u64>::init(context.with_label("second"), cfg.clone()).await;
            assert!(matches!(result, Err(Error::Corruption(_))));
        });
    }
1298
    // Crash mid-append: items were written to the data journal but their
    // offsets were never recorded. Recovery must rebuild the missing offsets
    // by replaying the data journal.
    #[test_traced]
    fn test_variable_recovery_append_crash_offsets_behind() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "recovery_append_crash".to_string(),
                items_per_section: NZU64!(10),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::new(LARGE_PAGE_SIZE, NZUsize!(10)),
                write_buffer: NZUsize!(1024),
            };

            let mut variable = Journal::<_, u64>::init(context.with_label("first"), cfg.clone())
                .await
                .unwrap();

            for i in 0..15u64 {
                variable.append(i * 100).await.unwrap();
            }

            assert_eq!(variable.size(), 15);

            // Append directly to the data journal (section 1), bypassing the
            // offsets journal, to simulate the crash window.
            for i in 15..20u64 {
                variable.data.append(1, i * 100).await.unwrap();
            }
            variable.sync().await.unwrap();
            drop(variable);

            let variable = Journal::<_, u64>::init(context.with_label("second"), cfg.clone())
                .await
                .unwrap();

            // All twenty items are visible after the offsets rebuild.
            assert_eq!(variable.bounds().end, 20);
            assert_eq!(variable.bounds().start, 0);

            for i in 0..20u64 {
                assert_eq!(variable.read(i).await.unwrap(), i * 100);
            }

            assert_eq!(variable.offsets.size(), 20);

            variable.destroy().await.unwrap();
        });
    }
1354
    // Crash during a multi-section prune: data pruned to section 3 (position
    // 30) while offsets only recorded a prune to 10. Recovery must catch the
    // offsets journal up to the data journal's boundary.
    #[test_traced]
    fn test_variable_recovery_multiple_prunes_crash() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "recovery_multiple_prunes".to_string(),
                items_per_section: NZU64!(10),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::new(LARGE_PAGE_SIZE, NZUsize!(10)),
                write_buffer: NZUsize!(1024),
            };

            let mut variable = Journal::<_, u64>::init(context.with_label("first"), cfg.clone())
                .await
                .unwrap();

            for i in 0..50u64 {
                variable.append(i * 100).await.unwrap();
            }

            variable.prune(10).await.unwrap();
            assert_eq!(variable.bounds().start, 10);

            // Simulate the crash: data pruned two extra sections.
            variable.data.prune(3).await.unwrap();
            variable.sync().await.unwrap();
            drop(variable);

            let variable = Journal::<_, u64>::init(context.with_label("second"), cfg.clone())
                .await
                .unwrap();

            assert_eq!(variable.bounds().start, 30);
            assert_eq!(variable.bounds().end, 50);

            assert!(matches!(
                variable.read(10).await,
                Err(crate::journal::Error::ItemPruned(_))
            ));
            assert!(matches!(
                variable.read(20).await,
                Err(crate::journal::Error::ItemPruned(_))
            ));

            assert_eq!(variable.read(30).await.unwrap(), 3000);
            assert_eq!(variable.read(49).await.unwrap(), 4900);

            variable.destroy().await.unwrap();
        });
    }
1417
    // Crash mid-rewind: the offsets journal was rewound (to 5) but the data
    // journal was not. Recovery must rebuild offsets from the surviving data
    // across multiple sections, restoring all 25 items.
    #[test_traced]
    fn test_variable_recovery_rewind_crash_multi_section() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "recovery_rewind_crash".to_string(),
                items_per_section: NZU64!(10),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::new(LARGE_PAGE_SIZE, NZUsize!(10)),
                write_buffer: NZUsize!(1024),
            };

            let mut variable = Journal::<_, u64>::init(context.with_label("first"), cfg.clone())
                .await
                .unwrap();

            for i in 0..25u64 {
                variable.append(i * 100).await.unwrap();
            }

            assert_eq!(variable.size(), 25);

            // Rewind only the offsets journal to simulate the crash window.
            variable.offsets.rewind(5).await.unwrap();
            variable.sync().await.unwrap();
            drop(variable);

            let mut variable = Journal::<_, u64>::init(context.with_label("second"), cfg.clone())
                .await
                .unwrap();

            assert_eq!(variable.bounds().end, 25);
            assert_eq!(variable.bounds().start, 0);

            for i in 0..25u64 {
                assert_eq!(variable.read(i).await.unwrap(), i * 100);
            }

            assert_eq!(variable.offsets.size(), 25);

            // The journal remains fully usable after recovery.
            let pos = variable.append(2500).await.unwrap();
            assert_eq!(pos, 25);
            assert_eq!(variable.read(25).await.unwrap(), 2500);

            variable.destroy().await.unwrap();
        });
    }
1482
    // Crash after a prune-all followed by data-only appends: the offsets
    // journal is empty at position 10 while the data journal holds items
    // 10..20. Recovery rebuilds the offsets from the data.
    #[test_traced]
    fn test_variable_recovery_empty_offsets_after_prune_and_append() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "recovery_empty_after_prune".to_string(),
                items_per_section: NZU64!(10),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::new(LARGE_PAGE_SIZE, NZUsize!(10)),
                write_buffer: NZUsize!(1024),
            };

            let mut journal = Journal::<_, u64>::init(context.with_label("first"), cfg.clone())
                .await
                .unwrap();

            for i in 0..10u64 {
                journal.append(i * 100).await.unwrap();
            }
            assert_eq!(journal.bounds().end, 10);
            assert_eq!(journal.bounds().start, 0);

            journal.prune(10).await.unwrap();
            assert_eq!(journal.bounds().end, 10);
            // Append to the data journal only (section 1) and sync it,
            // leaving the offsets journal untouched — the crash window.
            assert!(journal.bounds().is_empty()); for i in 10..20u64 {
                journal.data.append(1, i * 100).await.unwrap();
            }
            journal.data.sync(1).await.unwrap();
            drop(journal);

            let journal = Journal::<_, u64>::init(context.with_label("second"), cfg.clone())
                .await
                .expect("Should recover from crash after data sync but before offsets sync");

            assert_eq!(journal.bounds().end, 20);
            assert_eq!(journal.bounds().start, 10);

            for i in 10..20u64 {
                assert_eq!(journal.read(i).await.unwrap(), i * 100);
            }

            for i in 0..10 {
                assert!(matches!(journal.read(i).await, Err(Error::ItemPruned(_))));
            }

            journal.destroy().await.unwrap();
        });
    }
1550
    // `commit` syncs only the data journal; after a restart the offsets are
    // rebuilt during alignment and every committed item is readable.
    #[test_traced]
    fn test_variable_concurrent_sync_recovery() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "concurrent_sync_recovery".to_string(),
                items_per_section: NZU64!(10),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::new(LARGE_PAGE_SIZE, NZUsize!(10)),
                write_buffer: NZUsize!(1024),
            };

            let mut journal = Journal::<_, u64>::init(context.with_label("first"), cfg.clone())
                .await
                .unwrap();

            for i in 0..15u64 {
                journal.append(i * 100).await.unwrap();
            }

            // Commit durably persists the appended items (data only).
            journal.commit().await.unwrap();

            drop(journal);

            let journal = Journal::<_, u64>::init(context.with_label("second"), cfg.clone())
                .await
                .unwrap();

            assert_eq!(journal.size(), 15);
            for i in 0..15u64 {
                assert_eq!(journal.read(i).await.unwrap(), i * 100);
            }

            journal.destroy().await.unwrap();
        });
    }
1593
1594 #[test_traced]
1595 fn test_init_at_size_zero() {
1596 let executor = deterministic::Runner::default();
1597 executor.start(|context| async move {
1598 let cfg = Config {
1599 partition: "init_at_size_zero".to_string(),
1600 items_per_section: NZU64!(5),
1601 compression: None,
1602 codec_config: (),
1603 page_cache: CacheRef::new(SMALL_PAGE_SIZE, NZUsize!(2)),
1604 write_buffer: NZUsize!(1024),
1605 };
1606
1607 let mut journal = Journal::<_, u64>::init_at_size(context.clone(), cfg.clone(), 0)
1608 .await
1609 .unwrap();
1610
1611 assert_eq!(journal.size(), 0);
1613
1614 assert!(journal.bounds().is_empty());
1616
1617 let pos = journal.append(100).await.unwrap();
1619 assert_eq!(pos, 0);
1620 assert_eq!(journal.size(), 1);
1621 assert_eq!(journal.read(0).await.unwrap(), 100);
1622
1623 journal.destroy().await.unwrap();
1624 });
1625 }
1626
1627 #[test_traced]
1628 fn test_init_at_size_section_boundary() {
1629 let executor = deterministic::Runner::default();
1630 executor.start(|context| async move {
1631 let cfg = Config {
1632 partition: "init_at_size_boundary".to_string(),
1633 items_per_section: NZU64!(5),
1634 compression: None,
1635 codec_config: (),
1636 page_cache: CacheRef::new(SMALL_PAGE_SIZE, NZUsize!(2)),
1637 write_buffer: NZUsize!(1024),
1638 };
1639
1640 let mut journal = Journal::<_, u64>::init_at_size(context.clone(), cfg.clone(), 10)
1642 .await
1643 .unwrap();
1644
1645 assert_eq!(journal.bounds().end, 10);
1647
1648 assert!(journal.bounds().is_empty());
1650
1651 let pos = journal.append(1000).await.unwrap();
1653 assert_eq!(pos, 10);
1654 assert_eq!(journal.size(), 11);
1655 assert_eq!(journal.read(10).await.unwrap(), 1000);
1656
1657 let pos = journal.append(1001).await.unwrap();
1659 assert_eq!(pos, 11);
1660 assert_eq!(journal.read(11).await.unwrap(), 1001);
1661
1662 journal.destroy().await.unwrap();
1663 });
1664 }
1665
1666 #[test_traced]
1667 fn test_init_at_size_mid_section() {
1668 let executor = deterministic::Runner::default();
1669 executor.start(|context| async move {
1670 let cfg = Config {
1671 partition: "init_at_size_mid".to_string(),
1672 items_per_section: NZU64!(5),
1673 compression: None,
1674 codec_config: (),
1675 page_cache: CacheRef::new(SMALL_PAGE_SIZE, NZUsize!(2)),
1676 write_buffer: NZUsize!(1024),
1677 };
1678
1679 let mut journal = Journal::<_, u64>::init_at_size(context.clone(), cfg.clone(), 7)
1681 .await
1682 .unwrap();
1683
1684 assert_eq!(journal.bounds().end, 7);
1686
1687 assert!(journal.bounds().is_empty());
1689
1690 let pos = journal.append(700).await.unwrap();
1692 assert_eq!(pos, 7);
1693 assert_eq!(journal.size(), 8);
1694 assert_eq!(journal.read(7).await.unwrap(), 700);
1695
1696 journal.destroy().await.unwrap();
1697 });
1698 }
1699
    /// Items appended after `init_at_size` survive a sync + restart, with the
    /// starting position reported as the lower bound after recovery.
    #[test_traced]
    fn test_init_at_size_persistence() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "init_at_size_persist".to_string(),
                items_per_section: NZU64!(5),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::new(SMALL_PAGE_SIZE, NZUsize!(2)),
                write_buffer: NZUsize!(1024),
            };

            // Start at position 15 (a section boundary) and fill one section.
            let mut journal =
                Journal::<_, u64>::init_at_size(context.with_label("first"), cfg.clone(), 15)
                    .await
                    .unwrap();

            for i in 0..5u64 {
                let pos = journal.append(1500 + i).await.unwrap();
                assert_eq!(pos, 15 + i);
            }

            assert_eq!(journal.size(), 20);

            // Persist, then simulate a restart with a plain init.
            journal.sync().await.unwrap();
            drop(journal);

            let mut journal = Journal::<_, u64>::init(context.with_label("second"), cfg.clone())
                .await
                .unwrap();

            // Bounds and contents must be identical after recovery.
            assert_eq!(journal.bounds().end, 20);
            assert_eq!(journal.bounds().start, 15);

            for i in 0..5u64 {
                assert_eq!(journal.read(15 + i).await.unwrap(), 1500 + i);
            }

            // Appends continue where the journal left off.
            let pos = journal.append(9999).await.unwrap();
            assert_eq!(pos, 20);
            assert_eq!(journal.read(20).await.unwrap(), 9999);

            journal.destroy().await.unwrap();
        });
    }
1752
    /// The starting position chosen by `init_at_size` is recovered across a
    /// restart even when no items were ever appended.
    #[test_traced]
    fn test_init_at_size_persistence_without_data() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "init_at_size_persist_empty".to_string(),
                items_per_section: NZU64!(5),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::new(SMALL_PAGE_SIZE, NZUsize!(2)),
                write_buffer: NZUsize!(1024),
            };

            // Initialize at position 15 and drop without appending anything.
            let journal =
                Journal::<_, u64>::init_at_size(context.with_label("first"), cfg.clone(), 15)
                    .await
                    .unwrap();

            assert_eq!(journal.bounds().end, 15);
            assert!(journal.bounds().is_empty());

            drop(journal);

            // A plain init must come back empty at the same position.
            let mut journal = Journal::<_, u64>::init(context.with_label("second"), cfg.clone())
                .await
                .unwrap();

            assert_eq!(journal.bounds().end, 15);
            assert!(journal.bounds().is_empty());

            // Appending continues from position 15.
            let pos = journal.append(1500).await.unwrap();
            assert_eq!(pos, 15);
            assert_eq!(journal.read(15).await.unwrap(), 1500);

            journal.destroy().await.unwrap();
        });
    }
1794
    /// `init_at_size` to a mid-section position persists correctly: after a
    /// sync + restart the lower bound is still mid-section, and the position
    /// just below it reads as pruned.
    #[test_traced]
    fn test_init_at_size_mid_section_persistence() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "init_at_size_mid_section".to_string(),
                items_per_section: NZU64!(5),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::new(SMALL_PAGE_SIZE, NZUsize!(2)),
                write_buffer: NZUsize!(1024),
            };

            // Start mid-section at 7 and fill up to the section boundary (10).
            let mut journal =
                Journal::<_, u64>::init_at_size(context.with_label("first"), cfg.clone(), 7)
                    .await
                    .unwrap();

            for i in 0..3u64 {
                let pos = journal.append(700 + i).await.unwrap();
                assert_eq!(pos, 7 + i);
            }

            assert_eq!(journal.bounds().end, 10);
            assert_eq!(journal.bounds().start, 7);

            // Persist and restart.
            journal.sync().await.unwrap();
            drop(journal);

            let journal = Journal::<_, u64>::init(context.with_label("second"), cfg.clone())
                .await
                .unwrap();

            assert_eq!(journal.bounds().end, 10);
            assert_eq!(journal.bounds().start, 7);

            for i in 0..3u64 {
                assert_eq!(journal.read(7 + i).await.unwrap(), 700 + i);
            }

            // Position 6 precedes the initial size and must read as pruned.
            assert!(matches!(journal.read(6).await, Err(Error::ItemPruned(6))));

            journal.destroy().await.unwrap();
        });
    }
1848
    /// `init_at_size` to a mid-section position followed by appends that span
    /// multiple sections; everything must survive a sync + restart.
    #[test_traced]
    fn test_init_at_size_mid_section_multi_section_persistence() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "init_at_size_multi_section".to_string(),
                items_per_section: NZU64!(5),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::new(SMALL_PAGE_SIZE, NZUsize!(2)),
                write_buffer: NZUsize!(1024),
            };

            // Start at 7 and append across the section boundary (positions 7..15).
            let mut journal =
                Journal::<_, u64>::init_at_size(context.with_label("first"), cfg.clone(), 7)
                    .await
                    .unwrap();

            for i in 0..8u64 {
                let pos = journal.append(700 + i).await.unwrap();
                assert_eq!(pos, 7 + i);
            }

            assert_eq!(journal.bounds().end, 15);
            assert_eq!(journal.bounds().start, 7);

            // Persist and restart.
            journal.sync().await.unwrap();
            drop(journal);

            let journal = Journal::<_, u64>::init(context.with_label("second"), cfg.clone())
                .await
                .unwrap();

            assert_eq!(journal.bounds().end, 15);
            assert_eq!(journal.bounds().start, 7);

            for i in 0..8u64 {
                assert_eq!(journal.read(7 + i).await.unwrap(), 700 + i);
            }

            journal.destroy().await.unwrap();
        });
    }
1899
    /// Recovery when the data journal was wiped while the offsets journal
    /// survived: init must re-align the journals to an empty state at the old
    /// end position, and a subsequent append + data-only sync must also be
    /// recovered (crash between the data and offsets syncs).
    #[test_traced]
    fn test_align_journals_data_empty_mid_section_pruning_boundary() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "align_journals_mid_section_pruning_boundary".to_string(),
                items_per_section: NZU64!(5),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::new(SMALL_PAGE_SIZE, NZUsize!(2)),
                write_buffer: NZUsize!(1024),
            };

            // Write 7 items (ending mid-section) and persist them.
            let mut journal = Journal::<_, u64>::init(context.with_label("first"), cfg.clone())
                .await
                .unwrap();
            for i in 0..7u64 {
                journal.append(100 + i).await.unwrap();
            }
            journal.sync().await.unwrap();

            // Clear only the inner data journal, leaving the offsets intact.
            journal.data.clear().await.unwrap();
            drop(journal);

            // Init must align the journals: empty, still positioned at 7.
            let mut journal = Journal::<_, u64>::init(context.with_label("second"), cfg.clone())
                .await
                .unwrap();
            assert_eq!(journal.bounds().end, 7);
            assert!(journal.bounds().is_empty());

            // Appending resumes at position 7.
            let pos = journal.append(777).await.unwrap();
            assert_eq!(pos, 7);
            assert_eq!(journal.size(), 8);
            assert_eq!(journal.read(7).await.unwrap(), 777);

            // Sync only the data section holding position 7 (not the offsets),
            // simulating a crash between the two syncs.
            let section = 7 / cfg.items_per_section.get();
            journal.data.sync(section).await.unwrap();
            drop(journal);

            // Recovery must surface the appended item.
            let journal = Journal::<_, u64>::init(context.with_label("third"), cfg.clone())
                .await
                .unwrap();
            assert_eq!(journal.bounds().end, 8);
            assert_eq!(journal.bounds().start, 7);
            assert_eq!(journal.read(7).await.unwrap(), 777);

            journal.destroy().await.unwrap();
        });
    }
1956
    /// Crash recovery for a journal created with `init_at_size`: the data
    /// sections were synced but the offsets journal was not. Recovery must
    /// rebuild the offsets for all surviving items.
    #[test_traced]
    fn test_init_at_size_crash_data_synced_offsets_not() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "init_at_size_crash_recovery".to_string(),
                items_per_section: NZU64!(5),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::new(SMALL_PAGE_SIZE, NZUsize!(2)),
                write_buffer: NZUsize!(1024),
            };

            // Start at 7; the appends below land at positions 7..10.
            let mut journal =
                Journal::<_, u64>::init_at_size(context.with_label("first"), cfg.clone(), 7)
                    .await
                    .unwrap();

            for i in 0..3u64 {
                journal.append(700 + i).await.unwrap();
            }

            // Sync only the data journal (section 1), then "crash".
            journal.data.sync(1).await.unwrap();
            drop(journal);

            let journal = Journal::<_, u64>::init(context.with_label("second"), cfg.clone())
                .await
                .unwrap();

            // All three items must have been recovered.
            assert_eq!(journal.bounds().end, 10);
            assert_eq!(journal.bounds().start, 7);

            for i in 0..3u64 {
                assert_eq!(journal.read(7 + i).await.unwrap(), 700 + i);
            }

            journal.destroy().await.unwrap();
        });
    }
2004
2005 #[test_traced]
2006 fn test_prune_does_not_move_oldest_retained_backwards() {
2007 let executor = deterministic::Runner::default();
2008 executor.start(|context| async move {
2009 let cfg = Config {
2010 partition: "prune_no_backwards".to_string(),
2011 items_per_section: NZU64!(5),
2012 compression: None,
2013 codec_config: (),
2014 page_cache: CacheRef::new(SMALL_PAGE_SIZE, NZUsize!(2)),
2015 write_buffer: NZUsize!(1024),
2016 };
2017
2018 let mut journal =
2019 Journal::<_, u64>::init_at_size(context.with_label("first"), cfg.clone(), 7)
2020 .await
2021 .unwrap();
2022
2023 for i in 0..3u64 {
2025 let pos = journal.append(700 + i).await.unwrap();
2026 assert_eq!(pos, 7 + i);
2027 }
2028 assert_eq!(journal.bounds().start, 7);
2029
2030 journal.prune(8).await.unwrap();
2032 assert_eq!(journal.bounds().start, 7);
2033 assert!(matches!(journal.read(6).await, Err(Error::ItemPruned(6))));
2034 assert_eq!(journal.read(7).await.unwrap(), 700);
2035
2036 journal.destroy().await.unwrap();
2037 });
2038 }
2039
2040 #[test_traced]
2041 fn test_init_at_size_large_offset() {
2042 let executor = deterministic::Runner::default();
2043 executor.start(|context| async move {
2044 let cfg = Config {
2045 partition: "init_at_size_large".to_string(),
2046 items_per_section: NZU64!(5),
2047 compression: None,
2048 codec_config: (),
2049 page_cache: CacheRef::new(SMALL_PAGE_SIZE, NZUsize!(2)),
2050 write_buffer: NZUsize!(1024),
2051 };
2052
2053 let mut journal = Journal::<_, u64>::init_at_size(context.clone(), cfg.clone(), 1000)
2055 .await
2056 .unwrap();
2057
2058 assert_eq!(journal.bounds().end, 1000);
2059 assert!(journal.bounds().is_empty());
2061
2062 let pos = journal.append(100000).await.unwrap();
2064 assert_eq!(pos, 1000);
2065 assert_eq!(journal.read(1000).await.unwrap(), 100000);
2066
2067 journal.destroy().await.unwrap();
2068 });
2069 }
2070
2071 #[test_traced]
2072 fn test_init_at_size_prune_and_append() {
2073 let executor = deterministic::Runner::default();
2074 executor.start(|context| async move {
2075 let cfg = Config {
2076 partition: "init_at_size_prune".to_string(),
2077 items_per_section: NZU64!(5),
2078 compression: None,
2079 codec_config: (),
2080 page_cache: CacheRef::new(SMALL_PAGE_SIZE, NZUsize!(2)),
2081 write_buffer: NZUsize!(1024),
2082 };
2083
2084 let mut journal = Journal::<_, u64>::init_at_size(context.clone(), cfg.clone(), 20)
2086 .await
2087 .unwrap();
2088
2089 for i in 0..10u64 {
2091 journal.append(2000 + i).await.unwrap();
2092 }
2093
2094 assert_eq!(journal.size(), 30);
2095
2096 journal.prune(25).await.unwrap();
2098
2099 assert_eq!(journal.bounds().end, 30);
2100 assert_eq!(journal.bounds().start, 25);
2101
2102 for i in 25..30u64 {
2104 assert_eq!(journal.read(i).await.unwrap(), 2000 + (i - 20));
2105 }
2106
2107 let pos = journal.append(3000).await.unwrap();
2109 assert_eq!(pos, 30);
2110
2111 journal.destroy().await.unwrap();
2112 });
2113 }
2114
2115 #[test_traced]
2117 fn test_init_sync_no_existing_data() {
2118 let executor = deterministic::Runner::default();
2119 executor.start(|context| async move {
2120 let cfg = Config {
2121 partition: "test_fresh_start".into(),
2122 items_per_section: NZU64!(5),
2123 compression: None,
2124 codec_config: (),
2125 write_buffer: NZUsize!(1024),
2126 page_cache: CacheRef::new(PAGE_SIZE, NZUsize!(PAGE_CACHE_SIZE)),
2127 };
2128
2129 let lower_bound = 10;
2131 let upper_bound = 26;
2132 let mut journal =
2133 Journal::init_sync(context.clone(), cfg.clone(), lower_bound..upper_bound)
2134 .await
2135 .expect("Failed to initialize journal with sync boundaries");
2136
2137 assert_eq!(journal.bounds().end, lower_bound);
2138 assert!(journal.bounds().is_empty());
2139
2140 let pos1 = journal.append(42u64).await.unwrap();
2142 assert_eq!(pos1, lower_bound);
2143 assert_eq!(journal.read(pos1).await.unwrap(), 42u64);
2144
2145 let pos2 = journal.append(43u64).await.unwrap();
2146 assert_eq!(pos2, lower_bound + 1);
2147 assert_eq!(journal.read(pos2).await.unwrap(), 43u64);
2148
2149 journal.destroy().await.unwrap();
2150 });
2151 }
2152
    /// `init_sync` over a range that overlaps existing data keeps the overlap:
    /// items below the lower bound are pruned (section-aligned) and the rest
    /// stay readable.
    #[test_traced]
    fn test_init_sync_existing_data_overlap() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_overlap".into(),
                items_per_section: NZU64!(5),
                compression: None,
                codec_config: (),
                write_buffer: NZUsize!(1024),
                page_cache: CacheRef::new(PAGE_SIZE, NZUsize!(PAGE_CACHE_SIZE)),
            };

            // Seed 20 items and persist them.
            let mut journal =
                Journal::<deterministic::Context, u64>::init(context.clone(), cfg.clone())
                    .await
                    .expect("Failed to create initial journal");

            for i in 0..20u64 {
                journal.append(i * 100).await.unwrap();
            }
            journal.sync().await.unwrap();
            drop(journal);

            // Re-open with a sync range overlapping the existing data.
            let lower_bound = 8;
            let upper_bound = 31;
            let mut journal = Journal::<deterministic::Context, u64>::init_sync(
                context.clone(),
                cfg.clone(),
                lower_bound..upper_bound,
            )
            .await
            .expect("Failed to initialize journal with overlap");

            assert_eq!(journal.size(), 20);

            // Pruning is section-aligned: lower bound 8 rounds down to 5.
            assert_eq!(journal.bounds().start, 5);
            assert!(matches!(journal.read(0).await, Err(Error::ItemPruned(_))));
            assert!(matches!(journal.read(4).await, Err(Error::ItemPruned(_))));

            // Everything from the section boundary up remains readable.
            assert_eq!(journal.read(5).await.unwrap(), 500);
            assert_eq!(journal.read(8).await.unwrap(), 800);
            assert_eq!(journal.read(19).await.unwrap(), 1900);

            // Reads at or past the current size are out of range.
            assert!(matches!(
                journal.read(20).await,
                Err(Error::ItemOutOfRange(_))
            ));

            // Appends continue at the current size.
            let pos = journal.append(999).await.unwrap();
            assert_eq!(pos, 20);
            assert_eq!(journal.read(20).await.unwrap(), 999);

            journal.destroy().await.unwrap();
        });
    }
2220
2221 #[should_panic]
2223 #[test_traced]
2224 fn test_init_sync_invalid_parameters() {
2225 let executor = deterministic::Runner::default();
2226 executor.start(|context| async move {
2227 let cfg = Config {
2228 partition: "test_invalid".into(),
2229 items_per_section: NZU64!(5),
2230 compression: None,
2231 codec_config: (),
2232 write_buffer: NZUsize!(1024),
2233 page_cache: CacheRef::new(PAGE_SIZE, NZUsize!(PAGE_CACHE_SIZE)),
2234 };
2235
2236 #[allow(clippy::reversed_empty_ranges)]
2237 let _result = Journal::<deterministic::Context, u64>::init_sync(
2238 context.clone(),
2239 cfg,
2240 10..5, )
2242 .await;
2243 });
2244 }
2245
    /// `init_sync` whose range exactly matches the existing data: items above
    /// the lower bound are retained and appends continue at the end.
    #[test_traced]
    fn test_init_sync_existing_data_exact_match() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let items_per_section = NZU64!(5);
            let cfg = Config {
                partition: "test_exact_match".to_string(),
                items_per_section,
                compression: None,
                codec_config: (),
                write_buffer: NZUsize!(1024),
                page_cache: CacheRef::new(PAGE_SIZE, NZUsize!(PAGE_CACHE_SIZE)),
            };

            // Seed 20 items and persist them.
            let mut journal =
                Journal::<deterministic::Context, u64>::init(context.clone(), cfg.clone())
                    .await
                    .expect("Failed to create initial journal");

            for i in 0..20u64 {
                journal.append(i * 100).await.unwrap();
            }
            journal.sync().await.unwrap();
            drop(journal);

            // Range [5, 20): the lower bound is a section boundary and the
            // upper bound equals the existing size.
            let lower_bound = 5;
            let upper_bound = 20;
            let mut journal = Journal::<deterministic::Context, u64>::init_sync(
                context.clone(),
                cfg.clone(),
                lower_bound..upper_bound,
            )
            .await
            .expect("Failed to initialize journal with exact match");

            assert_eq!(journal.size(), 20);

            // Items below the lower bound are pruned.
            assert_eq!(journal.bounds().start, 5);
            assert!(matches!(journal.read(0).await, Err(Error::ItemPruned(_))));
            assert!(matches!(journal.read(4).await, Err(Error::ItemPruned(_))));

            // Items within the range remain readable.
            assert_eq!(journal.read(5).await.unwrap(), 500);
            assert_eq!(journal.read(10).await.unwrap(), 1000);
            assert_eq!(journal.read(19).await.unwrap(), 1900);

            // Reads at or past the current size are out of range.
            assert!(matches!(
                journal.read(20).await,
                Err(Error::ItemOutOfRange(_))
            ));

            // Appends continue at the current size.
            let pos = journal.append(999).await.unwrap();
            assert_eq!(pos, 20);
            assert_eq!(journal.read(20).await.unwrap(), 999);

            journal.destroy().await.unwrap();
        });
    }
2313
    /// `init_sync` must fail when the existing data extends past the requested
    /// upper bound, for a sweep of upper bounds below the data's size.
    #[test_traced]
    fn test_init_sync_existing_data_exceeds_upper_bound() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let items_per_section = NZU64!(5);
            let cfg = Config {
                partition: "test_unexpected_data".into(),
                items_per_section,
                compression: None,
                codec_config: (),
                write_buffer: NZUsize!(1024),
                page_cache: CacheRef::new(PAGE_SIZE, NZUsize!(PAGE_CACHE_SIZE)),
            };

            // Seed 30 items and persist them.
            let mut journal = Journal::<deterministic::Context, u64>::init(
                context.with_label("initial"),
                cfg.clone(),
            )
            .await
            .expect("Failed to create initial journal");

            for i in 0..30u64 {
                journal.append(i * 1000).await.unwrap();
            }
            journal.sync().await.unwrap();
            drop(journal);

            // Every upper bound below the existing size (30) must be rejected.
            let lower_bound = 8;
            for (i, upper_bound) in (9..29).enumerate() {
                let result = Journal::<deterministic::Context, u64>::init_sync(
                    context.with_label(&format!("sync_{i}")),
                    cfg.clone(),
                    lower_bound..upper_bound,
                )
                .await;

                assert!(matches!(result, Err(Error::ItemOutOfRange(_))));
            }
        });
    }
2360
    /// `init_sync` with a range entirely above the existing (stale) data: all
    /// old items are discarded and the journal starts empty at the lower bound.
    #[test_traced]
    fn test_init_sync_existing_data_stale() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let items_per_section = NZU64!(5);
            let cfg = Config {
                partition: "test_stale".into(),
                items_per_section,
                compression: None,
                codec_config: (),
                write_buffer: NZUsize!(1024),
                page_cache: CacheRef::new(PAGE_SIZE, NZUsize!(PAGE_CACHE_SIZE)),
            };

            // Seed 10 items — all below the future lower bound.
            let mut journal = Journal::<deterministic::Context, u64>::init(
                context.with_label("first"),
                cfg.clone(),
            )
            .await
            .expect("Failed to create initial journal");

            for i in 0..10u64 {
                journal.append(i * 100).await.unwrap();
            }
            journal.sync().await.unwrap();
            drop(journal);

            // The sync range starts above all existing data.
            let lower_bound = 15;
            let upper_bound = 26;
            let journal = Journal::<deterministic::Context, u64>::init_sync(
                context.with_label("second"),
                cfg.clone(),
                lower_bound..upper_bound,
            )
            .await
            .expect("Failed to initialize journal with stale data");

            assert_eq!(journal.size(), 15);

            // No retained items: the journal is empty at the lower bound.
            assert!(journal.bounds().is_empty());

            // All stale positions read as pruned.
            assert!(matches!(journal.read(0).await, Err(Error::ItemPruned(_))));
            assert!(matches!(journal.read(9).await, Err(Error::ItemPruned(_))));
            assert!(matches!(journal.read(14).await, Err(Error::ItemPruned(_))));

            journal.destroy().await.unwrap();
        });
    }
2415
    /// `init_sync` where both range bounds fall exactly on section boundaries.
    #[test_traced]
    fn test_init_sync_section_boundaries() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let items_per_section = NZU64!(5);
            let cfg = Config {
                partition: "test_boundaries".into(),
                items_per_section,
                compression: None,
                codec_config: (),
                write_buffer: NZUsize!(1024),
                page_cache: CacheRef::new(PAGE_SIZE, NZUsize!(PAGE_CACHE_SIZE)),
            };

            // Seed 25 items and persist them.
            let mut journal =
                Journal::<deterministic::Context, u64>::init(context.clone(), cfg.clone())
                    .await
                    .expect("Failed to create initial journal");

            for i in 0..25u64 {
                journal.append(i * 100).await.unwrap();
            }
            journal.sync().await.unwrap();
            drop(journal);

            // Both bounds are section-aligned (15 and 25 with 5 per section).
            let lower_bound = 15;
            let upper_bound = 25;
            let mut journal = Journal::<deterministic::Context, u64>::init_sync(
                context.clone(),
                cfg.clone(),
                lower_bound..upper_bound,
            )
            .await
            .expect("Failed to initialize journal at boundaries");

            assert_eq!(journal.size(), 25);

            // The lower bound is exact — nothing below it survives.
            assert_eq!(journal.bounds().start, 15);

            assert!(matches!(journal.read(0).await, Err(Error::ItemPruned(_))));
            assert!(matches!(journal.read(14).await, Err(Error::ItemPruned(_))));

            // Everything in [15, 25) remains readable.
            assert_eq!(journal.read(15).await.unwrap(), 1500);
            assert_eq!(journal.read(20).await.unwrap(), 2000);
            assert_eq!(journal.read(24).await.unwrap(), 2400);

            assert!(matches!(
                journal.read(25).await,
                Err(Error::ItemOutOfRange(_))
            ));

            // Appends continue at the current size.
            let pos = journal.append(999).await.unwrap();
            assert_eq!(pos, 25);
            assert_eq!(journal.read(25).await.unwrap(), 999);

            journal.destroy().await.unwrap();
        });
    }
2483
    /// `init_sync` where both range bounds fall within the same section.
    #[test_traced]
    fn test_init_sync_same_section_bounds() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let items_per_section = NZU64!(5);
            let cfg = Config {
                partition: "test_same_section".into(),
                items_per_section,
                compression: None,
                codec_config: (),
                write_buffer: NZUsize!(1024),
                page_cache: CacheRef::new(PAGE_SIZE, NZUsize!(PAGE_CACHE_SIZE)),
            };

            // Seed 15 items and persist them.
            let mut journal =
                Journal::<deterministic::Context, u64>::init(context.clone(), cfg.clone())
                    .await
                    .expect("Failed to create initial journal");

            for i in 0..15u64 {
                journal.append(i * 100).await.unwrap();
            }
            journal.sync().await.unwrap();
            drop(journal);

            // Bounds [10, 15) both map to the final section.
            let lower_bound = 10;
            let upper_bound = 15;
            let mut journal = Journal::<deterministic::Context, u64>::init_sync(
                context.clone(),
                cfg.clone(),
                lower_bound..upper_bound,
            )
            .await
            .expect("Failed to initialize journal with same-section bounds");

            assert_eq!(journal.size(), 15);

            // Lower bound 10 is section-aligned, so nothing below it survives.
            assert_eq!(journal.bounds().start, 10);

            assert!(matches!(journal.read(0).await, Err(Error::ItemPruned(_))));
            assert!(matches!(journal.read(9).await, Err(Error::ItemPruned(_))));

            // Everything in [10, 15) remains readable.
            assert_eq!(journal.read(10).await.unwrap(), 1000);
            assert_eq!(journal.read(11).await.unwrap(), 1100);
            assert_eq!(journal.read(14).await.unwrap(), 1400);

            assert!(matches!(
                journal.read(15).await,
                Err(Error::ItemOutOfRange(_))
            ));

            // Appends continue at the current size.
            let pos = journal.append(999).await.unwrap();
            assert_eq!(pos, 15);
            assert_eq!(journal.read(15).await.unwrap(), 999);

            journal.destroy().await.unwrap();
        });
    }
2552
    /// Exercises the degenerate configuration of one item per section:
    /// append/read, section-aligned pruning, restart recovery, destroy and
    /// re-use of the same partition, and finally pruning away ALL items.
    #[test_traced]
    fn test_single_item_per_section() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "single_item_per_section".to_string(),
                items_per_section: NZU64!(1),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::new(LARGE_PAGE_SIZE, NZUsize!(10)),
                write_buffer: NZUsize!(1024),
            };

            let mut journal = Journal::<_, u64>::init(context.with_label("first"), cfg.clone())
                .await
                .unwrap();

            assert_eq!(journal.bounds().end, 0);
            assert!(journal.bounds().is_empty());

            // The first item occupies its own section.
            let pos = journal.append(0).await.unwrap();
            assert_eq!(pos, 0);
            assert_eq!(journal.size(), 1);

            journal.sync().await.unwrap();

            let value = journal.read(journal.size() - 1).await.unwrap();
            assert_eq!(value, 0);

            // Each append lands in a fresh section and is immediately readable.
            for i in 1..10u64 {
                let pos = journal.append(i * 100).await.unwrap();
                assert_eq!(pos, i);
                assert_eq!(journal.size(), i + 1);

                let value = journal.read(journal.size() - 1).await.unwrap();
                assert_eq!(value, i * 100);
            }

            for i in 0..10u64 {
                assert_eq!(journal.read(i).await.unwrap(), i * 100);
            }

            journal.sync().await.unwrap();

            // With one item per section, prune(5) removes exactly items 0..5.
            let pruned = journal.prune(5).await.unwrap();
            assert!(pruned);

            // size() still reports the logical end, not the retained count.
            assert_eq!(journal.size(), 10);

            assert_eq!(journal.bounds().start, 5);

            let value = journal.read(journal.size() - 1).await.unwrap();
            assert_eq!(value, 900);

            for i in 0..5 {
                assert!(matches!(
                    journal.read(i).await,
                    Err(crate::journal::Error::ItemPruned(_))
                ));
            }

            for i in 5..10u64 {
                assert_eq!(journal.read(i).await.unwrap(), i * 100);
            }

            // Keep appending past the pruned region.
            for i in 10..15u64 {
                let pos = journal.append(i * 100).await.unwrap();
                assert_eq!(pos, i);

                let value = journal.read(journal.size() - 1).await.unwrap();
                assert_eq!(value, i * 100);
            }

            journal.sync().await.unwrap();
            drop(journal);

            // Restart: size and prune point must survive.
            let journal = Journal::<_, u64>::init(context.with_label("second"), cfg.clone())
                .await
                .unwrap();

            assert_eq!(journal.size(), 15);

            assert_eq!(journal.bounds().start, 5);

            let value = journal.read(journal.size() - 1).await.unwrap();
            assert_eq!(value, 1400);

            for i in 5..15u64 {
                assert_eq!(journal.read(i).await.unwrap(), i * 100);
            }

            journal.destroy().await.unwrap();

            // Fresh journal in the same partition after destroy: prune + sync,
            // then verify the state after another restart.
            let mut journal = Journal::<_, u64>::init(context.with_label("third"), cfg.clone())
                .await
                .unwrap();

            for i in 0..10u64 {
                journal.append(i * 1000).await.unwrap();
            }

            journal.prune(5).await.unwrap();
            assert_eq!(journal.bounds().end, 10);
            assert_eq!(journal.bounds().start, 5);

            journal.sync().await.unwrap();
            drop(journal);

            let journal = Journal::<_, u64>::init(context.with_label("fourth"), cfg.clone())
                .await
                .unwrap();

            assert_eq!(journal.bounds().end, 10);
            assert_eq!(journal.bounds().start, 5);

            let value = journal.read(journal.size() - 1).await.unwrap();
            assert_eq!(value, 9000);

            for i in 5..10u64 {
                assert_eq!(journal.read(i).await.unwrap(), i * 1000);
            }

            journal.destroy().await.unwrap();

            // Prune away EVERY item: the journal stays positioned at 5 but
            // holds nothing.
            let mut journal = Journal::<_, u64>::init(context.with_label("fifth"), cfg.clone())
                .await
                .unwrap();

            for i in 0..5u64 {
                journal.append(i * 100).await.unwrap();
            }
            journal.sync().await.unwrap();

            journal.prune(5).await.unwrap();
            assert_eq!(journal.bounds().end, 5);
            assert!(journal.bounds().is_empty());
            // Reading the last position now fails: it was pruned.
            let result = journal.read(journal.size() - 1).await;
            assert!(matches!(result, Err(crate::journal::Error::ItemPruned(4))));

            // Appending after a full prune resumes at position 5.
            journal.append(500).await.unwrap();
            assert_eq!(journal.bounds().start, 5);
            assert_eq!(journal.read(journal.bounds().end - 1).await.unwrap(), 500);

            journal.destroy().await.unwrap();
        });
    }
2741
    /// `clear_to_size` discards all items and repositions the journal at an
    /// arbitrary larger size; the cleared state and subsequent appends must
    /// both persist across restarts.
    #[test_traced]
    fn test_variable_journal_clear_to_size() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "clear_test".to_string(),
                items_per_section: NZU64!(10),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::new(LARGE_PAGE_SIZE, NZUsize!(10)),
                write_buffer: NZUsize!(1024),
            };

            let mut journal = Journal::<_, u64>::init(context.with_label("journal"), cfg.clone())
                .await
                .unwrap();

            // Seed 25 items (spanning three sections) and persist them.
            for i in 0..25u64 {
                journal.append(i * 100).await.unwrap();
            }
            assert_eq!(journal.bounds().end, 25);
            assert_eq!(journal.bounds().start, 0);
            journal.sync().await.unwrap();

            // Clear everything and jump the size forward to 100.
            journal.clear_to_size(100).await.unwrap();
            assert_eq!(journal.bounds().end, 100);
            assert!(journal.bounds().is_empty());

            // All previous items are gone.
            for i in 0..25 {
                assert!(matches!(
                    journal.read(i).await,
                    Err(crate::journal::Error::ItemPruned(_))
                ));
            }

            // The cleared state survives a restart.
            drop(journal);
            let mut journal =
                Journal::<_, u64>::init(context.with_label("journal_after_clear"), cfg.clone())
                    .await
                    .unwrap();
            assert_eq!(journal.bounds().end, 100);
            assert!(journal.bounds().is_empty());

            // Appends continue from position 100.
            for i in 100..105u64 {
                let pos = journal.append(i * 100).await.unwrap();
                assert_eq!(pos, i);
            }
            assert_eq!(journal.bounds().end, 105);
            assert_eq!(journal.bounds().start, 100);

            for i in 100..105u64 {
                assert_eq!(journal.read(i).await.unwrap(), i * 100);
            }

            // The appended items survive another restart.
            journal.sync().await.unwrap();
            drop(journal);

            let journal = Journal::<_, u64>::init(context.with_label("journal_reopened"), cfg)
                .await
                .unwrap();

            assert_eq!(journal.bounds().end, 105);
            assert_eq!(journal.bounds().start, 100);
            for i in 100..105u64 {
                assert_eq!(journal.read(i).await.unwrap(), i * 100);
            }

            journal.destroy().await.unwrap();
        });
    }
2819}