1use crate::{
7 journal::{
8 contiguous::{fixed, Contiguous, MutableContiguous, PersistableContiguous},
9 segmented::variable,
10 Error,
11 },
12 mmr::Location,
13};
14use commonware_codec::Codec;
15use commonware_runtime::{buffer::PoolRef, Metrics, Storage};
16use commonware_utils::NZUsize;
17use core::ops::Range;
18use futures::{future::Either, stream, Stream, StreamExt as _};
19use std::num::{NonZeroU64, NonZeroUsize};
20use tracing::{debug, info};
21
/// Number of buffered items used when replaying a data section (to count items or
/// rebuild missing offsets during recovery).
const REPLAY_BUFFER_SIZE: NonZeroUsize = NZUsize!(1024);

/// Suffix appended to the configured partition name for the item-data journal.
const DATA_SUFFIX: &str = "_data";

/// Suffix appended to the configured partition name for the offsets journal.
const OFFSETS_SUFFIX: &str = "_offsets";
29
/// Maps an absolute item position to the index of the section that stores it.
///
/// Each section holds exactly `items_per_section` items, so the section index is
/// the truncating integer quotient.
const fn position_to_section(position: u64, items_per_section: u64) -> u64 {
    let section = position / items_per_section;
    section
}
54
/// Configuration for a contiguous [`Journal`] backed by a data journal and an
/// offsets journal sharing a common partition-name prefix.
#[derive(Clone)]
pub struct Config<C> {
    /// Base partition name; `_data` and `_offsets` suffixes are appended for the
    /// two underlying journals.
    pub partition: String,

    /// Number of items stored per section (forwarded as `items_per_blob` to the
    /// offsets journal).
    pub items_per_section: NonZeroU64,

    /// Optional compression setting forwarded to the data journal.
    pub compression: Option<u8>,

    /// Codec configuration used by the data journal to encode/decode items.
    pub codec_config: C,

    /// Buffer pool shared by both underlying journals.
    pub buffer_pool: PoolRef,

    /// Write-buffer size forwarded to both underlying journals.
    pub write_buffer: NonZeroUsize,
}
79
80impl<C> Config<C> {
81 fn data_partition(&self) -> String {
83 format!("{}{}", self.partition, DATA_SUFFIX)
84 }
85
86 fn offsets_partition(&self) -> String {
88 format!("{}{}", self.partition, OFFSETS_SUFFIX)
89 }
90}
91
/// A contiguous, position-addressed journal of variable-length items built from
/// two underlying journals: variable-length item data plus fixed-size offsets.
pub struct Journal<E: Storage + Metrics, V: Codec> {
    // Variable-length journal holding the encoded items, keyed by section.
    pub(crate) data: variable::Journal<E, V>,

    // Fixed journal mapping each item position to the item's byte offset within
    // its data section.
    pub(crate) offsets: fixed::Journal<E, u32>,

    // Items stored per section (cached from `Config::items_per_section`).
    items_per_section: u64,

    // Total number of items ever appended (i.e. the next append position).
    size: u64,

    // Position of the oldest retained item; equals `size` when everything has
    // been pruned. Always section-aligned (asserted in `init`).
    oldest_retained_pos: u64,
}
150
impl<E: Storage + Metrics, V: Codec> Journal<E, V> {
    /// Initializes the journal from persisted state, repairing any misalignment
    /// between the data and offsets journals left by a crash (see
    /// [`Self::align_journals`]).
    pub async fn init(context: E, cfg: Config<V::Cfg>) -> Result<Self, Error> {
        let items_per_section = cfg.items_per_section.get();
        let data_partition = cfg.data_partition();
        let offsets_partition = cfg.offsets_partition();

        let mut data = variable::Journal::init(
            context.clone(),
            variable::Config {
                partition: data_partition,
                compression: cfg.compression,
                codec_config: cfg.codec_config,
                buffer_pool: cfg.buffer_pool.clone(),
                write_buffer: cfg.write_buffer,
            },
        )
        .await?;

        let mut offsets = fixed::Journal::init(
            context,
            fixed::Config {
                partition: offsets_partition,
                items_per_blob: cfg.items_per_section,
                buffer_pool: cfg.buffer_pool,
                write_buffer: cfg.write_buffer,
            },
        )
        .await?;

        // Reconcile the two journals; returns the recovered bounds.
        let (oldest_retained_pos, size) =
            Self::align_journals(&mut data, &mut offsets, items_per_section).await?;
        assert!(
            oldest_retained_pos.is_multiple_of(items_per_section),
            "oldest_retained_pos is not section-aligned"
        );

        Ok(Self {
            data,
            offsets,
            items_per_section,
            size,
            oldest_retained_pos,
        })
    }

    /// Initializes an empty journal logically positioned at `size`: every
    /// position before `size` is treated as pruned and the next append is
    /// assigned position `size`.
    pub async fn init_at_size(context: E, cfg: Config<V::Cfg>, size: u64) -> Result<Self, Error> {
        let data = variable::Journal::init(
            context.clone(),
            variable::Config {
                partition: cfg.data_partition(),
                compression: cfg.compression,
                codec_config: cfg.codec_config.clone(),
                buffer_pool: cfg.buffer_pool.clone(),
                write_buffer: cfg.write_buffer,
            },
        )
        .await?;

        // The offsets journal is created pre-advanced to `size` by the sync
        // helper (no items are physically present).
        let offsets = crate::qmdb::any::unordered::sync::init_journal_at_size(
            context,
            fixed::Config {
                partition: cfg.offsets_partition(),
                items_per_blob: cfg.items_per_section,
                buffer_pool: cfg.buffer_pool,
                write_buffer: cfg.write_buffer,
            },
            size,
        )
        .await?;

        Ok(Self {
            data,
            offsets,
            items_per_section: cfg.items_per_section.get(),
            size,
            // Nothing below `size` is retained.
            oldest_retained_pos: size,
        })
    }

    /// Initializes the journal for syncing over `range` (positions to be
    /// fetched). Existing data is reused when it overlaps the range, discarded
    /// when stale, and rejected when it extends past `range.end`.
    pub(crate) async fn init_sync(
        context: E,
        cfg: Config<V::Cfg>,
        range: Range<u64>,
    ) -> Result<Self, crate::qmdb::Error> {
        assert!(!range.is_empty(), "range must not be empty");

        debug!(
            range.start,
            range.end,
            items_per_section = cfg.items_per_section.get(),
            "initializing contiguous variable journal for sync"
        );

        let mut journal = Self::init(context.with_label("journal"), cfg.clone()).await?;

        let size = journal.size();

        if size == 0 {
            if range.start == 0 {
                debug!("no existing journal data, returning empty journal");
                return Ok(journal);
            } else {
                debug!(
                    range.start,
                    "no existing journal data, initializing at sync range start"
                );
                journal.destroy().await?;
                return Ok(Self::init_at_size(context, cfg, range.start).await?);
            }
        }

        // More data on disk than the sync target expects: refuse to proceed.
        if size > range.end {
            return Err(crate::qmdb::Error::UnexpectedData(Location::new_unchecked(
                size,
            )));
        }

        // Entirely before the range: discard and restart at the range start.
        if size <= range.start {
            debug!(
                size,
                range.start, "existing journal data is stale, re-initializing at start position"
            );
            journal.destroy().await?;
            return Ok(Self::init_at_size(context, cfg, range.start).await?);
        }

        // Overlapping: keep the journal but prune anything below the range.
        let oldest = journal.oldest_retained_pos();
        if let Some(oldest_pos) = oldest {
            if oldest_pos < range.start {
                debug!(
                    oldest_pos,
                    range.start, "pruning journal to sync range start"
                );
                journal.prune(range.start).await?;
            }
        }

        Ok(journal)
    }

