1use crate::{
7 journal::{
8 contiguous::{fixed, Contiguous, MutableContiguous},
9 segmented::variable,
10 Error,
11 },
12 mmr::Location,
13 Persistable,
14};
15use commonware_codec::{Codec, CodecShared};
16use commonware_runtime::{buffer::PoolRef, Metrics, Storage};
17use commonware_utils::NZUsize;
18use core::ops::Range;
19use futures::{future::Either, stream, Stream, StreamExt as _};
20use std::num::{NonZeroU64, NonZeroUsize};
21use tracing::{debug, info};
22
23const REPLAY_BUFFER_SIZE: NonZeroUsize = NZUsize!(1024);
24
25const DATA_SUFFIX: &str = "_data";
27
28const OFFSETS_SUFFIX: &str = "_offsets";
30
/// Maps an item position to the index of the section that contains it.
///
/// Sections hold `items_per_section` consecutive positions, so this is a
/// plain floor division (for unsigned integers `div_euclid` is identical to
/// `/`).
const fn position_to_section(position: u64, items_per_section: u64) -> u64 {
    position.div_euclid(items_per_section)
}
55
/// Configuration for a position-indexed journal.
#[derive(Clone)]
pub struct Config<C> {
    // Base partition name; the data and offsets journals live in partitions
    // derived from it via `data_partition()` / `offsets_partition()`.
    pub partition: String,

    // Number of item positions stored per section (data journal) and per
    // blob (offsets journal).
    pub items_per_section: NonZeroU64,

    // Optional compression level for the data journal; `None` disables
    // compression.
    pub compression: Option<u8>,

    // Codec configuration used to encode/decode items of type `V`.
    pub codec_config: C,

    // Buffer pool shared by both underlying journals.
    pub buffer_pool: PoolRef,

    // Write buffer size for both underlying journals.
    pub write_buffer: NonZeroUsize,
}
80
81impl<C> Config<C> {
82 fn data_partition(&self) -> String {
84 format!("{}{}", self.partition, DATA_SUFFIX)
85 }
86
87 fn offsets_partition(&self) -> String {
89 format!("{}{}", self.partition, OFFSETS_SUFFIX)
90 }
91}
92
/// A journal of items addressable by contiguous position, backed by a
/// variable-length data journal plus a fixed-length journal of byte offsets.
pub struct Journal<E: Storage + Metrics, V: Codec> {
    // Variable-length journal holding the encoded items, grouped into
    // sections of `items_per_section` positions each.
    data: variable::Journal<E, V>,

    // Fixed-length journal mapping each item position to the item's byte
    // offset within its data section.
    offsets: fixed::Journal<E, u64>,

    // Cached copy of `Config::items_per_section`.
    items_per_section: u64,

    // Total number of positions ever assigned; the next append receives this
    // position. Lowered only by `rewind`, never by `prune`.
    size: u64,

    // Position of the oldest item still retained. Equals `size` when nothing
    // is retained. Kept section-aligned by `prune` (asserted in `init`).
    oldest_retained_pos: u64,
}
161
162impl<E: Storage + Metrics, V: CodecShared> Journal<E, V> {
    /// Opens (or creates) the journal in the partitions derived from
    /// `cfg.partition`, repairing any divergence between the data and offsets
    /// journals left by a crash.
    pub async fn init(context: E, cfg: Config<V::Cfg>) -> Result<Self, Error> {
        let items_per_section = cfg.items_per_section.get();
        let data_partition = cfg.data_partition();
        let offsets_partition = cfg.offsets_partition();

        // Variable-length journal holding the encoded items.
        let mut data = variable::Journal::init(
            context.clone(),
            variable::Config {
                partition: data_partition,
                compression: cfg.compression,
                codec_config: cfg.codec_config,
                buffer_pool: cfg.buffer_pool.clone(),
                write_buffer: cfg.write_buffer,
            },
        )
        .await?;

        // Fixed-length journal mapping each position to a byte offset.
        let mut offsets = fixed::Journal::init(
            context,
            fixed::Config {
                partition: offsets_partition,
                items_per_blob: cfg.items_per_section,
                buffer_pool: cfg.buffer_pool,
                write_buffer: cfg.write_buffer,
            },
        )
        .await?;

        // Crash repair: reconcile the two journals, treating the data journal
        // as the source of truth.
        let (oldest_retained_pos, size) =
            Self::align_journals(&mut data, &mut offsets, items_per_section).await?;
        assert!(
            oldest_retained_pos.is_multiple_of(items_per_section),
            "oldest_retained_pos is not section-aligned"
        );

        Ok(Self {
            data,
            offsets,
            items_per_section,
            size,
            oldest_retained_pos,
        })
    }
215
    /// Initializes a journal that reports `size()` == `size` with no retained
    /// items, as if `size` items had been appended and then pruned away.
    ///
    /// NOTE(review): unlike `init`, this path does not run `align_journals`
    /// and `size` is not required to be section-aligned (see the mid-section
    /// tests below).
    pub async fn init_at_size(context: E, cfg: Config<V::Cfg>, size: u64) -> Result<Self, Error> {
        // The data journal starts empty: sections before `size` never exist.
        let data = variable::Journal::init(
            context.clone(),
            variable::Config {
                partition: cfg.data_partition(),
                compression: cfg.compression,
                codec_config: cfg.codec_config.clone(),
                buffer_pool: cfg.buffer_pool.clone(),
                write_buffer: cfg.write_buffer,
            },
        )
        .await?;

        // The offsets journal is created already positioned at `size`.
        let offsets = fixed::Journal::init_at_size(
            context,
            fixed::Config {
                partition: cfg.offsets_partition(),
                items_per_blob: cfg.items_per_section,
                buffer_pool: cfg.buffer_pool,
                write_buffer: cfg.write_buffer,
            },
            size,
        )
        .await?;

        Ok(Self {
            data,
            offsets,
            items_per_section: cfg.items_per_section.get(),
            size,
            // Nothing below `size` is retained.
            oldest_retained_pos: size,
        })
    }
267
    /// Initializes the journal for state sync over positions `range`.
    ///
    /// Existing on-disk data is reused when it overlaps the sync range:
    /// - empty journal: start fresh (at 0 or at `range.start`);
    /// - data beyond `range.end`: error (unexpected data);
    /// - data entirely before `range.start`: discard and restart at
    ///   `range.start`;
    /// - otherwise: prune anything before `range.start` and keep the rest.
    pub(crate) async fn init_sync(
        context: E,
        cfg: Config<V::Cfg>,
        range: Range<u64>,
    ) -> Result<Self, crate::qmdb::Error> {
        assert!(!range.is_empty(), "range must not be empty");

        debug!(
            range.start,
            range.end,
            items_per_section = cfg.items_per_section.get(),
            "initializing contiguous variable journal for sync"
        );

        let mut journal = Self::init(context.with_label("journal"), cfg.clone()).await?;

        let size = journal.size();

        // Case: no existing data at all.
        if size == 0 {
            if range.start == 0 {
                debug!("no existing journal data, returning empty journal");
                return Ok(journal);
            } else {
                debug!(
                    range.start,
                    "no existing journal data, initializing at sync range start"
                );
                journal.destroy().await?;
                return Ok(Self::init_at_size(context, cfg, range.start).await?);
            }
        }

        // Case: local data extends past the requested range — refuse.
        if size > range.end {
            return Err(crate::qmdb::Error::UnexpectedData(Location::new_unchecked(
                size,
            )));
        }

        // Case: all local data precedes the range — discard and restart.
        if size <= range.start {
            debug!(
                size,
                range.start, "existing journal data is stale, re-initializing at start position"
            );
            journal.destroy().await?;
            return Ok(Self::init_at_size(context, cfg, range.start).await?);
        }

        // Case: overlapping data — drop anything before the range start.
        let oldest = journal.oldest_retained_pos();
        if let Some(oldest_pos) = oldest {
            if oldest_pos < range.start {
                debug!(
                    oldest_pos,
                    range.start, "pruning journal to sync range start"
                );
                journal.prune(range.start).await?;
            }
        }

        Ok(journal)
    }
358
359 pub async fn rewind(&mut self, size: u64) -> Result<(), Error> {
372 match size.cmp(&self.size) {
374 std::cmp::Ordering::Greater => return Err(Error::InvalidRewind(size)),
375 std::cmp::Ordering::Equal => return Ok(()), std::cmp::Ordering::Less => {}
377 }
378
379 if size < self.oldest_retained_pos {
381 return Err(Error::ItemPruned(size));
382 }
383
384 let discard_offset = self.offsets.read(size).await?;
386 let discard_section = position_to_section(size, self.items_per_section);
387
388 self.data
389 .rewind_to_offset(discard_section, discard_offset)
390 .await?;
391 self.offsets.rewind(size).await?;
392
393 self.size = size;
395
396 Ok(())
397 }
398
399 pub async fn append(&mut self, item: V) -> Result<u64, Error> {
415 let section = self.current_section();
417
418 let (offset, _size) = self.data.append(section, item).await?;
420
421 let offsets_pos = self.offsets.append(offset).await?;
423 assert_eq!(offsets_pos, self.size);
424
425 let position = self.size;
427 self.size += 1;
428
429 if self.size.is_multiple_of(self.items_per_section) {
431 futures::try_join!(self.data.sync(section), self.offsets.sync())?;
432 }
433
434 Ok(position)
435 }
436
    /// Returns the total number of positions assigned so far; equivalently,
    /// the position the next `append` will receive. Not reduced by `prune`
    /// (positions are never reused), but lowered by `rewind`.
    pub const fn size(&self) -> u64 {
        self.size
    }
444
445 pub const fn oldest_retained_pos(&self) -> Option<u64> {
449 if self.size == self.oldest_retained_pos {
450 None
452 } else {
453 Some(self.oldest_retained_pos)
454 }
455 }
456
457 pub fn pruning_boundary(&self) -> u64 {
459 self.oldest_retained_pos().unwrap_or(self.size)
460 }
461
    /// Prunes all items before `min_position`, rounded down to a section
    /// boundary (items in the section containing `min_position` survive).
    /// Returns whether anything was actually pruned.
    ///
    /// The data journal is pruned before the offsets journal: if a crash
    /// happens in between, `align_journals` prunes the offsets journal to
    /// match the data journal on the next `init`.
    pub async fn prune(&mut self, min_position: u64) -> Result<bool, Error> {
        if min_position <= self.oldest_retained_pos {
            return Ok(false);
        }

        // Never prune past the end of the journal.
        let min_position = min_position.min(self.size);

        let min_section = position_to_section(min_position, self.items_per_section);

        let pruned = self.data.prune(min_section).await?;
        if pruned {
            // New boundary = first position of the first retained section.
            self.oldest_retained_pos = min_section * self.items_per_section;
            self.offsets.prune(self.oldest_retained_pos).await?;
        }
        Ok(pruned)
    }
490
    /// Returns a stream of `(position, item)` pairs for all items from
    /// `start_pos` to the end of the journal.
    ///
    /// # Errors
    ///
    /// Returns `Error::ItemPruned` if `start_pos` is below the pruning
    /// boundary and `Error::ItemOutOfRange` if it exceeds `size`.
    pub async fn replay(
        &self,
        start_pos: u64,
        buffer_size: NonZeroUsize,
    ) -> Result<impl Stream<Item = Result<(u64, V), Error>> + '_, Error> {
        if start_pos < self.oldest_retained_pos {
            return Err(Error::ItemPruned(start_pos));
        }
        if start_pos > self.size {
            return Err(Error::ItemOutOfRange(start_pos));
        }

        // Replaying from the end yields an empty stream. `Either` unifies the
        // empty and non-empty stream types into one return type.
        if start_pos == self.size {
            return Ok(Either::Left(stream::empty()));
        }

        // Start the underlying replay at the byte offset recorded for
        // `start_pos` in its section.
        let start_offset = self.offsets.read(start_pos).await?;
        let start_section = position_to_section(start_pos, self.items_per_section);
        let data_stream = self
            .data
            .replay(start_section, start_offset, buffer_size)
            .await?;

        // Items arrive in position order, so each item's position is simply
        // `start_pos` plus its index within the stream.
        let transformed = data_stream.enumerate().map(move |(idx, result)| {
            result.map(|(_section, _offset, _size, item)| {
                let pos = start_pos + idx as u64;
                (pos, item)
            })
        });

        Ok(Either::Right(transformed))
    }