    /// Rewinds the journal to `size`, discarding all items at positions
    /// `>= size`. Errors if `size` exceeds the current size or falls below the
    /// pruning boundary.
    pub async fn rewind(&mut self, size: u64) -> Result<(), Error> {
        match size.cmp(&self.size) {
            std::cmp::Ordering::Greater => return Err(Error::InvalidRewind(size)),
            std::cmp::Ordering::Equal => return Ok(()), std::cmp::Ordering::Less => {}
        }

        if size < self.oldest_retained_pos {
            return Err(Error::ItemPruned(size));
        }

        // The offset of the first discarded item marks where the data journal
        // must be truncated within its section.
        let discard_offset = self.offsets.read(size).await?;
        let discard_section = position_to_section(size, self.items_per_section);

        self.data
            .rewind_to_offset(discard_section, discard_offset)
            .await?;
        self.offsets.rewind(size).await?;

        self.size = size;

        Ok(())
    }

    /// Appends `item` and returns its position. Both journals are synced when
    /// the append completes a section.
    pub async fn append(&mut self, item: V) -> Result<u64, Error> {
        let section = self.current_section();

        let (offset, _size) = self.data.append(section, item).await?;

        let offsets_pos = self.offsets.append(offset).await?;
        // Invariant: offsets journal positions track item positions exactly.
        assert_eq!(offsets_pos, self.size);

        let position = self.size;
        self.size += 1;

        // Sync on section boundaries so completed sections are durable.
        if self.size.is_multiple_of(self.items_per_section) {
            futures::try_join!(self.data.sync(section), self.offsets.sync())?;
        }

        Ok(position)
    }

    /// Total number of items ever appended (the next append position).
    pub const fn size(&self) -> u64 {
        self.size
    }

    /// Position of the oldest retained item, or `None` if all items have been
    /// pruned (or none were ever appended).
    pub const fn oldest_retained_pos(&self) -> Option<u64> {
        if self.size == self.oldest_retained_pos {
            None
        } else {
            Some(self.oldest_retained_pos)
        }
    }

    /// First position that has not been pruned; equals `size` when empty.
    pub fn pruning_boundary(&self) -> u64 {
        self.oldest_retained_pos().unwrap_or(self.size)
    }

    /// Prunes all complete sections below `min_position`. Returns whether
    /// anything was pruned. The resulting boundary is section-aligned and may
    /// be lower than `min_position`.
    pub async fn prune(&mut self, min_position: u64) -> Result<bool, Error> {
        if min_position <= self.oldest_retained_pos {
            return Ok(false);
        }

        // Never prune past the end of the journal.
        let min_position = min_position.min(self.size);

        let min_section = position_to_section(min_position, self.items_per_section);

        // Prune data first; offsets follow only on success so a crash between
        // the two leaves offsets behind (repaired by align_journals on init).
        let pruned = self.data.prune(min_section).await?;
        if pruned {
            self.oldest_retained_pos = min_section * self.items_per_section;
            self.offsets.prune(self.oldest_retained_pos).await?;
        }
        Ok(pruned)
    }

    /// Streams `(position, item)` pairs from `start_pos` to the end of the
    /// journal. Errors if `start_pos` is pruned or beyond `size`; yields an
    /// empty stream when `start_pos == size`.
    pub async fn replay(
        &self,
        start_pos: u64,
        buffer_size: NonZeroUsize,
    ) -> Result<impl Stream<Item = Result<(u64, V), Error>> + '_, Error> {
        if start_pos < self.oldest_retained_pos {
            return Err(Error::ItemPruned(start_pos));
        }
        if start_pos > self.size {
            return Err(Error::ItemOutOfRange(start_pos));
        }

        if start_pos == self.size {
            return Ok(Either::Left(stream::empty()));
        }

        // Translate the position into a (section, byte offset) pair.
        let start_offset = self.offsets.read(start_pos).await?;
        let start_section = position_to_section(start_pos, self.items_per_section);
        let data_stream = self
            .data
            .replay(start_section, start_offset, buffer_size)
            .await?;

        // Positions are derived from enumeration order: the data stream yields
        // items contiguously starting at start_pos.
        let transformed = data_stream.enumerate().map(move |(idx, result)| {
            result.map(|(_section, _offset, _size, item)| {
                let pos = start_pos + idx as u64;
                (pos, item)
            })
        });

        Ok(Either::Right(transformed))
    }

    /// Reads the item at `position`. Errors if the position is out of range or
    /// already pruned.
    pub async fn read(&self, position: u64) -> Result<V, Error> {
        if position >= self.size {
            return Err(Error::ItemOutOfRange(position));
        }

        if position < self.oldest_retained_pos {
            return Err(Error::ItemPruned(position));
        }

        let offset = self.offsets.read(position).await?;
        let section = position_to_section(position, self.items_per_section);

        self.data.get(section, offset).await
    }

    /// Durably persists appended items by syncing the data journal only; the
    /// offsets journal can be rebuilt from data on restart (see
    /// [`Self::align_journals`]).
    pub async fn commit(&mut self) -> Result<(), Error> {
        let section = self.current_section();
        self.data.sync(section).await
    }

    /// Syncs both underlying journals.
    pub async fn sync(&mut self) -> Result<(), Error> {
        let section = self.current_section();

        futures::try_join!(self.data.sync(section), self.offsets.sync())?;

        Ok(())
    }

    /// Syncs and closes both underlying journals.
    pub async fn close(mut self) -> Result<(), Error> {
        self.sync().await?;
        self.data.close().await?;
        self.offsets.close().await
    }

    /// Removes all persisted state of both underlying journals.
    pub async fn destroy(self) -> Result<(), Error> {
        self.data.destroy().await?;
        self.offsets.destroy().await
    }

    // Section that the next appended item will be written to.
    const fn current_section(&self) -> u64 {
        position_to_section(self.size, self.items_per_section)
    }

    /// Reconciles the data and offsets journals after a restart, repairing any
    /// divergence a crash may have left (offsets behind on append, ahead on
    /// rewind, or stale on prune). The data journal is authoritative. Returns
    /// the recovered `(oldest_retained_pos, size)`.
    async fn align_journals(
        data: &mut variable::Journal<E, V>,
        offsets: &mut fixed::Journal<E, u32>,
        items_per_section: u64,
    ) -> Result<(u64, u64), Error> {
        // Count items in the last data section by replaying it (sections other
        // than the last are assumed full).
        let items_in_last_section = match data.blobs.last_key_value() {
            Some((last_section, _)) => {
                let stream = data.replay(*last_section, 0, REPLAY_BUFFER_SIZE).await?;
                futures::pin_mut!(stream);
                let mut count = 0u64;
                while let Some(result) = stream.next().await {
                    result?; count += 1;
                }
                count
            }
            None => 0,
        };

        // "Empty" includes a single empty section (e.g. created by a crashed
        // append that never wrote data).
        let data_empty =
            data.blobs.is_empty() || (data.blobs.len() == 1 && items_in_last_section == 0);
        if data_empty {
            let size = offsets.size();

            if !data.blobs.is_empty() {
                // One empty data section: rewind offsets back to its start.
                let first_section = *data.blobs.first_key_value().unwrap().0;
                let target_pos = first_section * items_per_section;

                info!("crash repair: rewinding offsets from {size} to {target_pos}");
                offsets.rewind(target_pos).await?;
                offsets.sync().await?;
                return Ok((target_pos, target_pos));
            }

            // No data at all: a prune-all may have crashed before offsets were
            // pruned; finish the prune so both journals agree.
            if let Some(oldest) = offsets.oldest_retained_pos() {
                if oldest < size {
                    info!("crash repair: pruning offsets to {size} (prune-all crash)");
                    offsets.prune(size).await?;
                    offsets.sync().await?;
                }
            }

            return Ok((size, size));
        }

        // Derive the authoritative bounds from the data journal's sections.
        let (data_oldest_pos, data_size) = {
            let first_section = *data.blobs.first_key_value().unwrap().0;
            let last_section = *data.blobs.last_key_value().unwrap().0;

            let oldest_pos = first_section * items_per_section;

            let size = (last_section * items_per_section) + items_in_last_section;
            (oldest_pos, size)
        };
        assert_ne!(
            data_oldest_pos, data_size,
            "data journal expected to be non-empty"
        );

        // Align the pruning boundaries.
        match offsets.oldest_retained_pos() {
            Some(oldest_retained_pos) if oldest_retained_pos < data_oldest_pos => {
                // Crash between data.prune and offsets.prune: finish the prune.
                info!("crash repair: pruning offsets journal to {data_oldest_pos}");
                offsets.prune(data_oldest_pos).await?;
            }
            Some(oldest_retained_pos) if oldest_retained_pos > data_oldest_pos => {
                // Offsets pruned beyond data: the mapping for retained items is
                // gone and cannot be rebuilt.
                return Err(Error::Corruption(format!(
                    "offsets oldest pos ({oldest_retained_pos}) > data oldest pos ({data_oldest_pos})"
                )));
            }
            Some(_) => {
            }
            None if data_oldest_pos > 0 => {
                // Offsets journal is empty; only valid if its size matches the
                // data journal's pruning boundary exactly.
                let offsets_size = offsets.size();
                if offsets_size != data_oldest_pos {
                    return Err(Error::Corruption(format!(
                        "offsets journal empty: size ({offsets_size}) != data oldest pos ({data_oldest_pos})"
                    )));
                }
                info!("crash repair: offsets journal empty at {data_oldest_pos}");
            }
            None => {
            }
        }

        // Align the sizes: rewind extra offsets, or rebuild missing ones from
        // the data journal.
        let offsets_size = offsets.size();
        if offsets_size > data_size {
            info!("crash repair: rewinding offsets from {offsets_size} to {data_size}");
            offsets.rewind(data_size).await?;
        } else if offsets_size < data_size {
            Self::add_missing_offsets(data, offsets, offsets_size, items_per_section).await?;
        }

        assert_eq!(offsets.size(), data_size);
        assert_eq!(offsets.oldest_retained_pos(), Some(data_oldest_pos));

        offsets.sync().await?;
        Ok((data_oldest_pos, data_size))
    }

    /// Appends offsets for items present in the data journal but missing from
    /// the offsets journal (sized `offsets_size`), by replaying data from the
    /// last recorded offset (or the first section if none are recorded).
    async fn add_missing_offsets(
        data: &variable::Journal<E, V>,
        offsets: &mut fixed::Journal<E, u32>,
        offsets_size: u64,
        items_per_section: u64,
    ) -> Result<(), Error> {
        assert!(
            !data.blobs.is_empty(),
            "rebuild_offsets called with empty data journal"
        );

        // Resume from the last item the offsets journal knows about (skipping
        // that item itself), or from the start of the first data section.
        let (start_section, resume_offset, skip_first) =
            if let Some(oldest) = offsets.oldest_retained_pos() {
                if oldest < offsets_size {
                    let last_offset = offsets.read(offsets_size - 1).await?;
                    let last_section = position_to_section(offsets_size - 1, items_per_section);
                    (last_section, last_offset, true)
                } else {
                    let first_section = *data.blobs.first_key_value().unwrap().0;
                    (first_section, 0, false)
                }
            } else {
                let first_section = *data.blobs.first_key_value().unwrap().0;
                (first_section, 0, false)
            };

        let stream = data
            .replay(start_section, resume_offset, REPLAY_BUFFER_SIZE)
            .await?;
        futures::pin_mut!(stream);

        let mut skipped_first = false;
        while let Some(result) = stream.next().await {
            let (_section, offset, _size, _item) = result?;

            // The first replayed item is the one already recorded at
            // offsets_size - 1; don't record it twice.
            if skip_first && !skipped_first {
                skipped_first = true;
                continue;
            }

            offsets.append(offset).await?;
        }

        Ok(())
    }
}
802
impl<E: Storage + Metrics, V: Codec> Contiguous for Journal<E, V> {
    type Item = V;

    // All methods delegate to the inherent implementations. Fully-qualified
    // `Self::method(self)` syntax is used so the calls cannot resolve back to
    // these trait methods.
    fn size(&self) -> u64 {
        Self::size(self)
    }

    fn oldest_retained_pos(&self) -> Option<u64> {
        Self::oldest_retained_pos(self)
    }

    fn pruning_boundary(&self) -> u64 {
        Self::pruning_boundary(self)
    }

    async fn replay(
        &self,
        start_pos: u64,
        buffer: NonZeroUsize,
    ) -> Result<impl Stream<Item = Result<(u64, Self::Item), Error>> + '_, Error> {
        Self::replay(self, start_pos, buffer).await
    }

    async fn read(&self, position: u64) -> Result<Self::Item, Error> {
        Self::read(self, position).await
    }
}
831
impl<E: Storage + Metrics, V: Codec> MutableContiguous for Journal<E, V> {
    // Delegation to the inherent implementations (see `Contiguous` impl note).
    async fn append(&mut self, item: Self::Item) -> Result<u64, Error> {
        Self::append(self, item).await
    }

    async fn prune(&mut self, min_position: u64) -> Result<bool, Error> {
        Self::prune(self, min_position).await
    }

    async fn rewind(&mut self, size: u64) -> Result<(), Error> {
        Self::rewind(self, size).await
    }
}
845
impl<E: Storage + Metrics, V: Codec> PersistableContiguous for Journal<E, V> {
    // Delegation to the inherent implementations (see `Contiguous` impl note).
    async fn commit(&mut self) -> Result<(), Error> {
        Self::commit(self).await
    }

    async fn sync(&mut self) -> Result<(), Error> {
        Self::sync(self).await
    }

    async fn close(self) -> Result<(), Error> {
        Self::close(self).await
    }

    async fn destroy(self) -> Result<(), Error> {
        Self::destroy(self).await
    }
}
863#[cfg(test)]
864mod tests {
865 use super::*;
866 use crate::journal::contiguous::tests::run_contiguous_tests;
867 use commonware_macros::test_traced;
868 use commonware_runtime::{buffer::PoolRef, deterministic, Runner};
869 use commonware_utils::{NZUsize, NZU64};
870 use futures::FutureExt as _;
871
    // Buffer-pool page size / cache capacity constants. NOTE(review): not used by
    // any test visible in this chunk — presumably referenced further below; confirm.
    const PAGE_SIZE: usize = 101;
    const PAGE_CACHE_SIZE: usize = 2;
875
    /// Deleting the offsets partition after a prune must be unrecoverable: the
    /// data journal's oldest section no longer starts at position 0, so an empty
    /// offsets journal cannot be reconciled (align_journals reports corruption).
    #[test_traced]
    fn test_variable_offsets_partition_loss_after_prune_unrecoverable() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "offsets_loss_after_prune".to_string(),
                items_per_section: NZU64!(10),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(NZUsize!(1024), NZUsize!(10)),
                write_buffer: NZUsize!(1024),
            };

            let mut journal = Journal::<_, u64>::init(context.clone(), cfg.clone())
                .await
                .unwrap();

            for i in 0..40u64 {
                journal.append(i * 100).await.unwrap();
            }

            journal.prune(20).await.unwrap();
            assert_eq!(journal.oldest_retained_pos(), Some(20));
            assert_eq!(journal.size(), 40);

            journal.close().await.unwrap();

            // Simulate total loss of the offsets partition.
            context
                .remove(&cfg.offsets_partition(), None)
                .await
                .expect("Failed to remove offsets partition");

            let result = Journal::<_, u64>::init(context.clone(), cfg.clone()).await;
            assert!(matches!(result, Err(Error::Corruption(_))));
        });
    }