537
538 pub async fn read(&self, position: u64) -> Result<V, Error> {
546 if position >= self.size {
548 return Err(Error::ItemOutOfRange(position));
549 }
550
551 if position < self.oldest_retained_pos {
552 return Err(Error::ItemPruned(position));
553 }
554
555 let offset = self.offsets.read(position).await?;
557 let section = position_to_section(position, self.items_per_section);
558
559 self.data.get(section, offset).await
561 }
562
    /// Makes appended items durable by syncing the active data section.
    ///
    /// NOTE(review): only the data journal is synced here, unlike `sync`,
    /// which also syncs offsets — presumably safe because `align_journals`
    /// can rebuild missing offsets from the data journal on restart; confirm.
    pub async fn commit(&mut self) -> Result<(), Error> {
        let section = self.current_section();
        self.data.sync(section).await
    }
571
572 pub async fn sync(&mut self) -> Result<(), Error> {
576 let section = self.current_section();
579
580 futures::try_join!(self.data.sync(section), self.offsets.sync())?;
582
583 Ok(())
584 }
585
586 pub async fn destroy(self) -> Result<(), Error> {
590 self.data.destroy().await?;
591 self.offsets.destroy().await
592 }
593
    /// Returns the data-journal section that the next appended item will
    /// land in.
    const fn current_section(&self) -> u64 {
        position_to_section(self.size, self.items_per_section)
    }
598
    /// Reconciles the data and offsets journals after startup, repairing any
    /// divergence left by a crash, and returns `(oldest_retained_pos, size)`.
    ///
    /// The data journal is treated as the source of truth: the offsets
    /// journal may be rewound, pruned, or rebuilt from it. Irreconcilable
    /// states (offsets pruned past the data journal, or an empty offsets
    /// journal whose size doesn't match the data boundary) are reported as
    /// `Error::Corruption`.
    async fn align_journals(
        data: &mut variable::Journal<E, V>,
        offsets: &mut fixed::Journal<E, u64>,
        items_per_section: u64,
    ) -> Result<(u64, u64), Error> {
        // Count the items in the newest data section by replaying it;
        // sections are variable-length, so the count can't be computed
        // arithmetically.
        let items_in_last_section = match data.newest_section() {
            Some(last_section) => {
                let stream = data.replay(last_section, 0, REPLAY_BUFFER_SIZE).await?;
                futures::pin_mut!(stream);
                let mut count = 0u64;
                while let Some(result) = stream.next().await {
                    result?;
                    count += 1;
                }
                count
            }
            None => 0,
        };

        // The data journal is logically empty if it has no sections, or a
        // single section containing no items.
        let data_empty =
            data.is_empty() || (data.num_sections() == 1 && items_in_last_section == 0);
        if data_empty {
            let size = offsets.size();

            if !data.is_empty() {
                // A single empty data section exists: any offsets recorded at
                // or past the start of that section were never matched by
                // data writes, so rewind the offsets journal to the section
                // start.
                let first_section = data.oldest_section().unwrap();
                let target_pos = first_section * items_per_section;

                info!("crash repair: rewinding offsets from {size} to {target_pos}");
                offsets.rewind(target_pos).await?;
                offsets.sync().await?;
                return Ok((target_pos, target_pos));
            }

            // No data at all: a crash during a prune-everything may have left
            // unpruned offsets behind; finish that prune now.
            if let Some(oldest) = offsets.oldest_retained_pos() {
                if oldest < size {
                    info!("crash repair: pruning offsets to {size} (prune-all crash)");
                    offsets.prune(size).await?;
                    offsets.sync().await?;
                }
            }

            return Ok((size, size));
        }

        // Derive the journal's logical bounds from the data journal alone.
        let (data_oldest_pos, data_size) = {
            let first_section = data.oldest_section().unwrap();
            let last_section = data.newest_section().unwrap();

            let oldest_pos = first_section * items_per_section;

            // Every section before the last is full (appends sync and roll
            // over exactly at section boundaries).
            let size = (last_section * items_per_section) + items_in_last_section;
            (oldest_pos, size)
        };
        assert_ne!(
            data_oldest_pos, data_size,
            "data journal expected to be non-empty"
        );

        // Align the offsets journal's pruning boundary with the data journal.
        match offsets.oldest_retained_pos() {
            Some(oldest_retained_pos) if oldest_retained_pos < data_oldest_pos => {
                // Crash after pruning data but before pruning offsets.
                info!("crash repair: pruning offsets journal to {data_oldest_pos}");
                offsets.prune(data_oldest_pos).await?;
            }
            Some(oldest_retained_pos) if oldest_retained_pos > data_oldest_pos => {
                // Offsets pruned beyond retained data: unrecoverable.
                return Err(Error::Corruption(format!(
                    "offsets oldest pos ({oldest_retained_pos}) > data oldest pos ({data_oldest_pos})"
                )));
            }
            Some(_) => {
                // Boundaries agree; nothing to repair.
            }
            None if data_oldest_pos > 0 => {
                // Offsets journal retains nothing but data exists: only valid
                // when the offsets journal ends exactly at the data boundary
                // (missing offsets will be rebuilt below).
                let offsets_size = offsets.size();
                if offsets_size != data_oldest_pos {
                    return Err(Error::Corruption(format!(
                        "offsets journal empty: size ({offsets_size}) != data oldest pos ({data_oldest_pos})"
                    )));
                }
                info!("crash repair: offsets journal empty at {data_oldest_pos}");
            }
            None => {
                // Both journals start at position 0; nothing to repair.
            }
        }

        // Align sizes: trailing offsets with no matching data are discarded;
        // missing offsets are rebuilt by replaying the data journal.
        let offsets_size = offsets.size();
        if offsets_size > data_size {
            info!("crash repair: rewinding offsets from {offsets_size} to {data_size}");
            offsets.rewind(data_size).await?;
        } else if offsets_size < data_size {
            Self::add_missing_offsets(data, offsets, offsets_size, items_per_section).await?;
        }

        assert_eq!(offsets.size(), data_size);
        assert_eq!(offsets.oldest_retained_pos(), Some(data_oldest_pos));

        offsets.sync().await?;
        Ok((data_oldest_pos, data_size))
    }