922
    /// Deleting the data partition leaves only offsets; recovery treats the data
    /// journal as authoritative (empty), so all 20 positions survive as "pruned"
    /// and appends resume at position 20.
    #[test_traced]
    fn test_variable_align_data_offsets_mismatch() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "data_loss_test".to_string(),
                items_per_section: NZU64!(10),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(NZUsize!(1024), NZUsize!(10)),
                write_buffer: NZUsize!(1024),
            };

            let mut variable = Journal::<_, u64>::init(context.clone(), cfg.clone())
                .await
                .unwrap();

            for i in 0..20u64 {
                variable.append(i * 100).await.unwrap();
            }

            variable.close().await.unwrap();

            // Simulate total loss of the data partition.
            context
                .remove(&cfg.data_partition(), None)
                .await
                .expect("Failed to remove data partition");

            let mut journal = Journal::<_, u64>::init(context.clone(), cfg.clone())
                .await
                .expect("Should align offsets to match empty data");

            assert_eq!(journal.size(), 20);

            // Everything is considered pruned (nothing retained).
            assert_eq!(journal.oldest_retained_pos(), None);

            for i in 0..20 {
                assert!(matches!(
                    journal.read(i).await,
                    Err(crate::journal::Error::ItemPruned(_))
                ));
            }

            // Appending still works and continues at the recovered size.
            let pos = journal.append(999).await.unwrap();
            assert_eq!(pos, 20);
            assert_eq!(journal.read(20).await.unwrap(), 999);

            journal.destroy().await.unwrap();
        });
    }
988
    /// Runs the shared contiguous-journal test suite against this implementation,
    /// constructing a fresh journal per test case.
    #[test_traced]
    fn test_variable_contiguous() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            run_contiguous_tests(move |test_name: String| {
                let context = context.clone();
                async move {
                    Journal::<_, u64>::init(
                        context,
                        Config {
                            partition: format!("generic_test_{test_name}"),
                            items_per_section: NZU64!(10),
                            compression: None,
                            codec_config: (),
                            buffer_pool: PoolRef::new(NZUsize!(1024), NZUsize!(10)),
                            write_buffer: NZUsize!(1024),
                        },
                    )
                    .await
                }
                .boxed()
            })
            .await;
        });
    }
1014
    /// Prunes one section at a time and verifies, after each prune, that reads
    /// below the boundary fail with ItemPruned while retained items remain intact.
    #[test_traced]
    fn test_variable_multiple_sequential_prunes() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "sequential_prunes".to_string(),
                items_per_section: NZU64!(10),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(NZUsize!(1024), NZUsize!(10)),
                write_buffer: NZUsize!(1024),
            };

            let mut journal = Journal::<_, u64>::init(context, cfg).await.unwrap();

            for i in 0..40u64 {
                journal.append(i * 100).await.unwrap();
            }

            assert_eq!(journal.oldest_retained_pos(), Some(0));
            assert_eq!(journal.size(), 40);

            // Prune the first section (positions 0..10).
            let pruned = journal.prune(10).await.unwrap();
            assert!(pruned);

            let oldest = journal.oldest_retained_pos().unwrap();
            assert_eq!(oldest, 10);

            assert!(matches!(
                journal.read(0).await,
                Err(crate::journal::Error::ItemPruned(_))
            ));
            assert_eq!(journal.read(10).await.unwrap(), 1000);
            assert_eq!(journal.read(19).await.unwrap(), 1900);

            // Prune the second section (positions 10..20).
            let pruned = journal.prune(20).await.unwrap();
            assert!(pruned);

            let oldest = journal.oldest_retained_pos().unwrap();
            assert_eq!(oldest, 20);

            assert!(matches!(
                journal.read(10).await,
                Err(crate::journal::Error::ItemPruned(_))
            ));
            assert!(matches!(
                journal.read(19).await,
                Err(crate::journal::Error::ItemPruned(_))
            ));
            assert_eq!(journal.read(20).await.unwrap(), 2000);
            assert_eq!(journal.read(29).await.unwrap(), 2900);

            // Prune the third section (positions 20..30).
            let pruned = journal.prune(30).await.unwrap();
            assert!(pruned);

            let oldest = journal.oldest_retained_pos().unwrap();
            assert_eq!(oldest, 30);

            assert!(matches!(
                journal.read(20).await,
                Err(crate::journal::Error::ItemPruned(_))
            ));
            assert!(matches!(
                journal.read(29).await,
                Err(crate::journal::Error::ItemPruned(_))
            ));
            assert_eq!(journal.read(30).await.unwrap(), 3000);
            assert_eq!(journal.read(39).await.unwrap(), 3900);

            // Pruning never changes the logical size.
            assert_eq!(journal.size(), 40);

            journal.destroy().await.unwrap();
        });
    }
1102
    /// Prunes the entire journal, reinitializes it, and verifies that size and
    /// the fully-pruned state persist and that appends resume at the old size.
    #[test_traced]
    fn test_variable_prune_all_then_reinit() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "prune_all_reinit".to_string(),
                items_per_section: NZU64!(10),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(NZUsize!(1024), NZUsize!(10)),
                write_buffer: NZUsize!(1024),
            };

            let mut journal = Journal::<_, u64>::init(context.clone(), cfg.clone())
                .await
                .unwrap();

            for i in 0..100u64 {
                journal.append(i * 100).await.unwrap();
            }

            assert_eq!(journal.size(), 100);
            assert_eq!(journal.oldest_retained_pos(), Some(0));

            // Prune everything.
            let pruned = journal.prune(100).await.unwrap();
            assert!(pruned);

            assert_eq!(journal.size(), 100);
            assert_eq!(journal.oldest_retained_pos(), None);

            for i in 0..100 {
                assert!(matches!(
                    journal.read(i).await,
                    Err(crate::journal::Error::ItemPruned(_))
                ));
            }

            journal.close().await.unwrap();

            // Reinitialize: the prune-all state must survive restart.
            let mut journal = Journal::<_, u64>::init(context.clone(), cfg.clone())
                .await
                .unwrap();

            assert_eq!(journal.size(), 100);
            assert_eq!(journal.oldest_retained_pos(), None);

            for i in 0..100 {
                assert!(matches!(
                    journal.read(i).await,
                    Err(crate::journal::Error::ItemPruned(_))
                ));
            }

            // Appends continue from position 100.
            journal.append(10000).await.unwrap();
            assert_eq!(journal.size(), 101);
            assert_eq!(journal.oldest_retained_pos(), Some(100));

            assert_eq!(journal.read(100).await.unwrap(), 10000);

            assert!(matches!(
                journal.read(99).await,
                Err(crate::journal::Error::ItemPruned(_))
            ));

            journal.destroy().await.unwrap();
        });
    }
1183
    /// Simulates a crash where the data journal was pruned further than the
    /// offsets journal; recovery must prune offsets forward to match the data.
    #[test_traced]
    fn test_variable_recovery_prune_crash_offsets_behind() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "recovery_prune_crash".to_string(),
                items_per_section: NZU64!(10),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(NZUsize!(1024), NZUsize!(10)),
                write_buffer: NZUsize!(1024),
            };

            let mut variable = Journal::<_, u64>::init(context.clone(), cfg.clone())
                .await
                .unwrap();

            for i in 0..40u64 {
                variable.append(i * 100).await.unwrap();
            }

            variable.prune(10).await.unwrap();
            assert_eq!(variable.oldest_retained_pos(), Some(10));

            // Prune the data journal directly to section 2 (position 20),
            // leaving offsets behind at position 10 — as if we crashed mid-prune.
            variable.data.prune(2).await.unwrap();
            variable.close().await.unwrap();

            let variable = Journal::<_, u64>::init(context.clone(), cfg.clone())
                .await
                .unwrap();

            // Recovery adopts the data journal's (further) boundary.
            assert_eq!(variable.oldest_retained_pos(), Some(20));
            assert_eq!(variable.size(), 40);

            assert!(matches!(
                variable.read(10).await,
                Err(crate::journal::Error::ItemPruned(_))
            ));

            assert_eq!(variable.read(20).await.unwrap(), 2000);
            assert_eq!(variable.read(39).await.unwrap(), 3900);

            variable.destroy().await.unwrap();
        });
    }
1241
    /// Simulates offsets pruned further than data (offsets to position 20, data
    /// only to section 1 / position 10): the mapping for retained items is lost,
    /// so init must fail with Corruption.
    #[test_traced]
    fn test_variable_recovery_offsets_ahead_corruption() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "recovery_offsets_ahead".to_string(),
                items_per_section: NZU64!(10),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(NZUsize!(1024), NZUsize!(10)),
                write_buffer: NZUsize!(1024),
            };

            let mut variable = Journal::<_, u64>::init(context.clone(), cfg.clone())
                .await
                .unwrap();

            for i in 0..40u64 {
                variable.append(i * 100).await.unwrap();
            }

            // Directly desynchronize the journals: offsets ahead of data.
            variable.offsets.prune(20).await.unwrap(); variable.data.prune(1).await.unwrap(); variable.close().await.unwrap();

            let result = Journal::<_, u64>::init(context.clone(), cfg.clone()).await;
            assert!(matches!(result, Err(Error::Corruption(_))));
        });
    }
1280
    /// Simulates a crash after data appends but before the matching offsets were
    /// written: extra items exist only in the data journal, and recovery must
    /// rebuild the missing offsets from a replay.
    #[test_traced]
    fn test_variable_recovery_append_crash_offsets_behind() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "recovery_append_crash".to_string(),
                items_per_section: NZU64!(10),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(NZUsize!(1024), NZUsize!(10)),
                write_buffer: NZUsize!(1024),
            };

            let mut variable = Journal::<_, u64>::init(context.clone(), cfg.clone())
                .await
                .unwrap();

            for i in 0..15u64 {
                variable.append(i * 100).await.unwrap();
            }

            assert_eq!(variable.size(), 15);

            // Write items 15..20 directly to data section 1, bypassing offsets.
            for i in 15..20u64 {
                variable.data.append(1, i * 100).await.unwrap();
            }
            variable.close().await.unwrap();

            let variable = Journal::<_, u64>::init(context.clone(), cfg.clone())
                .await
                .unwrap();

            // Recovery recognized all 20 items and rebuilt offsets 15..20.
            assert_eq!(variable.size(), 20);
            assert_eq!(variable.oldest_retained_pos(), Some(0));

            for i in 0..20u64 {
                assert_eq!(variable.read(i).await.unwrap(), i * 100);
            }

            assert_eq!(variable.offsets.size(), 20);

            variable.destroy().await.unwrap();
        });
    }
1335
    /// Simulates a crash where data was pruned several sections past the offsets
    /// journal (to section 3 / position 30 vs. offsets at 10); recovery adopts
    /// the data journal's boundary.
    #[test_traced]
    fn test_variable_recovery_multiple_prunes_crash() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "recovery_multiple_prunes".to_string(),
                items_per_section: NZU64!(10),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(NZUsize!(1024), NZUsize!(10)),
                write_buffer: NZUsize!(1024),
            };

            let mut variable = Journal::<_, u64>::init(context.clone(), cfg.clone())
                .await
                .unwrap();

            for i in 0..50u64 {
                variable.append(i * 100).await.unwrap();
            }

            variable.prune(10).await.unwrap();
            assert_eq!(variable.oldest_retained_pos(), Some(10));

            // Prune data two extra sections without touching offsets.
            variable.data.prune(3).await.unwrap();
            variable.close().await.unwrap();

            let variable = Journal::<_, u64>::init(context.clone(), cfg.clone())
                .await
                .unwrap();

            assert_eq!(variable.oldest_retained_pos(), Some(30));
            assert_eq!(variable.size(), 50);

            assert!(matches!(
                variable.read(10).await,
                Err(crate::journal::Error::ItemPruned(_))
            ));
            assert!(matches!(
                variable.read(20).await,
                Err(crate::journal::Error::ItemPruned(_))
            ));

            assert_eq!(variable.read(30).await.unwrap(), 3000);
            assert_eq!(variable.read(49).await.unwrap(), 4900);

            variable.destroy().await.unwrap();
        });
    }
1397
    /// Simulates a crash mid-rewind where only the offsets journal was rewound
    /// (to 5) while the data journal still holds 25 items spanning multiple
    /// sections; recovery rebuilds the 20 missing offsets from data.
    #[test_traced]
    fn test_variable_recovery_rewind_crash_multi_section() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "recovery_rewind_crash".to_string(),
                items_per_section: NZU64!(10),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(NZUsize!(1024), NZUsize!(10)),
                write_buffer: NZUsize!(1024),
            };

            let mut variable = Journal::<_, u64>::init(context.clone(), cfg.clone())
                .await
                .unwrap();

            for i in 0..25u64 {
                variable.append(i * 100).await.unwrap();
            }

            assert_eq!(variable.size(), 25);

            // Rewind only the offsets journal, leaving data untouched.
            variable.offsets.rewind(5).await.unwrap();
            variable.close().await.unwrap();

            let mut variable = Journal::<_, u64>::init(context.clone(), cfg.clone())
                .await
                .unwrap();

            // Recovery restored the full range from the data journal.
            assert_eq!(variable.size(), 25);
            assert_eq!(variable.oldest_retained_pos(), Some(0));

            for i in 0..25u64 {
                assert_eq!(variable.read(i).await.unwrap(), i * 100);
            }

            assert_eq!(variable.offsets.size(), 25);

            // Normal operation resumes after recovery.
            let pos = variable.append(2500).await.unwrap();
            assert_eq!(pos, 25);
            assert_eq!(variable.read(25).await.unwrap(), 2500);

            variable.destroy().await.unwrap();
        });
    }