738
739 async fn add_missing_offsets(
750 data: &variable::Journal<E, V>,
751 offsets: &mut fixed::Journal<E, u64>,
752 offsets_size: u64,
753 items_per_section: u64,
754 ) -> Result<(), Error> {
755 assert!(
756 !data.is_empty(),
757 "rebuild_offsets called with empty data journal"
758 );
759
760 let (start_section, resume_offset, skip_first) =
762 if let Some(oldest) = offsets.oldest_retained_pos() {
763 if oldest < offsets_size {
764 let last_offset = offsets.read(offsets_size - 1).await?;
766 let last_section = position_to_section(offsets_size - 1, items_per_section);
767 (last_section, last_offset, true)
768 } else {
769 let first_section = data.oldest_section().unwrap();
772 (first_section, 0, false)
773 }
774 } else {
775 let first_section = data.oldest_section().unwrap();
778 (first_section, 0, false)
779 };
780
781 let stream = data
785 .replay(start_section, resume_offset, REPLAY_BUFFER_SIZE)
786 .await?;
787 futures::pin_mut!(stream);
788
789 let mut skipped_first = false;
790 while let Some(result) = stream.next().await {
791 let (_section, offset, _size, _item) = result?;
792
793 if skip_first && !skipped_first {
795 skipped_first = true;
796 continue;
797 }
798
799 offsets.append(offset).await?;
800 }
801
802 Ok(())
803 }
804}
805
/// Exposes the journal through the generic `Contiguous` read interface by
/// delegating to the inherent methods of the same names.
impl<E: Storage + Metrics, V: CodecShared> Contiguous for Journal<E, V> {
    type Item = V;

    fn size(&self) -> u64 {
        Self::size(self)
    }

    fn oldest_retained_pos(&self) -> Option<u64> {
        Self::oldest_retained_pos(self)
    }

    fn pruning_boundary(&self) -> u64 {
        Self::pruning_boundary(self)
    }

    async fn replay(
        &self,
        start_pos: u64,
        buffer: NonZeroUsize,
    ) -> Result<impl Stream<Item = Result<(u64, Self::Item), Error>> + '_, Error> {
        Self::replay(self, start_pos, buffer).await
    }

    async fn read(&self, position: u64) -> Result<Self::Item, Error> {
        Self::read(self, position).await
    }
}
834
/// Exposes the journal's mutating operations through the generic
/// `MutableContiguous` interface by delegating to the inherent methods.
impl<E: Storage + Metrics, V: CodecShared> MutableContiguous for Journal<E, V> {
    async fn append(&mut self, item: Self::Item) -> Result<u64, Error> {
        Self::append(self, item).await
    }

    async fn prune(&mut self, min_position: u64) -> Result<bool, Error> {
        Self::prune(self, min_position).await
    }

    async fn rewind(&mut self, size: u64) -> Result<(), Error> {
        Self::rewind(self, size).await
    }
}
848
/// Exposes the journal's durability operations through the generic
/// `Persistable` interface by delegating to the inherent methods.
impl<E: Storage + Metrics, V: CodecShared> Persistable for Journal<E, V> {
    type Error = Error;

    async fn commit(&mut self) -> Result<(), Error> {
        Self::commit(self).await
    }

    async fn sync(&mut self) -> Result<(), Error> {
        Self::sync(self).await
    }

    async fn destroy(self) -> Result<(), Error> {
        Self::destroy(self).await
    }
}
864#[cfg(test)]
865mod tests {
866 use super::*;
867 use crate::journal::contiguous::tests::run_contiguous_tests;
868 use commonware_macros::test_traced;
869 use commonware_runtime::{buffer::pool::PoolRef, deterministic, Runner};
870 use commonware_utils::{NZUsize, NZU16, NZU64};
871 use futures::FutureExt as _;
872 use std::num::NonZeroU16;
873
    // Page sizes and cache capacity used by the buffer pools constructed in
    // tests. NOTE(review): PAGE_SIZE and PAGE_CACHE_SIZE are not referenced
    // by any test visible in this chunk — presumably used by tests elsewhere
    // in this module; confirm before removing.
    const PAGE_SIZE: NonZeroU16 = NZU16!(101);
    const PAGE_CACHE_SIZE: usize = 2;
    const LARGE_PAGE_SIZE: NonZeroU16 = NZU16!(1024);
    const SMALL_PAGE_SIZE: NonZeroU16 = NZU16!(512);
880
    // Losing the offsets partition after a prune is unrecoverable: the data
    // journal's oldest section is ahead of an empty offsets journal whose
    // size (0) cannot match the data boundary, so init reports corruption.
    #[test_traced]
    fn test_variable_offsets_partition_loss_after_prune_unrecoverable() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "offsets_loss_after_prune".to_string(),
                items_per_section: NZU64!(10),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(LARGE_PAGE_SIZE, NZUsize!(10)),
                write_buffer: NZUsize!(1024),
            };

            let mut journal = Journal::<_, u64>::init(context.clone(), cfg.clone())
                .await
                .unwrap();

            for i in 0..40u64 {
                journal.append(i * 100).await.unwrap();
            }

            journal.prune(20).await.unwrap();
            assert_eq!(journal.oldest_retained_pos(), Some(20));
            assert_eq!(journal.size(), 40);

            journal.sync().await.unwrap();
            drop(journal);

            // Simulate catastrophic loss of the offsets partition.
            context
                .remove(&cfg.offsets_partition(), None)
                .await
                .expect("Failed to remove offsets partition");

            let result = Journal::<_, u64>::init(context.clone(), cfg.clone()).await;
            assert!(matches!(result, Err(Error::Corruption(_))));
        });
    }
928
    // Losing the data partition is recoverable: alignment treats the data
    // journal as truth, prunes the now-unmatched offsets, and the journal
    // comes back with its size preserved but nothing retained.
    #[test_traced]
    fn test_variable_align_data_offsets_mismatch() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "data_loss_test".to_string(),
                items_per_section: NZU64!(10),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(LARGE_PAGE_SIZE, NZUsize!(10)),
                write_buffer: NZUsize!(1024),
            };

            let mut variable = Journal::<_, u64>::init(context.clone(), cfg.clone())
                .await
                .unwrap();

            for i in 0..20u64 {
                variable.append(i * 100).await.unwrap();
            }

            variable.sync().await.unwrap();
            drop(variable);

            // Simulate loss of the data partition.
            context
                .remove(&cfg.data_partition(), None)
                .await
                .expect("Failed to remove data partition");

            let mut journal = Journal::<_, u64>::init(context.clone(), cfg.clone())
                .await
                .expect("Should align offsets to match empty data");

            // Size is preserved, but every position reads as pruned.
            assert_eq!(journal.size(), 20);

            assert_eq!(journal.oldest_retained_pos(), None);

            for i in 0..20 {
                assert!(matches!(
                    journal.read(i).await,
                    Err(crate::journal::Error::ItemPruned(_))
                ));
            }

            // New appends continue from the preserved size.
            let pos = journal.append(999).await.unwrap();
            assert_eq!(pos, 20);
            assert_eq!(journal.read(20).await.unwrap(), 999);

            journal.destroy().await.unwrap();
        });
    }
995
    // Runs the shared `Contiguous` conformance test-suite against this
    // journal implementation, one fresh partition per named sub-test.
    #[test_traced]
    fn test_variable_contiguous() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            run_contiguous_tests(move |test_name: String| {
                let context = context.clone();
                async move {
                    Journal::<_, u64>::init(
                        context,
                        Config {
                            partition: format!("generic_test_{test_name}"),
                            items_per_section: NZU64!(10),
                            compression: None,
                            codec_config: (),
                            buffer_pool: PoolRef::new(LARGE_PAGE_SIZE, NZUsize!(10)),
                            write_buffer: NZUsize!(1024),
                        },
                    )
                    .await
                }
                .boxed()
            })
            .await;
        });
    }
1021
    // Pruning in successive steps (10, 20, 30) advances the boundary each
    // time; reads below the boundary fail with ItemPruned and reads at/above
    // it succeed, while size stays constant.
    #[test_traced]
    fn test_variable_multiple_sequential_prunes() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "sequential_prunes".to_string(),
                items_per_section: NZU64!(10),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(LARGE_PAGE_SIZE, NZUsize!(10)),
                write_buffer: NZUsize!(1024),
            };

            let mut journal = Journal::<_, u64>::init(context, cfg).await.unwrap();

            for i in 0..40u64 {
                journal.append(i * 100).await.unwrap();
            }

            assert_eq!(journal.oldest_retained_pos(), Some(0));
            assert_eq!(journal.size(), 40);

            // First prune: boundary moves to 10.
            let pruned = journal.prune(10).await.unwrap();
            assert!(pruned);

            let oldest = journal.oldest_retained_pos().unwrap();
            assert_eq!(oldest, 10);

            assert!(matches!(
                journal.read(0).await,
                Err(crate::journal::Error::ItemPruned(_))
            ));
            assert_eq!(journal.read(10).await.unwrap(), 1000);
            assert_eq!(journal.read(19).await.unwrap(), 1900);

            // Second prune: boundary moves to 20.
            let pruned = journal.prune(20).await.unwrap();
            assert!(pruned);

            let oldest = journal.oldest_retained_pos().unwrap();
            assert_eq!(oldest, 20);

            assert!(matches!(
                journal.read(10).await,
                Err(crate::journal::Error::ItemPruned(_))
            ));
            assert!(matches!(
                journal.read(19).await,
                Err(crate::journal::Error::ItemPruned(_))
            ));
            assert_eq!(journal.read(20).await.unwrap(), 2000);
            assert_eq!(journal.read(29).await.unwrap(), 2900);

            // Third prune: boundary moves to 30.
            let pruned = journal.prune(30).await.unwrap();
            assert!(pruned);

            let oldest = journal.oldest_retained_pos().unwrap();
            assert_eq!(oldest, 30);

            assert!(matches!(
                journal.read(20).await,
                Err(crate::journal::Error::ItemPruned(_))
            ));
            assert!(matches!(
                journal.read(29).await,
                Err(crate::journal::Error::ItemPruned(_))
            ));
            assert_eq!(journal.read(30).await.unwrap(), 3000);
            assert_eq!(journal.read(39).await.unwrap(), 3900);

            // Size is unaffected by pruning.
            assert_eq!(journal.size(), 40);

            journal.destroy().await.unwrap();
        });
    }
1109
    // Pruning everything leaves an empty-but-positioned journal; after a
    // restart the size and (empty) retained range survive, and new appends
    // continue from the preserved position.
    #[test_traced]
    fn test_variable_prune_all_then_reinit() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "prune_all_reinit".to_string(),
                items_per_section: NZU64!(10),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(LARGE_PAGE_SIZE, NZUsize!(10)),
                write_buffer: NZUsize!(1024),
            };

            let mut journal = Journal::<_, u64>::init(context.clone(), cfg.clone())
                .await
                .unwrap();

            for i in 0..100u64 {
                journal.append(i * 100).await.unwrap();
            }

            assert_eq!(journal.size(), 100);
            assert_eq!(journal.oldest_retained_pos(), Some(0));

            // Prune every item.
            let pruned = journal.prune(100).await.unwrap();
            assert!(pruned);

            assert_eq!(journal.size(), 100);
            assert_eq!(journal.oldest_retained_pos(), None);

            for i in 0..100 {
                assert!(matches!(
                    journal.read(i).await,
                    Err(crate::journal::Error::ItemPruned(_))
                ));
            }

            journal.sync().await.unwrap();
            drop(journal);

            // Reinitialize: state must survive the restart.
            let mut journal = Journal::<_, u64>::init(context.clone(), cfg.clone())
                .await
                .unwrap();

            assert_eq!(journal.size(), 100);
            assert_eq!(journal.oldest_retained_pos(), None);

            for i in 0..100 {
                assert!(matches!(
                    journal.read(i).await,
                    Err(crate::journal::Error::ItemPruned(_))
                ));
            }

            // Appends resume at position 100.
            journal.append(10000).await.unwrap();
            assert_eq!(journal.size(), 101);
            assert_eq!(journal.oldest_retained_pos(), Some(100));

            assert_eq!(journal.read(100).await.unwrap(), 10000);

            assert!(matches!(
                journal.read(99).await,
                Err(crate::journal::Error::ItemPruned(_))
            ));

            journal.destroy().await.unwrap();
        });
    }
1191
    // Simulates a crash between pruning the data journal and pruning the
    // offsets journal (data pruned further than offsets): recovery prunes
    // the offsets journal forward to match the data journal.
    #[test_traced]
    fn test_variable_recovery_prune_crash_offsets_behind() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "recovery_prune_crash".to_string(),
                items_per_section: NZU64!(10),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(LARGE_PAGE_SIZE, NZUsize!(10)),
                write_buffer: NZUsize!(1024),
            };

            let mut variable = Journal::<_, u64>::init(context.clone(), cfg.clone())
                .await
                .unwrap();

            for i in 0..40u64 {
                variable.append(i * 100).await.unwrap();
            }

            variable.prune(10).await.unwrap();
            assert_eq!(variable.oldest_retained_pos(), Some(10));

            // Prune the underlying data journal further (to section 2 =
            // position 20) without updating offsets, mimicking the crash.
            variable.data.prune(2).await.unwrap();
            variable.sync().await.unwrap();
            drop(variable);

            let variable = Journal::<_, u64>::init(context.clone(), cfg.clone())
                .await
                .unwrap();

            // Recovery advanced the boundary to match the data journal.
            assert_eq!(variable.oldest_retained_pos(), Some(20));
            assert_eq!(variable.size(), 40);

            assert!(matches!(
                variable.read(10).await,
                Err(crate::journal::Error::ItemPruned(_))
            ));

            assert_eq!(variable.read(20).await.unwrap(), 2000);
            assert_eq!(variable.read(39).await.unwrap(), 3900);

            variable.destroy().await.unwrap();
        });
    }
1250
    // If the offsets journal is pruned further than the data journal, the
    // pruned positions can never be re-derived: init must report corruption.
    #[test_traced]
    fn test_variable_recovery_offsets_ahead_corruption() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "recovery_offsets_ahead".to_string(),
                items_per_section: NZU64!(10),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(LARGE_PAGE_SIZE, NZUsize!(10)),
                write_buffer: NZUsize!(1024),
            };

            let mut variable = Journal::<_, u64>::init(context.clone(), cfg.clone())
                .await
                .unwrap();

            for i in 0..40u64 {
                variable.append(i * 100).await.unwrap();
            }

            // Offsets pruned to position 20 but data only to section 1
            // (position 10): offsets are "ahead" of data.
            variable.offsets.prune(20).await.unwrap();
            variable.data.prune(1).await.unwrap();
            variable.sync().await.unwrap();
            drop(variable);

            let result = Journal::<_, u64>::init(context.clone(), cfg.clone()).await;
            assert!(matches!(result, Err(Error::Corruption(_))));
        });
    }