1461
    /// Simulates a crash after a prune-all followed by data-only appends that
    /// were synced, while the offsets journal (empty, at position 10) was not
    /// updated; recovery must rebuild offsets for the new items.
    #[test_traced]
    fn test_variable_recovery_empty_offsets_after_prune_and_append() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "recovery_empty_after_prune".to_string(),
                items_per_section: NZU64!(10),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(NZUsize!(1024), NZUsize!(10)),
                write_buffer: NZUsize!(1024),
            };

            let mut journal = Journal::<_, u64>::init(context.clone(), cfg.clone())
                .await
                .unwrap();

            for i in 0..10u64 {
                journal.append(i * 100).await.unwrap();
            }
            assert_eq!(journal.size(), 10);
            assert_eq!(journal.oldest_retained_pos(), Some(0));

            journal.prune(10).await.unwrap();
            assert_eq!(journal.size(), 10);
            // Fully pruned; then append 10..20 directly into data section 1 only.
            assert_eq!(journal.oldest_retained_pos(), None); for i in 10..20u64 {
                journal.data.append(1, i * 100).await.unwrap();
            }
            journal.data.sync(1).await.unwrap();
            // Drop without close(): offsets are never written.
            drop(journal);

            let journal = Journal::<_, u64>::init(context.clone(), cfg.clone())
                .await
                .expect("Should recover from crash after data sync but before offsets sync");

            assert_eq!(journal.size(), 20);
            assert_eq!(journal.oldest_retained_pos(), Some(10));

            for i in 10..20u64 {
                assert_eq!(journal.read(i).await.unwrap(), i * 100);
            }

            for i in 0..10 {
                assert!(matches!(journal.read(i).await, Err(Error::ItemPruned(_))));
            }

            journal.destroy().await.unwrap();
        });
    }
1529
    /// Appends 15 items, commits (which syncs only the data journal), and drops
    /// without closing; reinit must recover all committed items (rebuilding any
    /// unsynced offsets from the data journal).
    #[test_traced]
    fn test_variable_concurrent_sync_recovery() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "concurrent_sync_recovery".to_string(),
                items_per_section: NZU64!(10),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(NZUsize!(1024), NZUsize!(10)),
                write_buffer: NZUsize!(1024),
            };

            let mut journal = Journal::<_, u64>::init(context.clone(), cfg.clone())
                .await
                .unwrap();

            for i in 0..15u64 {
                journal.append(i * 100).await.unwrap();
            }

            journal.commit().await.unwrap();

            // Simulate a crash: no close(), so offsets may lag behind data.
            drop(journal);

            let journal = Journal::<_, u64>::init(context.clone(), cfg.clone())
                .await
                .unwrap();

            assert_eq!(journal.size(), 15);
            for i in 0..15u64 {
                assert_eq!(journal.read(i).await.unwrap(), i * 100);
            }

            journal.destroy().await.unwrap();
        });
    }
1572
    /// init_at_size(0) behaves like a brand-new empty journal: nothing retained,
    /// first append gets position 0.
    #[test_traced]
    fn test_init_at_size_zero() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "init_at_size_zero".to_string(),
                items_per_section: NZU64!(5),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(NZUsize!(512), NZUsize!(2)),
                write_buffer: NZUsize!(1024),
            };

            let mut journal = Journal::<_, u64>::init_at_size(context.clone(), cfg.clone(), 0)
                .await
                .unwrap();

            assert_eq!(journal.size(), 0);

            assert_eq!(journal.oldest_retained_pos(), None);

            let pos = journal.append(100).await.unwrap();
            assert_eq!(pos, 0);
            assert_eq!(journal.size(), 1);
            assert_eq!(journal.read(0).await.unwrap(), 100);

            journal.destroy().await.unwrap();
        });
    }
1605
1606 #[test_traced]
1607 fn test_init_at_size_section_boundary() {
1608 let executor = deterministic::Runner::default();
1609 executor.start(|context| async move {
1610 let cfg = Config {
1611 partition: "init_at_size_boundary".to_string(),
1612 items_per_section: NZU64!(5),
1613 compression: None,
1614 codec_config: (),
1615 buffer_pool: PoolRef::new(NZUsize!(512), NZUsize!(2)),
1616 write_buffer: NZUsize!(1024),
1617 };
1618
1619 let mut journal = Journal::<_, u64>::init_at_size(context.clone(), cfg.clone(), 10)
1621 .await
1622 .unwrap();
1623
1624 assert_eq!(journal.size(), 10);
1626
1627 assert_eq!(journal.oldest_retained_pos(), None);
1629
1630 let pos = journal.append(1000).await.unwrap();
1632 assert_eq!(pos, 10);
1633 assert_eq!(journal.size(), 11);
1634 assert_eq!(journal.read(10).await.unwrap(), 1000);
1635
1636 let pos = journal.append(1001).await.unwrap();
1638 assert_eq!(pos, 11);
1639 assert_eq!(journal.read(11).await.unwrap(), 1001);
1640
1641 journal.destroy().await.unwrap();
1642 });
1643 }
1644
1645 #[test_traced]
1646 fn test_init_at_size_mid_section() {
1647 let executor = deterministic::Runner::default();
1648 executor.start(|context| async move {
1649 let cfg = Config {
1650 partition: "init_at_size_mid".to_string(),
1651 items_per_section: NZU64!(5),
1652 compression: None,
1653 codec_config: (),
1654 buffer_pool: PoolRef::new(NZUsize!(512), NZUsize!(2)),
1655 write_buffer: NZUsize!(1024),
1656 };
1657
1658 let mut journal = Journal::<_, u64>::init_at_size(context.clone(), cfg.clone(), 7)
1660 .await
1661 .unwrap();
1662
1663 assert_eq!(journal.size(), 7);
1665
1666 assert_eq!(journal.oldest_retained_pos(), None);
1668
1669 let pos = journal.append(700).await.unwrap();
1671 assert_eq!(pos, 7);
1672 assert_eq!(journal.size(), 8);
1673 assert_eq!(journal.read(7).await.unwrap(), 700);
1674
1675 journal.destroy().await.unwrap();
1676 });
1677 }
1678
    // A journal created with init_at_size must persist across close/reopen,
    // with size and oldest retained position reflecting the initial offset.
    #[test_traced]
    fn test_init_at_size_persistence() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "init_at_size_persist".to_string(),
                items_per_section: NZU64!(5),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(NZUsize!(512), NZUsize!(2)),
                write_buffer: NZUsize!(1024),
            };

            // Start logically at position 15 (a section boundary with
            // items_per_section = 5).
            let mut journal = Journal::<_, u64>::init_at_size(context.clone(), cfg.clone(), 15)
                .await
                .unwrap();

            // Fill positions 15..20 with values 1500..1505.
            for i in 0..5u64 {
                let pos = journal.append(1500 + i).await.unwrap();
                assert_eq!(pos, 15 + i);
            }

            assert_eq!(journal.size(), 20);

            // Clean shutdown so state is durable.
            journal.close().await.unwrap();

            // Reopen via the regular initializer.
            let mut journal = Journal::<_, u64>::init(context.clone(), cfg.clone())
                .await
                .unwrap();

            // Size survives; the oldest retained item is the first one that
            // was actually written (15) — earlier positions were never stored.
            assert_eq!(journal.size(), 20);
            assert_eq!(journal.oldest_retained_pos(), Some(15));

            for i in 0..5u64 {
                assert_eq!(journal.read(15 + i).await.unwrap(), 1500 + i);
            }

            // Appends continue from the persisted size.
            let pos = journal.append(9999).await.unwrap();
            assert_eq!(pos, 20);
            assert_eq!(journal.read(20).await.unwrap(), 9999);

            journal.destroy().await.unwrap();
        });
    }