1290
    // Simulates a crash after data appends were synced but before the
    // matching offsets were written: recovery rebuilds the missing offsets
    // by replaying the data journal.
    #[test_traced]
    fn test_variable_recovery_append_crash_offsets_behind() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "recovery_append_crash".to_string(),
                items_per_section: NZU64!(10),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(LARGE_PAGE_SIZE, NZUsize!(10)),
                write_buffer: NZUsize!(1024),
            };

            let mut variable = Journal::<_, u64>::init(context.clone(), cfg.clone())
                .await
                .unwrap();

            for i in 0..15u64 {
                variable.append(i * 100).await.unwrap();
            }

            assert_eq!(variable.size(), 15);

            // Write 5 items directly into data section 1, bypassing offsets.
            for i in 15..20u64 {
                variable.data.append(1, i * 100).await.unwrap();
            }
            variable.sync().await.unwrap();
            drop(variable);

            let variable = Journal::<_, u64>::init(context.clone(), cfg.clone())
                .await
                .unwrap();

            // Recovery counted 20 items and rebuilt offsets 15..20.
            assert_eq!(variable.size(), 20);
            assert_eq!(variable.oldest_retained_pos(), Some(0));

            for i in 0..20u64 {
                assert_eq!(variable.read(i).await.unwrap(), i * 100);
            }

            assert_eq!(variable.offsets.size(), 20);

            variable.destroy().await.unwrap();
        });
    }
1346
    // Like the prune-crash test above, but the data journal is pruned
    // several sections ahead (to section 3 = position 30): recovery still
    // advances the offsets boundary to match.
    #[test_traced]
    fn test_variable_recovery_multiple_prunes_crash() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "recovery_multiple_prunes".to_string(),
                items_per_section: NZU64!(10),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(LARGE_PAGE_SIZE, NZUsize!(10)),
                write_buffer: NZUsize!(1024),
            };

            let mut variable = Journal::<_, u64>::init(context.clone(), cfg.clone())
                .await
                .unwrap();

            for i in 0..50u64 {
                variable.append(i * 100).await.unwrap();
            }

            variable.prune(10).await.unwrap();
            assert_eq!(variable.oldest_retained_pos(), Some(10));

            // Prune the data journal two more sections without updating
            // offsets, mimicking a crash mid-prune.
            variable.data.prune(3).await.unwrap();
            variable.sync().await.unwrap();
            drop(variable);

            let variable = Journal::<_, u64>::init(context.clone(), cfg.clone())
                .await
                .unwrap();

            assert_eq!(variable.oldest_retained_pos(), Some(30));
            assert_eq!(variable.size(), 50);

            assert!(matches!(
                variable.read(10).await,
                Err(crate::journal::Error::ItemPruned(_))
            ));
            assert!(matches!(
                variable.read(20).await,
                Err(crate::journal::Error::ItemPruned(_))
            ));

            assert_eq!(variable.read(30).await.unwrap(), 3000);
            assert_eq!(variable.read(49).await.unwrap(), 4900);

            variable.destroy().await.unwrap();
        });
    }
1409
    // Simulates a crash where the offsets journal was rewound (to 5) but the
    // data journal still holds all 25 items across multiple sections:
    // recovery rebuilds the missing offsets from the data journal.
    #[test_traced]
    fn test_variable_recovery_rewind_crash_multi_section() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "recovery_rewind_crash".to_string(),
                items_per_section: NZU64!(10),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(LARGE_PAGE_SIZE, NZUsize!(10)),
                write_buffer: NZUsize!(1024),
            };

            let mut variable = Journal::<_, u64>::init(context.clone(), cfg.clone())
                .await
                .unwrap();

            for i in 0..25u64 {
                variable.append(i * 100).await.unwrap();
            }

            assert_eq!(variable.size(), 25);

            // Rewind only the offsets journal, mimicking a partial rewind.
            variable.offsets.rewind(5).await.unwrap();
            variable.sync().await.unwrap();
            drop(variable);

            let mut variable = Journal::<_, u64>::init(context.clone(), cfg.clone())
                .await
                .unwrap();

            // Recovery restored offsets 5..25 from the data journal.
            assert_eq!(variable.size(), 25);
            assert_eq!(variable.oldest_retained_pos(), Some(0));

            for i in 0..25u64 {
                assert_eq!(variable.read(i).await.unwrap(), i * 100);
            }

            assert_eq!(variable.offsets.size(), 25);

            // Journal remains fully usable.
            let pos = variable.append(2500).await.unwrap();
            assert_eq!(pos, 25);
            assert_eq!(variable.read(25).await.unwrap(), 2500);

            variable.destroy().await.unwrap();
        });
    }
1474
    // Simulates a crash after a prune-everything followed by data-only
    // appends: the offsets journal retains nothing but its size matches the
    // data boundary, so recovery rebuilds offsets for the new items.
    #[test_traced]
    fn test_variable_recovery_empty_offsets_after_prune_and_append() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "recovery_empty_after_prune".to_string(),
                items_per_section: NZU64!(10),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(LARGE_PAGE_SIZE, NZUsize!(10)),
                write_buffer: NZUsize!(1024),
            };

            let mut journal = Journal::<_, u64>::init(context.clone(), cfg.clone())
                .await
                .unwrap();

            for i in 0..10u64 {
                journal.append(i * 100).await.unwrap();
            }
            assert_eq!(journal.size(), 10);
            assert_eq!(journal.oldest_retained_pos(), Some(0));

            journal.prune(10).await.unwrap();
            assert_eq!(journal.size(), 10);
            assert_eq!(journal.oldest_retained_pos(), None);

            // Append directly to data section 1 without recording offsets,
            // then sync only the data journal (crash before offsets sync).
            for i in 10..20u64 {
                journal.data.append(1, i * 100).await.unwrap();
            }
            journal.data.sync(1).await.unwrap();
            drop(journal);

            let journal = Journal::<_, u64>::init(context.clone(), cfg.clone())
                .await
                .expect("Should recover from crash after data sync but before offsets sync");

            assert_eq!(journal.size(), 20);
            assert_eq!(journal.oldest_retained_pos(), Some(10));

            for i in 10..20u64 {
                assert_eq!(journal.read(i).await.unwrap(), i * 100);
            }

            for i in 0..10 {
                assert!(matches!(journal.read(i).await, Err(Error::ItemPruned(_))));
            }

            journal.destroy().await.unwrap();
        });
    }
1542
    // `commit` only syncs the data journal; after a restart, alignment must
    // reconstruct any unsynced offsets so all committed items are readable.
    #[test_traced]
    fn test_variable_concurrent_sync_recovery() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "concurrent_sync_recovery".to_string(),
                items_per_section: NZU64!(10),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(LARGE_PAGE_SIZE, NZUsize!(10)),
                write_buffer: NZUsize!(1024),
            };

            let mut journal = Journal::<_, u64>::init(context.clone(), cfg.clone())
                .await
                .unwrap();

            for i in 0..15u64 {
                journal.append(i * 100).await.unwrap();
            }

            // Durability via commit (data-only sync).
            journal.commit().await.unwrap();

            drop(journal);

            let journal = Journal::<_, u64>::init(context.clone(), cfg.clone())
                .await
                .unwrap();

            assert_eq!(journal.size(), 15);
            for i in 0..15u64 {
                assert_eq!(journal.read(i).await.unwrap(), i * 100);
            }

            journal.destroy().await.unwrap();
        });
    }
1585
    // init_at_size(0) behaves like a brand-new empty journal.
    #[test_traced]
    fn test_init_at_size_zero() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "init_at_size_zero".to_string(),
                items_per_section: NZU64!(5),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(SMALL_PAGE_SIZE, NZUsize!(2)),
                write_buffer: NZUsize!(1024),
            };

            let mut journal = Journal::<_, u64>::init_at_size(context.clone(), cfg.clone(), 0)
                .await
                .unwrap();

            assert_eq!(journal.size(), 0);

            assert_eq!(journal.oldest_retained_pos(), None);

            // Appends start from position 0.
            let pos = journal.append(100).await.unwrap();
            assert_eq!(pos, 0);
            assert_eq!(journal.size(), 1);
            assert_eq!(journal.read(0).await.unwrap(), 100);

            journal.destroy().await.unwrap();
        });
    }