1729
1730 #[test_traced]
1731 fn test_init_at_size_large_offset() {
1732 let executor = deterministic::Runner::default();
1733 executor.start(|context| async move {
1734 let cfg = Config {
1735 partition: "init_at_size_large".to_string(),
1736 items_per_section: NZU64!(5),
1737 compression: None,
1738 codec_config: (),
1739 buffer_pool: PoolRef::new(NZUsize!(512), NZUsize!(2)),
1740 write_buffer: NZUsize!(1024),
1741 };
1742
1743 let mut journal = Journal::<_, u64>::init_at_size(context.clone(), cfg.clone(), 1000)
1745 .await
1746 .unwrap();
1747
1748 assert_eq!(journal.size(), 1000);
1749 assert_eq!(journal.oldest_retained_pos(), None);
1751
1752 let pos = journal.append(100000).await.unwrap();
1754 assert_eq!(pos, 1000);
1755 assert_eq!(journal.read(1000).await.unwrap(), 100000);
1756
1757 journal.destroy().await.unwrap();
1758 });
1759 }
1760
    // Pruning and appending both work on a journal created with init_at_size.
    #[test_traced]
    fn test_init_at_size_prune_and_append() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "init_at_size_prune".to_string(),
                items_per_section: NZU64!(5),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(NZUsize!(512), NZUsize!(2)),
                write_buffer: NZUsize!(1024),
            };

            // Start logically at position 20.
            let mut journal = Journal::<_, u64>::init_at_size(context.clone(), cfg.clone(), 20)
                .await
                .unwrap();

            // Fill positions 20..30 with values 2000..2010.
            for i in 0..10u64 {
                journal.append(2000 + i).await.unwrap();
            }

            assert_eq!(journal.size(), 30);

            // Prune to 25 (section-aligned for items_per_section = 5).
            journal.prune(25).await.unwrap();

            // Pruning never shrinks the logical size, only the retained range.
            assert_eq!(journal.size(), 30);
            assert_eq!(journal.oldest_retained_pos(), Some(25));

            // Position p holds value 2000 + (p - 20).
            for i in 25..30u64 {
                assert_eq!(journal.read(i).await.unwrap(), 2000 + (i - 20));
            }

            // Appends continue past the retained region.
            let pos = journal.append(3000).await.unwrap();
            assert_eq!(pos, 30);

            journal.destroy().await.unwrap();
        });
    }
1804
1805 #[test_traced]
1807 fn test_init_sync_no_existing_data() {
1808 let executor = deterministic::Runner::default();
1809 executor.start(|context| async move {
1810 let cfg = Config {
1811 partition: "test_fresh_start".into(),
1812 items_per_section: NZU64!(5),
1813 compression: None,
1814 codec_config: (),
1815 write_buffer: NZUsize!(1024),
1816 buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
1817 };
1818
1819 let lower_bound = 10;
1821 let upper_bound = 26;
1822 let mut journal =
1823 Journal::init_sync(context.clone(), cfg.clone(), lower_bound..upper_bound)
1824 .await
1825 .expect("Failed to initialize journal with sync boundaries");
1826
1827 assert_eq!(journal.size(), lower_bound);
1828 assert_eq!(journal.oldest_retained_pos(), None);
1829
1830 let pos1 = journal.append(42u64).await.unwrap();
1832 assert_eq!(pos1, lower_bound);
1833 assert_eq!(journal.read(pos1).await.unwrap(), 42u64);
1834
1835 let pos2 = journal.append(43u64).await.unwrap();
1836 assert_eq!(pos2, lower_bound + 1);
1837 assert_eq!(journal.read(pos2).await.unwrap(), 43u64);
1838
1839 journal.destroy().await.unwrap();
1840 });
1841 }
1842
1843 #[test_traced]
1845 fn test_init_sync_existing_data_overlap() {
1846 let executor = deterministic::Runner::default();
1847 executor.start(|context| async move {
1848 let cfg = Config {
1849 partition: "test_overlap".into(),
1850 items_per_section: NZU64!(5),
1851 compression: None,
1852 codec_config: (),
1853 write_buffer: NZUsize!(1024),
1854 buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
1855 };
1856
1857 let mut journal =
1859 Journal::<deterministic::Context, u64>::init(context.clone(), cfg.clone())
1860 .await
1861 .expect("Failed to create initial journal");
1862
1863 for i in 0..20u64 {
1865 journal.append(i * 100).await.unwrap();
1866 }
1867 journal.close().await.unwrap();
1868
1869 let lower_bound = 8;
1872 let upper_bound = 31;
1873 let mut journal = Journal::<deterministic::Context, u64>::init_sync(
1874 context.clone(),
1875 cfg.clone(),
1876 lower_bound..upper_bound,
1877 )
1878 .await
1879 .expect("Failed to initialize journal with overlap");
1880
1881 assert_eq!(journal.size(), 20);
1882
1883 let oldest = journal.oldest_retained_pos();
1885 assert_eq!(oldest, Some(5)); assert!(matches!(journal.read(0).await, Err(Error::ItemPruned(_))));
1889 assert!(matches!(journal.read(4).await, Err(Error::ItemPruned(_))));
1890
1891 assert_eq!(journal.read(5).await.unwrap(), 500);
1893 assert_eq!(journal.read(8).await.unwrap(), 800);
1894 assert_eq!(journal.read(19).await.unwrap(), 1900);
1895
1896 assert!(matches!(
1898 journal.read(20).await,
1899 Err(Error::ItemOutOfRange(_))
1900 ));
1901
1902 let pos = journal.append(999).await.unwrap();
1904 assert_eq!(pos, 20);
1905 assert_eq!(journal.read(20).await.unwrap(), 999);
1906
1907 journal.destroy().await.unwrap();
1908 });
1909 }
1910
1911 #[should_panic]
1913 #[test_traced]
1914 fn test_init_sync_invalid_parameters() {
1915 let executor = deterministic::Runner::default();
1916 executor.start(|context| async move {
1917 let cfg = Config {
1918 partition: "test_invalid".into(),
1919 items_per_section: NZU64!(5),
1920 compression: None,
1921 codec_config: (),
1922 write_buffer: NZUsize!(1024),
1923 buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
1924 };
1925
1926 #[allow(clippy::reversed_empty_ranges)]
1927 let _result = Journal::<deterministic::Context, u64>::init_sync(
1928 context.clone(),
1929 cfg,
1930 10..5, )
1932 .await;
1933 });
1934 }
1935
1936 #[test_traced]
1938 fn test_init_sync_existing_data_exact_match() {
1939 let executor = deterministic::Runner::default();
1940 executor.start(|context| async move {
1941 let items_per_section = NZU64!(5);
1942 let cfg = Config {
1943 partition: "test_exact_match".to_string(),
1944 items_per_section,
1945 compression: None,
1946 codec_config: (),
1947 write_buffer: NZUsize!(1024),
1948 buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
1949 };
1950
1951 let mut journal =
1953 Journal::<deterministic::Context, u64>::init(context.clone(), cfg.clone())
1954 .await
1955 .expect("Failed to create initial journal");
1956
1957 for i in 0..20u64 {
1959 journal.append(i * 100).await.unwrap();
1960 }
1961 journal.close().await.unwrap();
1962
1963 let lower_bound = 5; let upper_bound = 20; let mut journal = Journal::<deterministic::Context, u64>::init_sync(
1967 context.clone(),
1968 cfg.clone(),
1969 lower_bound..upper_bound,
1970 )
1971 .await
1972 .expect("Failed to initialize journal with exact match");
1973
1974 assert_eq!(journal.size(), 20);
1975
1976 let oldest = journal.oldest_retained_pos();
1978 assert_eq!(oldest, Some(5)); assert!(matches!(journal.read(0).await, Err(Error::ItemPruned(_))));
1982 assert!(matches!(journal.read(4).await, Err(Error::ItemPruned(_))));
1983
1984 assert_eq!(journal.read(5).await.unwrap(), 500);
1986 assert_eq!(journal.read(10).await.unwrap(), 1000);
1987 assert_eq!(journal.read(19).await.unwrap(), 1900);
1988
1989 assert!(matches!(
1991 journal.read(20).await,
1992 Err(Error::ItemOutOfRange(_))
1993 ));
1994
1995 let pos = journal.append(999).await.unwrap();
1997 assert_eq!(pos, 20);
1998 assert_eq!(journal.read(20).await.unwrap(), 999);
1999
2000 journal.destroy().await.unwrap();
2001 });
2002 }
2003
2004 #[test_traced]
2007 fn test_init_sync_existing_data_exceeds_upper_bound() {
2008 let executor = deterministic::Runner::default();
2009 executor.start(|context| async move {
2010 let items_per_section = NZU64!(5);
2011 let cfg = Config {
2012 partition: "test_unexpected_data".into(),
2013 items_per_section,
2014 compression: None,
2015 codec_config: (),
2016 write_buffer: NZUsize!(1024),
2017 buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
2018 };
2019
2020 let mut journal =
2022 Journal::<deterministic::Context, u64>::init(context.clone(), cfg.clone())
2023 .await
2024 .expect("Failed to create initial journal");
2025
2026 for i in 0..30u64 {
2028 journal.append(i * 1000).await.unwrap();
2029 }
2030 journal.close().await.unwrap();
2031
2032 let lower_bound = 8; for upper_bound in 9..29 {
2035 let result = Journal::<deterministic::Context, u64>::init_sync(
2036 context.clone(),
2037 cfg.clone(),
2038 lower_bound..upper_bound,
2039 )
2040 .await;
2041
2042 assert!(matches!(result, Err(crate::qmdb::Error::UnexpectedData(_))));
2044 }
2045 });
2046 }
2047
2048 #[test_traced]
2050 fn test_init_sync_existing_data_stale() {
2051 let executor = deterministic::Runner::default();
2052 executor.start(|context| async move {
2053 let items_per_section = NZU64!(5);
2054 let cfg = Config {
2055 partition: "test_stale".into(),
2056 items_per_section,
2057 compression: None,
2058 codec_config: (),
2059 write_buffer: NZUsize!(1024),
2060 buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
2061 };
2062
2063 let mut journal =
2065 Journal::<deterministic::Context, u64>::init(context.clone(), cfg.clone())
2066 .await
2067 .expect("Failed to create initial journal");
2068
2069 for i in 0..10u64 {
2071 journal.append(i * 100).await.unwrap();
2072 }
2073 journal.close().await.unwrap();
2074
2075 let lower_bound = 15; let upper_bound = 26; let journal = Journal::<deterministic::Context, u64>::init_sync(
2079 context.clone(),
2080 cfg.clone(),
2081 lower_bound..upper_bound,
2082 )
2083 .await
2084 .expect("Failed to initialize journal with stale data");
2085
2086 assert_eq!(journal.size(), 15);
2087
2088 assert_eq!(journal.oldest_retained_pos(), None);
2090
2091 assert!(matches!(journal.read(0).await, Err(Error::ItemPruned(_))));
2093 assert!(matches!(journal.read(9).await, Err(Error::ItemPruned(_))));
2094 assert!(matches!(journal.read(14).await, Err(Error::ItemPruned(_))));
2095
2096 journal.destroy().await.unwrap();
2097 });
2098 }
2099
2100 #[test_traced]
2102 fn test_init_sync_section_boundaries() {
2103 let executor = deterministic::Runner::default();
2104 executor.start(|context| async move {
2105 let items_per_section = NZU64!(5);
2106 let cfg = Config {
2107 partition: "test_boundaries".into(),
2108 items_per_section,
2109 compression: None,
2110 codec_config: (),
2111 write_buffer: NZUsize!(1024),
2112 buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
2113 };
2114
2115 let mut journal =
2117 Journal::<deterministic::Context, u64>::init(context.clone(), cfg.clone())
2118 .await
2119 .expect("Failed to create initial journal");
2120
2121 for i in 0..25u64 {
2123 journal.append(i * 100).await.unwrap();
2124 }
2125 journal.close().await.unwrap();
2126
2127 let lower_bound = 15; let upper_bound = 25; let mut journal = Journal::<deterministic::Context, u64>::init_sync(
2131 context.clone(),
2132 cfg.clone(),
2133 lower_bound..upper_bound,
2134 )
2135 .await
2136 .expect("Failed to initialize journal at boundaries");
2137
2138 assert_eq!(journal.size(), 25);
2139
2140 let oldest = journal.oldest_retained_pos();
2142 assert_eq!(oldest, Some(15));
2143
2144 assert!(matches!(journal.read(0).await, Err(Error::ItemPruned(_))));
2146 assert!(matches!(journal.read(14).await, Err(Error::ItemPruned(_))));
2147
2148 assert_eq!(journal.read(15).await.unwrap(), 1500);
2150 assert_eq!(journal.read(20).await.unwrap(), 2000);
2151 assert_eq!(journal.read(24).await.unwrap(), 2400);
2152
2153 assert!(matches!(
2155 journal.read(25).await,
2156 Err(Error::ItemOutOfRange(_))
2157 ));
2158
2159 let pos = journal.append(999).await.unwrap();
2161 assert_eq!(pos, 25);
2162 assert_eq!(journal.read(25).await.unwrap(), 999);
2163
2164 journal.destroy().await.unwrap();
2165 });
2166 }
2167
2168 #[test_traced]
2170 fn test_init_sync_same_section_bounds() {
2171 let executor = deterministic::Runner::default();
2172 executor.start(|context| async move {
2173 let items_per_section = NZU64!(5);
2174 let cfg = Config {
2175 partition: "test_same_section".into(),
2176 items_per_section,
2177 compression: None,
2178 codec_config: (),
2179 write_buffer: NZUsize!(1024),
2180 buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
2181 };
2182
2183 let mut journal =
2185 Journal::<deterministic::Context, u64>::init(context.clone(), cfg.clone())
2186 .await
2187 .expect("Failed to create initial journal");
2188
2189 for i in 0..15u64 {
2191 journal.append(i * 100).await.unwrap();
2192 }
2193 journal.close().await.unwrap();
2194
2195 let lower_bound = 10; let upper_bound = 15; let mut journal = Journal::<deterministic::Context, u64>::init_sync(
2199 context.clone(),
2200 cfg.clone(),
2201 lower_bound..upper_bound,
2202 )
2203 .await
2204 .expect("Failed to initialize journal with same-section bounds");
2205
2206 assert_eq!(journal.size(), 15);
2207
2208 let oldest = journal.oldest_retained_pos();
2211 assert_eq!(oldest, Some(10));
2212
2213 assert!(matches!(journal.read(0).await, Err(Error::ItemPruned(_))));
2215 assert!(matches!(journal.read(9).await, Err(Error::ItemPruned(_))));
2216
2217 assert_eq!(journal.read(10).await.unwrap(), 1000);
2219 assert_eq!(journal.read(11).await.unwrap(), 1100);
2220 assert_eq!(journal.read(14).await.unwrap(), 1400);
2221
2222 assert!(matches!(
2224 journal.read(15).await,
2225 Err(Error::ItemOutOfRange(_))
2226 ));
2227
2228 let pos = journal.append(999).await.unwrap();
2230 assert_eq!(pos, 15);
2231 assert_eq!(journal.read(15).await.unwrap(), 999);
2232
2233 journal.destroy().await.unwrap();
2234 });
2235 }
2236}