1618
1619 #[test_traced]
1620 fn test_init_at_size_section_boundary() {
1621 let executor = deterministic::Runner::default();
1622 executor.start(|context| async move {
1623 let cfg = Config {
1624 partition: "init_at_size_boundary".to_string(),
1625 items_per_section: NZU64!(5),
1626 compression: None,
1627 codec_config: (),
1628 buffer_pool: PoolRef::new(SMALL_PAGE_SIZE, NZUsize!(2)),
1629 write_buffer: NZUsize!(1024),
1630 };
1631
1632 let mut journal = Journal::<_, u64>::init_at_size(context.clone(), cfg.clone(), 10)
1634 .await
1635 .unwrap();
1636
1637 assert_eq!(journal.size(), 10);
1639
1640 assert_eq!(journal.oldest_retained_pos(), None);
1642
1643 let pos = journal.append(1000).await.unwrap();
1645 assert_eq!(pos, 10);
1646 assert_eq!(journal.size(), 11);
1647 assert_eq!(journal.read(10).await.unwrap(), 1000);
1648
1649 let pos = journal.append(1001).await.unwrap();
1651 assert_eq!(pos, 11);
1652 assert_eq!(journal.read(11).await.unwrap(), 1001);
1653
1654 journal.destroy().await.unwrap();
1655 });
1656 }
1657
1658 #[test_traced]
1659 fn test_init_at_size_mid_section() {
1660 let executor = deterministic::Runner::default();
1661 executor.start(|context| async move {
1662 let cfg = Config {
1663 partition: "init_at_size_mid".to_string(),
1664 items_per_section: NZU64!(5),
1665 compression: None,
1666 codec_config: (),
1667 buffer_pool: PoolRef::new(SMALL_PAGE_SIZE, NZUsize!(2)),
1668 write_buffer: NZUsize!(1024),
1669 };
1670
1671 let mut journal = Journal::<_, u64>::init_at_size(context.clone(), cfg.clone(), 7)
1673 .await
1674 .unwrap();
1675
1676 assert_eq!(journal.size(), 7);
1678
1679 assert_eq!(journal.oldest_retained_pos(), None);
1681
1682 let pos = journal.append(700).await.unwrap();
1684 assert_eq!(pos, 7);
1685 assert_eq!(journal.size(), 8);
1686 assert_eq!(journal.read(7).await.unwrap(), 700);
1687
1688 journal.destroy().await.unwrap();
1689 });
1690 }
1691
    #[test_traced]
    fn test_init_at_size_persistence() {
        // Verifies that a journal created via `init_at_size` persists its logical
        // size and contents across a sync + drop + plain `init` restart.
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "init_at_size_persist".to_string(),
                items_per_section: NZU64!(5),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(SMALL_PAGE_SIZE, NZUsize!(2)),
                write_buffer: NZUsize!(1024),
            };

            // Start at logical position 15 (no physical data below it).
            let mut journal = Journal::<_, u64>::init_at_size(context.clone(), cfg.clone(), 15)
                .await
                .unwrap();

            // Append five items; positions continue from the initial size.
            for i in 0..5u64 {
                let pos = journal.append(1500 + i).await.unwrap();
                assert_eq!(pos, 15 + i);
            }

            assert_eq!(journal.size(), 20);

            // Persist, then drop and re-open with the regular `init` path.
            journal.sync().await.unwrap();
            drop(journal);

            let mut journal = Journal::<_, u64>::init(context.clone(), cfg.clone())
                .await
                .unwrap();

            // Size and oldest retained position survive the restart.
            assert_eq!(journal.size(), 20);
            assert_eq!(journal.oldest_retained_pos(), Some(15));

            // Contents are intact.
            for i in 0..5u64 {
                assert_eq!(journal.read(15 + i).await.unwrap(), 1500 + i);
            }

            // Appends continue at the next position after the restart.
            let pos = journal.append(9999).await.unwrap();
            assert_eq!(pos, 20);
            assert_eq!(journal.read(20).await.unwrap(), 9999);

            journal.destroy().await.unwrap();
        });
    }
1743
1744 #[test_traced]
1745 fn test_init_at_size_large_offset() {
1746 let executor = deterministic::Runner::default();
1747 executor.start(|context| async move {
1748 let cfg = Config {
1749 partition: "init_at_size_large".to_string(),
1750 items_per_section: NZU64!(5),
1751 compression: None,
1752 codec_config: (),
1753 buffer_pool: PoolRef::new(SMALL_PAGE_SIZE, NZUsize!(2)),
1754 write_buffer: NZUsize!(1024),
1755 };
1756
1757 let mut journal = Journal::<_, u64>::init_at_size(context.clone(), cfg.clone(), 1000)
1759 .await
1760 .unwrap();
1761
1762 assert_eq!(journal.size(), 1000);
1763 assert_eq!(journal.oldest_retained_pos(), None);
1765
1766 let pos = journal.append(100000).await.unwrap();
1768 assert_eq!(pos, 1000);
1769 assert_eq!(journal.read(1000).await.unwrap(), 100000);
1770
1771 journal.destroy().await.unwrap();
1772 });
1773 }
1774
    #[test_traced]
    fn test_init_at_size_prune_and_append() {
        // Exercises prune + append on a journal created at a non-zero size.
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "init_at_size_prune".to_string(),
                items_per_section: NZU64!(5),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(SMALL_PAGE_SIZE, NZUsize!(2)),
                write_buffer: NZUsize!(1024),
            };

            // Start at position 20, then fill positions 20..30.
            let mut journal = Journal::<_, u64>::init_at_size(context.clone(), cfg.clone(), 20)
                .await
                .unwrap();

            for i in 0..10u64 {
                journal.append(2000 + i).await.unwrap();
            }

            assert_eq!(journal.size(), 30);

            // Drop everything below position 25.
            journal.prune(25).await.unwrap();

            // Pruning shrinks the retained range, never the logical size.
            assert_eq!(journal.size(), 30);
            assert_eq!(journal.oldest_retained_pos(), Some(25));

            // Retained items keep their values: position p was appended with
            // value 2000 + (p - 20).
            for i in 25..30u64 {
                assert_eq!(journal.read(i).await.unwrap(), 2000 + (i - 20));
            }

            // Appends continue at the end of the journal after pruning.
            let pos = journal.append(3000).await.unwrap();
            assert_eq!(pos, 30);

            journal.destroy().await.unwrap();
        });
    }
1818
1819 #[test_traced]
1821 fn test_init_sync_no_existing_data() {
1822 let executor = deterministic::Runner::default();
1823 executor.start(|context| async move {
1824 let cfg = Config {
1825 partition: "test_fresh_start".into(),
1826 items_per_section: NZU64!(5),
1827 compression: None,
1828 codec_config: (),
1829 write_buffer: NZUsize!(1024),
1830 buffer_pool: PoolRef::new(PAGE_SIZE, NZUsize!(PAGE_CACHE_SIZE)),
1831 };
1832
1833 let lower_bound = 10;
1835 let upper_bound = 26;
1836 let mut journal =
1837 Journal::init_sync(context.clone(), cfg.clone(), lower_bound..upper_bound)
1838 .await
1839 .expect("Failed to initialize journal with sync boundaries");
1840
1841 assert_eq!(journal.size(), lower_bound);
1842 assert_eq!(journal.oldest_retained_pos(), None);
1843
1844 let pos1 = journal.append(42u64).await.unwrap();
1846 assert_eq!(pos1, lower_bound);
1847 assert_eq!(journal.read(pos1).await.unwrap(), 42u64);
1848
1849 let pos2 = journal.append(43u64).await.unwrap();
1850 assert_eq!(pos2, lower_bound + 1);
1851 assert_eq!(journal.read(pos2).await.unwrap(), 43u64);
1852
1853 journal.destroy().await.unwrap();
1854 });
1855 }
1856
    #[test_traced]
    fn test_init_sync_existing_data_overlap() {
        // Sync range partially overlaps persisted data: existing items inside the
        // range are kept, items below the lower bound are pruned (at section
        // granularity), and appends resume at the end of the existing data.
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_overlap".into(),
                items_per_section: NZU64!(5),
                compression: None,
                codec_config: (),
                write_buffer: NZUsize!(1024),
                buffer_pool: PoolRef::new(PAGE_SIZE, NZUsize!(PAGE_CACHE_SIZE)),
            };

            // Seed the partition with 20 items (positions 0..20), persist, close.
            let mut journal =
                Journal::<deterministic::Context, u64>::init(context.clone(), cfg.clone())
                    .await
                    .expect("Failed to create initial journal");

            for i in 0..20u64 {
                journal.append(i * 100).await.unwrap();
            }
            journal.sync().await.unwrap();
            drop(journal);

            // Lower bound (8) falls inside the data, upper bound (31) beyond it.
            let lower_bound = 8;
            let upper_bound = 31;
            let mut journal = Journal::<deterministic::Context, u64>::init_sync(
                context.clone(),
                cfg.clone(),
                lower_bound..upper_bound,
            )
            .await
            .expect("Failed to initialize journal with overlap");

            // All existing data within the range is kept; size is unchanged.
            assert_eq!(journal.size(), 20);

            // Pruning is section-granular (items_per_section = 5), so the oldest
            // retained position is the start of the lower bound's section (5),
            // not the lower bound itself.
            let oldest = journal.oldest_retained_pos();
            assert_eq!(oldest, Some(5));
            assert!(matches!(journal.read(0).await, Err(Error::ItemPruned(_))));
            assert!(matches!(journal.read(4).await, Err(Error::ItemPruned(_))));

            // Items from the retained section onward read back their values.
            assert_eq!(journal.read(5).await.unwrap(), 500);
            assert_eq!(journal.read(8).await.unwrap(), 800);
            assert_eq!(journal.read(19).await.unwrap(), 1900);

            // Reading at the size is out-of-range, not pruned.
            assert!(matches!(
                journal.read(20).await,
                Err(Error::ItemOutOfRange(_))
            ));

            // Appends continue at the end of the existing data.
            let pos = journal.append(999).await.unwrap();
            assert_eq!(pos, 20);
            assert_eq!(journal.read(20).await.unwrap(), 999);

            journal.destroy().await.unwrap();
        });
    }
1925
1926 #[should_panic]
1928 #[test_traced]
1929 fn test_init_sync_invalid_parameters() {
1930 let executor = deterministic::Runner::default();
1931 executor.start(|context| async move {
1932 let cfg = Config {
1933 partition: "test_invalid".into(),
1934 items_per_section: NZU64!(5),
1935 compression: None,
1936 codec_config: (),
1937 write_buffer: NZUsize!(1024),
1938 buffer_pool: PoolRef::new(PAGE_SIZE, NZUsize!(PAGE_CACHE_SIZE)),
1939 };
1940
1941 #[allow(clippy::reversed_empty_ranges)]
1942 let _result = Journal::<deterministic::Context, u64>::init_sync(
1943 context.clone(),
1944 cfg,
1945 10..5, )
1947 .await;
1948 });
1949 }
1950
    #[test_traced]
    fn test_init_sync_existing_data_exact_match() {
        // Sync range exactly matches the persisted data: everything in range is
        // kept, everything below the lower bound is pruned.
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let items_per_section = NZU64!(5);
            let cfg = Config {
                partition: "test_exact_match".to_string(),
                items_per_section,
                compression: None,
                codec_config: (),
                write_buffer: NZUsize!(1024),
                buffer_pool: PoolRef::new(PAGE_SIZE, NZUsize!(PAGE_CACHE_SIZE)),
            };

            // Seed the partition with 20 items (positions 0..20), persist, close.
            let mut journal =
                Journal::<deterministic::Context, u64>::init(context.clone(), cfg.clone())
                    .await
                    .expect("Failed to create initial journal");

            for i in 0..20u64 {
                journal.append(i * 100).await.unwrap();
            }
            journal.sync().await.unwrap();
            drop(journal);

            // Lower bound sits on a section boundary (5); upper bound equals the
            // persisted size (20).
            let lower_bound = 5;
            let upper_bound = 20;
            let mut journal = Journal::<deterministic::Context, u64>::init_sync(
                context.clone(),
                cfg.clone(),
                lower_bound..upper_bound,
            )
            .await
            .expect("Failed to initialize journal with exact match");

            assert_eq!(journal.size(), 20);

            // Everything below the (section-aligned) lower bound is pruned.
            let oldest = journal.oldest_retained_pos();
            assert_eq!(oldest, Some(5));
            assert!(matches!(journal.read(0).await, Err(Error::ItemPruned(_))));
            assert!(matches!(journal.read(4).await, Err(Error::ItemPruned(_))));

            // Retained items keep their original values.
            assert_eq!(journal.read(5).await.unwrap(), 500);
            assert_eq!(journal.read(10).await.unwrap(), 1000);
            assert_eq!(journal.read(19).await.unwrap(), 1900);

            // Reading at the size is out-of-range, not pruned.
            assert!(matches!(
                journal.read(20).await,
                Err(Error::ItemOutOfRange(_))
            ));

            // Appends continue at the end of the synced data.
            let pos = journal.append(999).await.unwrap();
            assert_eq!(pos, 20);
            assert_eq!(journal.read(20).await.unwrap(), 999);

            journal.destroy().await.unwrap();
        });
    }
2019
    #[test_traced]
    fn test_init_sync_existing_data_exceeds_upper_bound() {
        // If the partition already holds more data than the sync range allows,
        // init_sync must refuse rather than silently discard the excess.
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let items_per_section = NZU64!(5);
            let cfg = Config {
                partition: "test_unexpected_data".into(),
                items_per_section,
                compression: None,
                codec_config: (),
                write_buffer: NZUsize!(1024),
                buffer_pool: PoolRef::new(PAGE_SIZE, NZUsize!(PAGE_CACHE_SIZE)),
            };

            // Persist 30 items (positions 0..30), then close.
            let mut journal =
                Journal::<deterministic::Context, u64>::init(context.clone(), cfg.clone())
                    .await
                    .expect("Failed to create initial journal");

            for i in 0..30u64 {
                journal.append(i * 1000).await.unwrap();
            }
            journal.sync().await.unwrap();
            drop(journal);

            // Every upper bound below the persisted size (30) leaves data past
            // the range, so each attempt must fail with UnexpectedData.
            // NOTE(review): this asserts `crate::qmdb::Error` while sibling tests
            // match `Error` from the journal module — confirm this is the
            // intended error type for init_sync failures.
            let lower_bound = 8;
            for upper_bound in 9..29 {
                let result = Journal::<deterministic::Context, u64>::init_sync(
                    context.clone(),
                    cfg.clone(),
                    lower_bound..upper_bound,
                )
                .await;

                assert!(matches!(result, Err(crate::qmdb::Error::UnexpectedData(_))));
            }
        });
    }
2064
2065 #[test_traced]
2067 fn test_init_sync_existing_data_stale() {
2068 let executor = deterministic::Runner::default();
2069 executor.start(|context| async move {
2070 let items_per_section = NZU64!(5);
2071 let cfg = Config {
2072 partition: "test_stale".into(),
2073 items_per_section,
2074 compression: None,
2075 codec_config: (),
2076 write_buffer: NZUsize!(1024),
2077 buffer_pool: PoolRef::new(PAGE_SIZE, NZUsize!(PAGE_CACHE_SIZE)),
2078 };
2079
2080 let mut journal =
2082 Journal::<deterministic::Context, u64>::init(context.clone(), cfg.clone())
2083 .await
2084 .expect("Failed to create initial journal");
2085
2086 for i in 0..10u64 {
2088 journal.append(i * 100).await.unwrap();
2089 }
2090 journal.sync().await.unwrap();
2091 drop(journal);
2092
2093 let lower_bound = 15; let upper_bound = 26; let journal = Journal::<deterministic::Context, u64>::init_sync(
2097 context.clone(),
2098 cfg.clone(),
2099 lower_bound..upper_bound,
2100 )
2101 .await
2102 .expect("Failed to initialize journal with stale data");
2103
2104 assert_eq!(journal.size(), 15);
2105
2106 assert_eq!(journal.oldest_retained_pos(), None);
2108
2109 assert!(matches!(journal.read(0).await, Err(Error::ItemPruned(_))));
2111 assert!(matches!(journal.read(9).await, Err(Error::ItemPruned(_))));
2112 assert!(matches!(journal.read(14).await, Err(Error::ItemPruned(_))));
2113
2114 journal.destroy().await.unwrap();
2115 });
2116 }
2117
    #[test_traced]
    fn test_init_sync_section_boundaries() {
        // Sync bounds that fall exactly on section boundaries: pruning aligns
        // precisely with the lower bound (no partial section is retained).
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let items_per_section = NZU64!(5);
            let cfg = Config {
                partition: "test_boundaries".into(),
                items_per_section,
                compression: None,
                codec_config: (),
                write_buffer: NZUsize!(1024),
                buffer_pool: PoolRef::new(PAGE_SIZE, NZUsize!(PAGE_CACHE_SIZE)),
            };

            // Seed 25 items (positions 0..25), persist, close.
            let mut journal =
                Journal::<deterministic::Context, u64>::init(context.clone(), cfg.clone())
                    .await
                    .expect("Failed to create initial journal");

            for i in 0..25u64 {
                journal.append(i * 100).await.unwrap();
            }
            journal.sync().await.unwrap();
            drop(journal);

            // Both bounds are multiples of items_per_section (5).
            let lower_bound = 15;
            let upper_bound = 25;
            let mut journal = Journal::<deterministic::Context, u64>::init_sync(
                context.clone(),
                cfg.clone(),
                lower_bound..upper_bound,
            )
            .await
            .expect("Failed to initialize journal at boundaries");

            assert_eq!(journal.size(), 25);

            // With a section-aligned lower bound, the oldest retained position is
            // exactly the lower bound.
            let oldest = journal.oldest_retained_pos();
            assert_eq!(oldest, Some(15));

            // Everything below the lower bound is pruned.
            assert!(matches!(journal.read(0).await, Err(Error::ItemPruned(_))));
            assert!(matches!(journal.read(14).await, Err(Error::ItemPruned(_))));

            // The retained range reads back the original values.
            assert_eq!(journal.read(15).await.unwrap(), 1500);
            assert_eq!(journal.read(20).await.unwrap(), 2000);
            assert_eq!(journal.read(24).await.unwrap(), 2400);

            // Reading at the size is out-of-range, not pruned.
            assert!(matches!(
                journal.read(25).await,
                Err(Error::ItemOutOfRange(_))
            ));

            // Appends continue at position 25.
            let pos = journal.append(999).await.unwrap();
            assert_eq!(pos, 25);
            assert_eq!(journal.read(25).await.unwrap(), 999);

            journal.destroy().await.unwrap();
        });
    }
2186
    #[test_traced]
    fn test_init_sync_same_section_bounds() {
        // Lower and upper bounds both fall within the final section of existing
        // data; the journal keeps exactly that tail and prunes the rest.
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let items_per_section = NZU64!(5);
            let cfg = Config {
                partition: "test_same_section".into(),
                items_per_section,
                compression: None,
                codec_config: (),
                write_buffer: NZUsize!(1024),
                buffer_pool: PoolRef::new(PAGE_SIZE, NZUsize!(PAGE_CACHE_SIZE)),
            };

            // Seed 15 items (positions 0..15), persist, close.
            let mut journal =
                Journal::<deterministic::Context, u64>::init(context.clone(), cfg.clone())
                    .await
                    .expect("Failed to create initial journal");

            for i in 0..15u64 {
                journal.append(i * 100).await.unwrap();
            }
            journal.sync().await.unwrap();
            drop(journal);

            // Both bounds lie in the last section: 10..15 with sections of 5.
            let lower_bound = 10;
            let upper_bound = 15;
            let mut journal = Journal::<deterministic::Context, u64>::init_sync(
                context.clone(),
                cfg.clone(),
                lower_bound..upper_bound,
            )
            .await
            .expect("Failed to initialize journal with same-section bounds");

            assert_eq!(journal.size(), 15);

            // Lower bound is section-aligned, so it is also the oldest retained
            // position.
            let oldest = journal.oldest_retained_pos();
            assert_eq!(oldest, Some(10));

            // Everything below the lower bound is pruned.
            assert!(matches!(journal.read(0).await, Err(Error::ItemPruned(_))));
            assert!(matches!(journal.read(9).await, Err(Error::ItemPruned(_))));

            // The retained tail reads back its original values.
            assert_eq!(journal.read(10).await.unwrap(), 1000);
            assert_eq!(journal.read(11).await.unwrap(), 1100);
            assert_eq!(journal.read(14).await.unwrap(), 1400);

            // Reading at the size is out-of-range, not pruned.
            assert!(matches!(
                journal.read(15).await,
                Err(Error::ItemOutOfRange(_))
            ));

            // Appends continue at position 15.
            let pos = journal.append(999).await.unwrap();
            assert_eq!(pos, 15);
            assert_eq!(journal.read(15).await.unwrap(), 999);

            journal.destroy().await.unwrap();
        });
    }
2256
    #[test_traced]
    fn test_single_item_per_section() {
        // Stress the degenerate configuration of one item per section across four
        // phases: (1) append/read/prune/restart, (2) prune before sync then
        // restart, (3) prune everything written so far, then append past it.
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "single_item_per_section".to_string(),
                // Degenerate case: every append opens a new section.
                items_per_section: NZU64!(1),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(LARGE_PAGE_SIZE, NZUsize!(10)),
                write_buffer: NZUsize!(1024),
            };

            // Phase 1: fresh journal, basic append/read behavior.
            let mut journal = Journal::<_, u64>::init(context.clone(), cfg.clone())
                .await
                .unwrap();

            assert_eq!(journal.size(), 0);
            assert_eq!(journal.oldest_retained_pos(), None);

            let pos = journal.append(0).await.unwrap();
            assert_eq!(pos, 0);
            assert_eq!(journal.size(), 1);

            journal.sync().await.unwrap();

            // The last item is readable after a sync.
            let value = journal.read(journal.size() - 1).await.unwrap();
            assert_eq!(value, 0);

            // Each append creates its own section; last item stays readable.
            for i in 1..10u64 {
                let pos = journal.append(i * 100).await.unwrap();
                assert_eq!(pos, i);
                assert_eq!(journal.size(), i + 1);

                let value = journal.read(journal.size() - 1).await.unwrap();
                assert_eq!(value, i * 100);
            }

            // All ten items read back their appended values.
            for i in 0..10u64 {
                assert_eq!(journal.read(i).await.unwrap(), i * 100);
            }

            journal.sync().await.unwrap();

            // Prune the first five sections (positions 0..5).
            let pruned = journal.prune(5).await.unwrap();
            assert!(pruned);

            // Logical size is unchanged; only the retained range shrinks.
            assert_eq!(journal.size(), 10);
            assert_eq!(journal.oldest_retained_pos(), Some(5));

            let value = journal.read(journal.size() - 1).await.unwrap();
            assert_eq!(value, 900);

            // Pruned positions report ItemPruned.
            for i in 0..5 {
                assert!(matches!(
                    journal.read(i).await,
                    Err(crate::journal::Error::ItemPruned(_))
                ));
            }

            // Retained positions still read correctly.
            for i in 5..10u64 {
                assert_eq!(journal.read(i).await.unwrap(), i * 100);
            }

            // Appends continue past the prune point.
            for i in 10..15u64 {
                let pos = journal.append(i * 100).await.unwrap();
                assert_eq!(pos, i);

                let value = journal.read(journal.size() - 1).await.unwrap();
                assert_eq!(value, i * 100);
            }

            journal.sync().await.unwrap();
            drop(journal);

            // Restart: size, prune point, and contents all survive.
            let journal = Journal::<_, u64>::init(context.clone(), cfg.clone())
                .await
                .unwrap();

            assert_eq!(journal.size(), 15);
            assert_eq!(journal.oldest_retained_pos(), Some(5));

            let value = journal.read(journal.size() - 1).await.unwrap();
            assert_eq!(value, 1400);

            for i in 5..15u64 {
                assert_eq!(journal.read(i).await.unwrap(), i * 100);
            }

            journal.destroy().await.unwrap();

            // Phase 2: prune while appends are unsynced, then restart.
            let mut journal = Journal::<_, u64>::init(context.clone(), cfg.clone())
                .await
                .unwrap();

            for i in 0..10u64 {
                journal.append(i * 1000).await.unwrap();
            }

            journal.prune(5).await.unwrap();
            assert_eq!(journal.size(), 10);
            assert_eq!(journal.oldest_retained_pos(), Some(5));

            journal.sync().await.unwrap();
            drop(journal);

            let journal = Journal::<_, u64>::init(context.clone(), cfg.clone())
                .await
                .unwrap();

            assert_eq!(journal.size(), 10);
            assert_eq!(journal.oldest_retained_pos(), Some(5));

            let value = journal.read(journal.size() - 1).await.unwrap();
            assert_eq!(value, 9000);

            for i in 5..10u64 {
                assert_eq!(journal.read(i).await.unwrap(), i * 1000);
            }

            journal.destroy().await.unwrap();

            // Phase 3: prune the entire journal (prune point == size).
            let mut journal = Journal::<_, u64>::init(context.clone(), cfg.clone())
                .await
                .unwrap();

            for i in 0..5u64 {
                journal.append(i * 100).await.unwrap();
            }
            journal.sync().await.unwrap();

            // Pruning everything leaves size intact but nothing retained; even
            // the most recent item (position 4) now reads as pruned.
            journal.prune(5).await.unwrap();
            assert_eq!(journal.size(), 5);
            assert_eq!(journal.oldest_retained_pos(), None);
            let result = journal.read(journal.size() - 1).await;
            assert!(matches!(result, Err(crate::journal::Error::ItemPruned(4))));

            // A fresh append re-establishes the retained range at position 5.
            journal.append(500).await.unwrap();
            assert_eq!(journal.oldest_retained_pos(), Some(5));
            assert_eq!(journal.read(journal.size() - 1).await.unwrap(), 500);

            journal.destroy().await.unwrap();
        });
    }
2445}