1use super::Reader as _;
7use crate::{
8 journal::{
9 contiguous::{fixed, Contiguous, Mutable},
10 segmented::variable,
11 Error,
12 },
13 Persistable,
14};
15use commonware_codec::{Codec, CodecShared};
16use commonware_runtime::{buffer::paged::CacheRef, Clock, Metrics, Storage};
17use commonware_utils::{
18 sync::{AsyncRwLockReadGuard, UpgradableAsyncRwLock},
19 NZUsize,
20};
21#[commonware_macros::stability(ALPHA)]
22use core::ops::Range;
23use futures::{stream, Stream, StreamExt as _};
24use std::num::{NonZeroU64, NonZeroUsize};
25#[commonware_macros::stability(ALPHA)]
26use tracing::debug;
27use tracing::warn;
28
/// Buffer size used when replaying a data journal during recovery.
const REPLAY_BUFFER_SIZE: NonZeroUsize = NZUsize!(1024);

/// Suffix appended to the configured partition for the data journal.
const DATA_SUFFIX: &str = "_data";

/// Suffix appended to the configured partition for the offsets journal.
const OFFSETS_SUFFIX: &str = "_offsets";
/// Maps an absolute item `position` to the index of the section (blob) that
/// stores it, given a fixed number of items per section.
const fn position_to_section(position: u64, items_per_section: u64) -> u64 {
    // Integer floor division; div_euclid == `/` for unsigned operands.
    position.div_euclid(items_per_section)
}
61
/// Configuration for a contiguous variable-length journal.
#[derive(Clone)]
pub struct Config<C> {
    /// Base partition name; the data and offsets partitions derive from it
    /// by suffixing `_data` / `_offsets`.
    pub partition: String,

    /// Number of items stored per underlying section/blob.
    pub items_per_section: NonZeroU64,

    /// Optional compression setting forwarded to the data journal; `None`
    /// disables compression (level semantics defined by the data journal).
    pub compression: Option<u8>,

    /// Codec configuration used to decode stored items.
    pub codec_config: C,

    /// Shared page cache handle forwarded to both underlying journals.
    pub page_cache: CacheRef,

    /// Write buffer capacity forwarded to both underlying journals.
    pub write_buffer: NonZeroUsize,
}
86
87impl<C> Config<C> {
88 fn data_partition(&self) -> String {
90 format!("{}{}", self.partition, DATA_SUFFIX)
91 }
92
93 fn offsets_partition(&self) -> String {
95 format!("{}{}", self.partition, OFFSETS_SUFFIX)
96 }
97}
98
/// Mutable journal state guarded by the journal's read/write lock.
struct Inner<E: Clock + Storage + Metrics, V: Codec> {
    /// Variable-length data journal holding the item payloads.
    data: variable::Journal<E, V>,

    /// Total number of items appended so far (the next append position).
    size: u64,

    /// Positions below this boundary have been pruned and cannot be read.
    pruning_boundary: u64,
}
121
122impl<E: Clock + Storage + Metrics, V: CodecShared> Inner<E, V> {
123 async fn read(
131 &self,
132 position: u64,
133 items_per_section: u64,
134 offsets: &impl super::Reader<Item = u64>,
135 ) -> Result<V, Error> {
136 if position >= self.size {
137 return Err(Error::ItemOutOfRange(position));
138 }
139 if position < self.pruning_boundary {
140 return Err(Error::ItemPruned(position));
141 }
142
143 let offset = offsets.read(position).await?;
144 let section = position_to_section(position, items_per_section);
145
146 self.data.get(section, offset).await
147 }
148}
149
/// A contiguous, position-indexed journal of variable-length items.
///
/// Pairs a variable-length data journal (payloads) with a fixed-size offsets
/// journal that maps each position to its byte offset within a data section.
pub struct Journal<E: Clock + Storage + Metrics, V: Codec> {
    /// Mutable state (data journal, size, pruning boundary) behind an
    /// upgradable read/write lock.
    inner: UpgradableAsyncRwLock<Inner<E, V>>,

    /// For each position, the data-journal offset of that item.
    offsets: fixed::Journal<E, u64>,

    /// Items per section (cached copy of the config value).
    items_per_section: u64,
}
206
/// Read handle that pins the journal state for a consistent view.
pub struct Reader<'a, E: Clock + Storage + Metrics, V: Codec> {
    // Read guard over the journal's inner state.
    guard: AsyncRwLockReadGuard<'a, Inner<E, V>>,
    // Reader over the offsets journal.
    offsets: fixed::Reader<'a, E, u64>,
    // Items per section (copied from the journal).
    items_per_section: u64,
}
213
impl<E: Clock + Storage + Metrics, V: CodecShared> super::Reader for Reader<'_, E, V> {
    type Item = V;

    /// Readable positions: `[pruning_boundary, size)`.
    fn bounds(&self) -> std::ops::Range<u64> {
        self.guard.pruning_boundary..self.guard.size
    }

    /// Read the item at `position` (see [`Inner::read`] for error cases).
    async fn read(&self, position: u64) -> Result<V, Error> {
        self.guard
            .read(position, self.items_per_section, &self.offsets)
            .await
    }

    /// Stream `(position, item)` pairs starting at `start_pos`.
    ///
    /// `start_pos` must lie within `[pruning_boundary, size]`; `size` itself
    /// is allowed and yields an empty stream.
    async fn replay(
        &self,
        buffer_size: NonZeroUsize,
        start_pos: u64,
    ) -> Result<impl Stream<Item = Result<(u64, V), Error>> + Send, Error> {
        if start_pos < self.guard.pruning_boundary {
            return Err(Error::ItemPruned(start_pos));
        }
        if start_pos > self.guard.size {
            return Err(Error::ItemOutOfRange(start_pos));
        }

        let (start_section, start_offset) = if start_pos < self.guard.size {
            let offset = self.offsets.read(start_pos).await?;
            let section = position_to_section(start_pos, self.items_per_section);
            (section, offset)
        } else {
            // start_pos == size: replay the empty tail. u64::MAX is past any
            // real section, so the inner stream yields nothing.
            (u64::MAX, 0)
        };

        let inner_stream = self
            .guard
            .data
            .replay(start_section, start_offset, buffer_size)
            .await?;

        // Zip the raw data stream with an increasing position counter and
        // discard the physical (section, offset, size) details.
        let stream = inner_stream
            .zip(stream::iter(start_pos..))
            .map(|(result, pos)| result.map(|(_section, _offset, _size, item)| (pos, item)));

        Ok(stream)
    }
}
264
265impl<E: Clock + Storage + Metrics, V: CodecShared> Journal<E, V> {
    /// Open (or create) the journal under the configured partitions, then
    /// repair any misalignment between the data and offsets journals left by
    /// a crash.
    ///
    /// Returns the journal with `size` and `pruning_boundary` recovered from
    /// storage.
    pub async fn init(context: E, cfg: Config<V::Cfg>) -> Result<Self, Error> {
        let items_per_section = cfg.items_per_section.get();
        let data_partition = cfg.data_partition();
        let offsets_partition = cfg.offsets_partition();

        let mut data = variable::Journal::init(
            context.with_label("data"),
            variable::Config {
                partition: data_partition,
                compression: cfg.compression,
                codec_config: cfg.codec_config,
                page_cache: cfg.page_cache.clone(),
                write_buffer: cfg.write_buffer,
            },
        )
        .await?;

        let mut offsets = fixed::Journal::init(
            context.with_label("offsets"),
            fixed::Config {
                partition: offsets_partition,
                items_per_blob: cfg.items_per_section,
                page_cache: cfg.page_cache,
                write_buffer: cfg.write_buffer,
            },
        )
        .await?;

        // Reconcile the two journals (crash repair) and recover the bounds.
        let (pruning_boundary, size) =
            Self::align_journals(&mut data, &mut offsets, items_per_section).await?;

        Ok(Self {
            inner: UpgradableAsyncRwLock::new(Inner {
                data,
                size,
                pruning_boundary,
            }),
            offsets,
            items_per_section,
        })
    }
316
    /// Initialize an empty journal that reports `size` items, all pruned.
    ///
    /// The journal behaves as if `size` items had been appended and then
    /// pruned away, so the next append lands at position `size`.
    #[commonware_macros::stability(ALPHA)]
    pub async fn init_at_size(context: E, cfg: Config<V::Cfg>, size: u64) -> Result<Self, Error> {
        let data = variable::Journal::init(
            context.with_label("data"),
            variable::Config {
                partition: cfg.data_partition(),
                compression: cfg.compression,
                codec_config: cfg.codec_config.clone(),
                page_cache: cfg.page_cache.clone(),
                write_buffer: cfg.write_buffer,
            },
        )
        .await?;

        let offsets = fixed::Journal::init_at_size(
            context.with_label("offsets"),
            fixed::Config {
                partition: cfg.offsets_partition(),
                items_per_blob: cfg.items_per_section,
                page_cache: cfg.page_cache,
                write_buffer: cfg.write_buffer,
            },
            size,
        )
        .await?;

        Ok(Self {
            inner: UpgradableAsyncRwLock::new(Inner {
                data,
                size,
                // Everything below `size` is considered pruned.
                pruning_boundary: size,
            }),
            offsets,
            items_per_section: cfg.items_per_section.get(),
        })
    }
359
    /// Initialize the journal for state sync over `range` (must be non-empty).
    ///
    /// Existing data is reused when it overlaps the sync range; stale or
    /// missing data causes re-initialization at `range.start`. Data beyond
    /// `range.end` is rejected as out of range.
    #[commonware_macros::stability(ALPHA)]
    pub(crate) async fn init_sync(
        context: E,
        cfg: Config<V::Cfg>,
        range: Range<u64>,
    ) -> Result<Self, Error> {
        assert!(!range.is_empty(), "range must not be empty");

        debug!(
            range.start,
            range.end,
            items_per_section = cfg.items_per_section.get(),
            "initializing contiguous variable journal for sync"
        );

        let journal = Self::init(context.with_label("journal"), cfg.clone()).await?;

        let size = journal.size().await;

        if size == 0 {
            if range.start == 0 {
                debug!("no existing journal data, returning empty journal");
                return Ok(journal);
            } else {
                debug!(
                    range.start,
                    "no existing journal data, initializing at sync range start"
                );
                journal.destroy().await?;
                return Self::init_at_size(context, cfg, range.start).await;
            }
        }

        // Existing data past the sync range is unexpected.
        if size > range.end {
            return Err(Error::ItemOutOfRange(size));
        }

        // All existing data predates the range: discard and restart.
        if size <= range.start {
            debug!(
                size,
                range.start, "existing journal data is stale, re-initializing at start position"
            );
            journal.destroy().await?;
            return Self::init_at_size(context, cfg, range.start).await;
        }

        // Keep only items within the sync range.
        let bounds = journal.reader().await.bounds();
        if !bounds.is_empty() && bounds.start < range.start {
            debug!(
                oldest_pos = bounds.start,
                range.start, "pruning journal to sync range start"
            );
            journal.prune(range.start).await?;
        }

        Ok(journal)
    }
448
449 pub async fn rewind(&self, size: u64) -> Result<(), Error> {
463 let mut inner = self.inner.write().await;
464
465 match size.cmp(&inner.size) {
467 std::cmp::Ordering::Greater => return Err(Error::InvalidRewind(size)),
468 std::cmp::Ordering::Equal => return Ok(()), std::cmp::Ordering::Less => {}
470 }
471
472 if size < inner.pruning_boundary {
474 return Err(Error::ItemPruned(size));
475 }
476
477 let discard_offset = {
479 let offsets_reader = self.offsets.reader().await;
480 offsets_reader.read(size).await?
481 };
482 let discard_section = position_to_section(size, self.items_per_section);
483
484 inner
485 .data
486 .rewind_to_offset(discard_section, discard_offset)
487 .await?;
488 self.offsets.rewind(size).await?;
489
490 inner.size = size;
492
493 Ok(())
494 }
495
    /// Append `item`, returning its position.
    ///
    /// Both journals are synced together whenever the append fills a section;
    /// mid-section appends return without syncing.
    pub async fn append(&self, item: &V) -> Result<u64, Error> {
        let mut inner = self.inner.write().await;

        let section = position_to_section(inner.size, self.items_per_section);

        let (offset, _size) = inner.data.append(section, item).await?;

        // Record the item's offset; the offsets journal position must always
        // match the data position.
        let offsets_pos = self.offsets.append(&offset).await?;
        assert_eq!(offsets_pos, inner.size);

        let position = inner.size;
        inner.size += 1;

        // Mid-section append: done.
        if !inner.size.is_multiple_of(self.items_per_section) {
            return Ok(position);
        }

        // Section just filled: downgrade the lock so readers can proceed
        // while both journals are synced.
        let inner = inner.downgrade_to_upgradable();
        futures::try_join!(inner.data.sync(section), self.offsets.sync())?;

        Ok(position)
    }
541
542 pub async fn reader(&self) -> Reader<'_, E, V> {
544 Reader {
545 guard: self.inner.read().await,
546 offsets: self.offsets.reader().await,
547 items_per_section: self.items_per_section,
548 }
549 }
550
551 pub async fn size(&self) -> u64 {
554 self.inner.read().await.size
555 }
556
    /// Prune items below `min_position`, rounded down to a section boundary.
    ///
    /// Returns whether any data was pruned. The effective boundary never
    /// exceeds the current size and never moves backward.
    pub async fn prune(&self, min_position: u64) -> Result<bool, Error> {
        let mut inner = self.inner.write().await;

        if min_position <= inner.pruning_boundary {
            return Ok(false);
        }

        // Never prune past the end of the journal.
        let min_position = min_position.min(inner.size);

        let min_section = position_to_section(min_position, self.items_per_section);

        let pruned = inner.data.prune(min_section).await?;
        if pruned {
            // Data pruning is section-granular: the new boundary is the start
            // of the oldest retained section (clamped to never move backward).
            let new_oldest = (min_section * self.items_per_section).max(inner.pruning_boundary);
            inner.pruning_boundary = new_oldest;
            self.offsets.prune(new_oldest).await?;
        }
        Ok(pruned)
    }
588
589 pub async fn commit(&self) -> Result<(), Error> {
594 let inner = self.inner.upgradable_read().await;
597
598 let section = position_to_section(inner.size, self.items_per_section);
599 inner.data.sync(section).await
600 }
601
602 pub async fn sync(&self) -> Result<(), Error> {
606 let inner = self.inner.upgradable_read().await;
609
610 let section = position_to_section(inner.size, self.items_per_section);
613
614 futures::try_join!(inner.data.sync(section), self.offsets.sync())?;
617
618 Ok(())
619 }
620
621 pub async fn destroy(self) -> Result<(), Error> {
625 let inner = self.inner.into_inner();
626 inner.data.destroy().await?;
627 self.offsets.destroy().await
628 }
629
    /// Clear all stored data and reset the journal as if `new_size` items had
    /// been appended and then pruned.
    #[commonware_macros::stability(ALPHA)]
    pub(crate) async fn clear_to_size(&self, new_size: u64) -> Result<(), Error> {
        let mut inner = self.inner.write().await;
        inner.data.clear().await?;

        self.offsets.clear_to_size(new_size).await?;
        inner.size = new_size;
        inner.pruning_boundary = new_size;
        Ok(())
    }
644
    /// Reconcile the data and offsets journals after startup, repairing any
    /// inconsistency left by a crash between operations on the two journals.
    ///
    /// Returns `(pruning_boundary, size)` for the recovered journal.
    async fn align_journals(
        data: &mut variable::Journal<E, V>,
        offsets: &mut fixed::Journal<E, u64>,
        items_per_section: u64,
    ) -> Result<(u64, u64), Error> {
        // Count the items actually present in the newest data section by
        // replaying it (the data journal does not track item counts).
        let items_in_last_section = match data.newest_section() {
            Some(last_section) => {
                let stream = data.replay(last_section, 0, REPLAY_BUFFER_SIZE).await?;
                futures::pin_mut!(stream);
                let mut count = 0u64;
                while let Some(result) = stream.next().await {
                    result?; count += 1;
                }
                count
            }
            None => 0,
        };

        // "Empty" also covers a single section containing zero items (e.g. a
        // crash after creating the section but before any append landed).
        let data_empty =
            data.is_empty() || (data.num_sections() == 1 && items_in_last_section == 0);
        if data_empty {
            let offsets_bounds = {
                let offsets_reader = offsets.reader().await;
                offsets_reader.bounds()
            };
            let size = offsets_bounds.end;

            if !data.is_empty() {
                // One empty section exists: align both journals to the start
                // of that section (or the offsets start, whichever is later).
                let data_first_section = data.oldest_section().unwrap();
                let data_section_start = data_first_section * items_per_section;
                let target_pos = data_section_start.max(offsets_bounds.start);

                warn!("crash repair: clearing offsets to {target_pos} (empty section crash)");
                offsets.clear_to_size(target_pos).await?;
                return Ok((target_pos, target_pos));
            }

            // Data fully pruned but offsets still holds entries: a prune-all
            // crashed between the two journals. Clear offsets to match.
            if !offsets_bounds.is_empty() && offsets_bounds.start < size {
                warn!("crash repair: clearing offsets to {size} (prune-all crash)");
                offsets.clear_to_size(size).await?;
            }

            return Ok((size, size));
        }

        let data_first_section = data.oldest_section().unwrap();
        let data_last_section = data.newest_section().unwrap();

        // Oldest readable position implied by the oldest data section.
        let data_oldest_pos = data_first_section * items_per_section;

        // Align the *start* (pruning boundary) of the offsets journal with
        // the oldest data section.
        {
            let offsets_bounds = {
                let offsets_reader = offsets.reader().await;
                offsets_reader.bounds()
            };
            match (
                offsets_bounds.is_empty(),
                offsets_bounds.start.cmp(&data_oldest_pos),
            ) {
                (true, _) => {
                    // Offsets is empty but data is not: recoverable only if
                    // the empty offsets journal points at the same section.
                    let offsets_first_section = offsets_bounds.start / items_per_section;
                    if offsets_first_section != data_first_section {
                        return Err(Error::Corruption(format!(
                            "offsets journal empty at section {offsets_first_section} != data section {data_first_section}"
                        )));
                    }
                    warn!(
                        "crash repair: offsets journal empty at {}, will rebuild from data",
                        offsets_bounds.start
                    );
                }
                (false, std::cmp::Ordering::Less) => {
                    // Data was pruned further than offsets: finish the prune.
                    warn!("crash repair: pruning offsets journal to {data_oldest_pos}");
                    offsets.prune(data_oldest_pos).await?;
                }
                (false, std::cmp::Ordering::Greater) => {
                    // Offsets pruned ahead of data is only tolerable within
                    // the same section (offsets pruning is blob-granular).
                    if offsets_bounds.start / items_per_section > data_first_section {
                        return Err(Error::Corruption(format!(
                            "offsets oldest pos ({}) > data oldest pos ({data_oldest_pos})",
                            offsets_bounds.start
                        )));
                    }
                }
                (false, std::cmp::Ordering::Equal) => {
                    // Already aligned; nothing to repair.
                }
            }
        }

        // Compute the true size from the data journal's section layout: a
        // full first section (from the offsets start), full middle sections,
        // plus the counted tail.
        let (offsets_bounds, data_size) = {
            let offsets_reader = offsets.reader().await;
            let offsets_bounds = offsets_reader.bounds();
            let data_size = if data_first_section == data_last_section {
                offsets_bounds.start + items_in_last_section
            } else {
                let oldest_items =
                    (data_first_section + 1) * items_per_section - offsets_bounds.start;
                let middle_items = (data_last_section - data_first_section - 1) * items_per_section;
                offsets_bounds.start + oldest_items + middle_items + items_in_last_section
            };
            (offsets_bounds, data_size)
        };

        // Align the *end*: rewind surplus offsets entries, or rebuild the
        // missing ones from the data journal.
        let offsets_size = offsets_bounds.end;
        if offsets_size > data_size {
            warn!("crash repair: rewinding offsets from {offsets_size} to {data_size}");
            offsets.rewind(data_size).await?;
        } else if offsets_size < data_size {
            Self::add_missing_offsets(data, offsets, offsets_size, items_per_section).await?;
        }

        // Post-conditions: both journals now agree on bounds.
        let pruning_boundary = {
            let offsets_reader = offsets.reader().await;
            let offsets_bounds = offsets_reader.bounds();
            assert_eq!(offsets_bounds.end, data_size);

            assert!(
                !offsets_bounds.is_empty(),
                "offsets should have data after alignment"
            );
            assert_eq!(
                offsets_bounds.start / items_per_section,
                data_first_section,
                "offsets and data should be in same oldest section"
            );
            offsets_bounds.start
        };

        offsets.sync().await?;
        Ok((pruning_boundary, data_size))
    }
824
    /// Rebuild missing offsets entries by replaying the data journal from the
    /// last position the offsets journal knows about.
    ///
    /// `offsets_size` is the current size of the offsets journal; items at or
    /// past that position exist in `data` but have no recorded offset yet.
    async fn add_missing_offsets(
        data: &variable::Journal<E, V>,
        offsets: &mut fixed::Journal<E, u64>,
        offsets_size: u64,
        items_per_section: u64,
    ) -> Result<(), Error> {
        assert!(
            !data.is_empty(),
            "rebuild_offsets called with empty data journal"
        );

        // Decide where to resume the replay. If offsets has at least one
        // usable entry, resume from the last recorded item and skip it (it is
        // already recorded); otherwise replay the oldest data section from
        // the beginning.
        let (start_section, resume_offset, skip_first) = {
            let offsets_reader = offsets.reader().await;
            let offsets_bounds = offsets_reader.bounds();
            if offsets_bounds.is_empty() {
                let first_section = data.oldest_section().unwrap();
                (first_section, 0, false)
            } else if offsets_bounds.start < offsets_size {
                let last_offset = offsets_reader.read(offsets_size - 1).await?;
                let last_section = position_to_section(offsets_size - 1, items_per_section);
                (last_section, last_offset, true)
            } else {
                // All retained offsets entries were pruned: start from the
                // oldest data section.
                let first_section = data.oldest_section().unwrap();
                (first_section, 0, false)
            }
        };

        let stream = data
            .replay(start_section, resume_offset, REPLAY_BUFFER_SIZE)
            .await?;
        futures::pin_mut!(stream);

        let mut skipped_first = false;
        while let Some(result) = stream.next().await {
            let (_section, offset, _size, _item) = result?;

            // The first replayed item may already have an offsets entry.
            if skip_first && !skipped_first {
                skipped_first = true;
                continue;
            }

            offsets.append(&offset).await?;
        }

        Ok(())
    }
891}
892
impl<E: Clock + Storage + Metrics, V: CodecShared> Contiguous for Journal<E, V> {
    type Item = V;

    /// Delegates to the inherent [`Journal::reader`].
    async fn reader(&self) -> impl super::Reader<Item = V> + '_ {
        Self::reader(self).await
    }

    /// Delegates to the inherent [`Journal::size`].
    async fn size(&self) -> u64 {
        Self::size(self).await
    }
}
905
impl<E: Clock + Storage + Metrics, V: CodecShared> Mutable for Journal<E, V> {
    /// Delegates to the inherent [`Journal::append`].
    async fn append(&mut self, item: &Self::Item) -> Result<u64, Error> {
        Self::append(self, item).await
    }

    /// Delegates to the inherent [`Journal::prune`].
    async fn prune(&mut self, min_position: u64) -> Result<bool, Error> {
        Self::prune(self, min_position).await
    }

    /// Delegates to the inherent [`Journal::rewind`].
    async fn rewind(&mut self, size: u64) -> Result<(), Error> {
        Self::rewind(self, size).await
    }
}
919
impl<E: Clock + Storage + Metrics, V: CodecShared> Persistable for Journal<E, V> {
    type Error = Error;

    // Note: `self.commit()` etc. resolve to the inherent methods (inherent
    // methods take precedence over trait methods), not recursively to these.
    async fn commit(&self) -> Result<(), Error> {
        self.commit().await
    }

    async fn sync(&self) -> Result<(), Error> {
        self.sync().await
    }

    async fn destroy(self) -> Result<(), Error> {
        self.destroy().await
    }
}
935
#[cfg(test)]
impl<E: Clock + Storage + Metrics, V: CodecShared> Journal<E, V> {
    /// Test helper: read a single item through a fresh reader.
    pub(crate) async fn read(&self, position: u64) -> Result<V, Error> {
        self.reader().await.read(position).await
    }

    /// Test helper: current readable bounds.
    pub(crate) async fn bounds(&self) -> std::ops::Range<u64> {
        self.reader().await.bounds()
    }

    /// Test helper: prune only the data journal (simulates a crash between
    /// data and offsets pruning).
    pub(crate) async fn test_prune_data(&self, section: u64) -> Result<bool, Error> {
        let mut inner = self.inner.write().await;
        inner.data.prune(section).await
    }

    /// Test helper: prune only the offsets journal.
    pub(crate) async fn test_prune_offsets(&self, position: u64) -> Result<bool, Error> {
        self.offsets.prune(position).await
    }

    /// Test helper: rewind only the offsets journal.
    pub(crate) async fn test_rewind_offsets(&self, position: u64) -> Result<(), Error> {
        self.offsets.rewind(position).await
    }

    /// Test helper: current size of the offsets journal.
    pub(crate) async fn test_offsets_size(&self) -> u64 {
        self.offsets.size().await
    }

    /// Test helper: append directly to the data journal without recording an
    /// offset (simulates a crash mid-append).
    pub(crate) async fn test_append_data(
        &self,
        section: u64,
        item: V,
    ) -> Result<(u64, u32), Error> {
        let mut inner = self.inner.write().await;
        inner.data.append(section, &item).await
    }

    /// Test helper: sync only the data journal's newest section.
    pub(crate) async fn test_sync_data(&self) -> Result<(), Error> {
        let inner = self.inner.read().await;
        inner
            .data
            .sync(inner.data.newest_section().unwrap_or(0))
            .await
    }
}
988
989#[cfg(test)]
990mod tests {
991 use super::*;
992 use crate::journal::contiguous::tests::run_contiguous_tests;
993 use commonware_macros::test_traced;
994 use commonware_runtime::{buffer::paged::CacheRef, deterministic, Metrics, Runner};
995 use commonware_utils::{NZUsize, NZU16, NZU64};
996 use futures::FutureExt as _;
997 use std::num::NonZeroU16;
998
    // Page sizes / cache sizing shared by the tests in this module.
    const PAGE_SIZE: NonZeroU16 = NZU16!(101);
    const PAGE_CACHE_SIZE: usize = 2;
    const LARGE_PAGE_SIZE: NonZeroU16 = NZU16!(1024);
    const SMALL_PAGE_SIZE: NonZeroU16 = NZU16!(512);
1005
    /// Losing the offsets partition after a prune leaves offsets pointing at
    /// a different section than data — an unrecoverable corruption.
    #[test_traced]
    fn test_variable_offsets_partition_loss_after_prune_unrecoverable() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "offsets-loss-after-prune".into(),
                items_per_section: NZU64!(10),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)),
                write_buffer: NZUsize!(1024),
            };

            let journal = Journal::<_, u64>::init(context.with_label("first"), cfg.clone())
                .await
                .unwrap();

            for i in 0..40u64 {
                journal.append(&(i * 100)).await.unwrap();
            }

            journal.prune(20).await.unwrap();
            let bounds = journal.bounds().await;
            assert_eq!(bounds.start, 20);
            assert_eq!(bounds.end, 40);

            journal.sync().await.unwrap();
            drop(journal);

            // Simulate losing the offsets partition entirely.
            context
                .remove(&format!("{}-blobs", cfg.offsets_partition()), None)
                .await
                .expect("Failed to remove offsets blobs partition");
            context
                .remove(&format!("{}-metadata", cfg.offsets_partition()), None)
                .await
                .expect("Failed to remove offsets metadata partition");

            let result = Journal::<_, u64>::init(context.with_label("second"), cfg.clone()).await;
            assert!(matches!(result, Err(Error::Corruption(_))));
        });
    }
1059
    /// Losing the data partition while offsets survive recovers to a journal
    /// that reports the old size with every position pruned.
    #[test_traced]
    fn test_variable_align_data_offsets_mismatch() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "data-loss-test".into(),
                items_per_section: NZU64!(10),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)),
                write_buffer: NZUsize!(1024),
            };

            let variable = Journal::<_, u64>::init(context.with_label("first"), cfg.clone())
                .await
                .unwrap();

            for i in 0..20u64 {
                variable.append(&(i * 100)).await.unwrap();
            }

            variable.sync().await.unwrap();
            drop(variable);

            // Simulate losing the data partition.
            context
                .remove(&cfg.data_partition(), None)
                .await
                .expect("Failed to remove data partition");

            let journal = Journal::<_, u64>::init(context.with_label("second"), cfg.clone())
                .await
                .expect("Should align offsets to match empty data");

            assert_eq!(journal.size().await, 20);

            assert!(journal.bounds().await.is_empty());

            for i in 0..20 {
                assert!(matches!(
                    journal.read(i).await,
                    Err(crate::journal::Error::ItemPruned(_))
                ));
            }

            // Appends resume at the recovered size.
            let pos = journal.append(&999).await.unwrap();
            assert_eq!(pos, 20);
            assert_eq!(journal.read(20).await.unwrap(), 999);

            journal.destroy().await.unwrap();
        });
    }
1126
    /// Replay from various start positions: start, mid-section, section
    /// boundary, after pruning, at size, and past size.
    #[test_traced]
    fn test_variable_replay() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "replay".into(),
                items_per_section: NZU64!(10),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)),
                write_buffer: NZUsize!(1024),
            };

            let journal = Journal::<_, u64>::init(context, cfg).await.unwrap();

            for i in 0..40u64 {
                journal.append(&(i * 100)).await.unwrap();
            }

            // Full replay from position 0.
            {
                let reader = journal.reader().await;
                let stream = reader.replay(NZUsize!(20), 0).await.unwrap();
                futures::pin_mut!(stream);
                for i in 0..40u64 {
                    let (pos, item) = stream.next().await.unwrap().unwrap();
                    assert_eq!(pos, i);
                    assert_eq!(item, i * 100);
                }
                assert!(stream.next().await.is_none());
            }

            // Replay from a mid-section position.
            {
                let reader = journal.reader().await;
                let stream = reader.replay(NZUsize!(20), 15).await.unwrap();
                futures::pin_mut!(stream);
                for i in 15..40u64 {
                    let (pos, item) = stream.next().await.unwrap().unwrap();
                    assert_eq!(pos, i);
                    assert_eq!(item, i * 100);
                }
                assert!(stream.next().await.is_none());
            }

            // Replay from an exact section boundary.
            {
                let reader = journal.reader().await;
                let stream = reader.replay(NZUsize!(20), 20).await.unwrap();
                futures::pin_mut!(stream);
                for i in 20..40u64 {
                    let (pos, item) = stream.next().await.unwrap().unwrap();
                    assert_eq!(pos, i);
                    assert_eq!(item, i * 100);
                }
                assert!(stream.next().await.is_none());
            }

            // After pruning, replay below the boundary is rejected.
            journal.prune(20).await.unwrap();
            {
                let reader = journal.reader().await;
                let res = reader.replay(NZUsize!(20), 0).await;
                assert!(matches!(res, Err(crate::journal::Error::ItemPruned(_))));
            }
            {
                let reader = journal.reader().await;
                let res = reader.replay(NZUsize!(20), 19).await;
                assert!(matches!(res, Err(crate::journal::Error::ItemPruned(_))));
            }

            // Replay starting exactly at the pruning boundary still works.
            {
                let reader = journal.reader().await;
                let stream = reader.replay(NZUsize!(20), 20).await.unwrap();
                futures::pin_mut!(stream);
                for i in 20..40u64 {
                    let (pos, item) = stream.next().await.unwrap().unwrap();
                    assert_eq!(pos, i);
                    assert_eq!(item, i * 100);
                }
                assert!(stream.next().await.is_none());
            }

            // Replay at size yields an empty stream.
            {
                let reader = journal.reader().await;
                let stream = reader.replay(NZUsize!(20), 40).await.unwrap();
                futures::pin_mut!(stream);
                assert!(stream.next().await.is_none());
            }

            // Replay past size is out of range.
            {
                let reader = journal.reader().await;
                let res = reader.replay(NZUsize!(20), 41).await;
                assert!(matches!(
                    res,
                    Err(crate::journal::Error::ItemOutOfRange(41))
                ));
            }

            journal.destroy().await.unwrap();
        });
    }
1235
    /// Runs the shared contiguous-journal test suite against this journal.
    #[test_traced]
    fn test_variable_contiguous() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            run_contiguous_tests(move |test_name: String, idx: usize| {
                // Unique label/partition per generated sub-test.
                let label = test_name.replace('-', "_");
                let context = context.with_label(&format!("{label}_{idx}"));
                async move {
                    let cfg = Config {
                        partition: format!("generic-test-{test_name}"),
                        items_per_section: NZU64!(10),
                        compression: None,
                        codec_config: (),
                        page_cache: CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)),
                        write_buffer: NZUsize!(1024),
                    };
                    Journal::<_, u64>::init(context, cfg).await
                }
                .boxed()
            })
            .await;
        });
    }
1259
    /// Successive prunes advance the boundary monotonically while reads at
    /// and above the boundary keep working.
    #[test_traced]
    fn test_variable_multiple_sequential_prunes() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "sequential-prunes".into(),
                items_per_section: NZU64!(10),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)),
                write_buffer: NZUsize!(1024),
            };

            let journal = Journal::<_, u64>::init(context, cfg).await.unwrap();

            for i in 0..40u64 {
                journal.append(&(i * 100)).await.unwrap();
            }

            let bounds = journal.bounds().await;
            assert_eq!(bounds.start, 0);
            assert_eq!(bounds.end, 40);

            // Prune to 10.
            let pruned = journal.prune(10).await.unwrap();
            assert!(pruned);

            assert_eq!(journal.bounds().await.start, 10);

            assert!(matches!(
                journal.read(0).await,
                Err(crate::journal::Error::ItemPruned(_))
            ));
            assert_eq!(journal.read(10).await.unwrap(), 1000);
            assert_eq!(journal.read(19).await.unwrap(), 1900);

            // Prune to 20.
            let pruned = journal.prune(20).await.unwrap();
            assert!(pruned);

            assert_eq!(journal.bounds().await.start, 20);

            assert!(matches!(
                journal.read(10).await,
                Err(crate::journal::Error::ItemPruned(_))
            ));
            assert!(matches!(
                journal.read(19).await,
                Err(crate::journal::Error::ItemPruned(_))
            ));
            assert_eq!(journal.read(20).await.unwrap(), 2000);
            assert_eq!(journal.read(29).await.unwrap(), 2900);

            // Prune to 30.
            let pruned = journal.prune(30).await.unwrap();
            assert!(pruned);

            assert_eq!(journal.bounds().await.start, 30);

            assert!(matches!(
                journal.read(20).await,
                Err(crate::journal::Error::ItemPruned(_))
            ));
            assert!(matches!(
                journal.read(29).await,
                Err(crate::journal::Error::ItemPruned(_))
            ));
            assert_eq!(journal.read(30).await.unwrap(), 3000);
            assert_eq!(journal.read(39).await.unwrap(), 3900);

            // Pruning never changes the reported size.
            assert_eq!(journal.size().await, 40);

            journal.destroy().await.unwrap();
        });
    }
1345
    /// Pruning everything, restarting, and appending again resumes at the
    /// old size with the pruned prefix still unreadable.
    #[test_traced]
    fn test_variable_prune_all_then_reinit() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "prune-all-reinit".into(),
                items_per_section: NZU64!(10),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)),
                write_buffer: NZUsize!(1024),
            };

            let journal = Journal::<_, u64>::init(context.with_label("first"), cfg.clone())
                .await
                .unwrap();

            for i in 0..100u64 {
                journal.append(&(i * 100)).await.unwrap();
            }

            let bounds = journal.bounds().await;
            assert_eq!(bounds.end, 100);
            assert_eq!(bounds.start, 0);

            // Prune every item.
            let pruned = journal.prune(100).await.unwrap();
            assert!(pruned);

            let bounds = journal.bounds().await;
            assert_eq!(bounds.end, 100);
            assert!(bounds.is_empty());

            for i in 0..100 {
                assert!(matches!(
                    journal.read(i).await,
                    Err(crate::journal::Error::ItemPruned(_))
                ));
            }

            journal.sync().await.unwrap();
            drop(journal);

            // Restart: bounds survive the prune-all.
            let journal = Journal::<_, u64>::init(context.with_label("second"), cfg.clone())
                .await
                .unwrap();

            let bounds = journal.bounds().await;
            assert_eq!(bounds.end, 100);
            assert!(bounds.is_empty());

            for i in 0..100 {
                assert!(matches!(
                    journal.read(i).await,
                    Err(crate::journal::Error::ItemPruned(_))
                ));
            }

            // Appending resumes at position 100.
            journal.append(&10000).await.unwrap();
            let bounds = journal.bounds().await;
            assert_eq!(bounds.end, 101);
            assert_eq!(bounds.start, 100);

            assert_eq!(journal.read(100).await.unwrap(), 10000);

            assert!(matches!(
                journal.read(99).await,
                Err(crate::journal::Error::ItemPruned(_))
            ));

            journal.destroy().await.unwrap();
        });
    }
1431
    /// Crash between pruning data and pruning offsets: recovery prunes the
    /// offsets journal forward to match the data journal.
    #[test_traced]
    fn test_variable_recovery_prune_crash_offsets_behind() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "recovery-prune-crash".into(),
                items_per_section: NZU64!(10),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)),
                write_buffer: NZUsize!(1024),
            };

            let variable = Journal::<_, u64>::init(context.with_label("first"), cfg.clone())
                .await
                .unwrap();

            for i in 0..40u64 {
                variable.append(&(i * 100)).await.unwrap();
            }

            variable.prune(10).await.unwrap();
            assert_eq!(variable.bounds().await.start, 10);

            // Prune data one section further than offsets, then "crash".
            variable.test_prune_data(2).await.unwrap();
            variable.sync().await.unwrap();
            drop(variable);

            let variable = Journal::<_, u64>::init(context.with_label("second"), cfg.clone())
                .await
                .unwrap();

            // Recovery aligned the boundary to the data journal (position 20).
            let bounds = variable.bounds().await;
            assert_eq!(bounds.start, 20);
            assert_eq!(bounds.end, 40);

            assert!(matches!(
                variable.read(10).await,
                Err(crate::journal::Error::ItemPruned(_))
            ));

            assert_eq!(variable.read(20).await.unwrap(), 2000);
            assert_eq!(variable.read(39).await.unwrap(), 3900);

            variable.destroy().await.unwrap();
        });
    }
1491
    /// Offsets pruned a full section ahead of data is not repairable and
    /// must surface as corruption on init.
    #[test_traced]
    fn test_variable_recovery_offsets_ahead_corruption() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "recovery-offsets-ahead".into(),
                items_per_section: NZU64!(10),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)),
                write_buffer: NZUsize!(1024),
            };

            let variable = Journal::<_, u64>::init(context.with_label("first"), cfg.clone())
                .await
                .unwrap();

            for i in 0..40u64 {
                variable.append(&(i * 100)).await.unwrap();
            }

            // Prune offsets two sections ahead of data, then "crash".
            variable.test_prune_offsets(20).await.unwrap();
            variable.test_prune_data(1).await.unwrap();
            variable.sync().await.unwrap();
            drop(variable);

            let result = Journal::<_, u64>::init(context.with_label("second"), cfg.clone()).await;
            assert!(matches!(result, Err(Error::Corruption(_))));
        });
    }
1531
    /// Simulates a crash where appended data reached disk but the offsets
    /// journal lags behind: recovery must rebuild the missing offsets by
    /// replaying the data journal.
    #[test_traced]
    fn test_variable_recovery_append_crash_offsets_behind() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "recovery-append-crash".into(),
                items_per_section: NZU64!(10),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)),
                write_buffer: NZUsize!(1024),
            };

            let variable = Journal::<_, u64>::init(context.with_label("first"), cfg.clone())
                .await
                .unwrap();

            // Normal appends: data and offsets both track positions 0..15.
            for i in 0..15u64 {
                variable.append(&(i * 100)).await.unwrap();
            }

            assert_eq!(variable.size().await, 15);

            // Write positions 15..20 directly into data section 1,
            // bypassing the offsets journal to emulate a crash between
            // the data write and the offsets update.
            for i in 15..20u64 {
                variable.test_append_data(1, i * 100).await.unwrap();
            }
            variable.sync().await.unwrap();
            drop(variable);

            let variable = Journal::<_, u64>::init(context.with_label("second"), cfg.clone())
                .await
                .unwrap();

            // All 20 items are visible after recovery.
            let bounds = variable.bounds().await;
            assert_eq!(bounds.end, 20);
            assert_eq!(bounds.start, 0);

            for i in 0..20u64 {
                assert_eq!(variable.read(i).await.unwrap(), i * 100);
            }

            // The offsets journal was extended to cover every item.
            assert_eq!(variable.test_offsets_size().await, 20);

            variable.destroy().await.unwrap();
        });
    }
1588
    /// A crash after the data journal was pruned further (sections < 3,
    /// boundary 30) than the recorded prune point (10): recovery adopts
    /// the more-advanced data boundary rather than failing.
    #[test_traced]
    fn test_variable_recovery_multiple_prunes_crash() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "recovery-multiple-prunes".into(),
                items_per_section: NZU64!(10),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)),
                write_buffer: NZUsize!(1024),
            };

            let variable = Journal::<_, u64>::init(context.with_label("first"), cfg.clone())
                .await
                .unwrap();

            // Five full sections of 10 items each.
            for i in 0..50u64 {
                variable.append(&(i * 100)).await.unwrap();
            }

            // Regular prune records boundary 10.
            variable.prune(10).await.unwrap();
            assert_eq!(variable.bounds().await.start, 10);

            // Drop data sections below 3 (boundary 30), then "crash".
            variable.test_prune_data(3).await.unwrap();
            variable.sync().await.unwrap();
            drop(variable);

            let variable = Journal::<_, u64>::init(context.with_label("second"), cfg.clone())
                .await
                .unwrap();

            // Recovery advances the pruning boundary to match the data.
            let bounds = variable.bounds().await;
            assert_eq!(bounds.start, 30);
            assert_eq!(bounds.end, 50);

            // Everything below the new boundary reads as pruned.
            assert!(matches!(
                variable.read(10).await,
                Err(crate::journal::Error::ItemPruned(_))
            ));
            assert!(matches!(
                variable.read(20).await,
                Err(crate::journal::Error::ItemPruned(_))
            ));

            // Retained items are intact.
            assert_eq!(variable.read(30).await.unwrap(), 3000);
            assert_eq!(variable.read(49).await.unwrap(), 4900);

            variable.destroy().await.unwrap();
        });
    }
1652
    /// Simulates a crash mid-rewind: the offsets journal was rewound to 5
    /// while the data journal still holds all 25 items. Recovery rebuilds
    /// the offsets from the data and the journal remains appendable.
    #[test_traced]
    fn test_variable_recovery_rewind_crash_multi_section() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "recovery-rewind-crash".into(),
                items_per_section: NZU64!(10),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)),
                write_buffer: NZUsize!(1024),
            };

            let variable = Journal::<_, u64>::init(context.with_label("first"), cfg.clone())
                .await
                .unwrap();

            // 25 items span multiple 10-item sections.
            for i in 0..25u64 {
                variable.append(&(i * 100)).await.unwrap();
            }

            assert_eq!(variable.size().await, 25);

            // Rewind only the offsets journal (to 5), then "crash".
            variable.test_rewind_offsets(5).await.unwrap();
            variable.sync().await.unwrap();
            drop(variable);

            let variable = Journal::<_, u64>::init(context.with_label("second"), cfg.clone())
                .await
                .unwrap();

            // Recovery restores visibility of all 25 items from the data.
            let bounds = variable.bounds().await;
            assert_eq!(bounds.end, 25);
            assert_eq!(bounds.start, 0);

            for i in 0..25u64 {
                assert_eq!(variable.read(i).await.unwrap(), i * 100);
            }

            // Offsets were fully regenerated.
            assert_eq!(variable.test_offsets_size().await, 25);

            // Journal stays usable: the next append lands at 25.
            let pos = variable.append(&2500).await.unwrap();
            assert_eq!(pos, 25);
            assert_eq!(variable.read(25).await.unwrap(), 2500);

            variable.destroy().await.unwrap();
        });
    }
1718
    /// Crash scenario: an entire first section is pruned (journal empty at
    /// position 10), new items are written straight to the data journal,
    /// and only the data is synced. Recovery must rebuild the offsets.
    #[test_traced]
    fn test_variable_recovery_empty_offsets_after_prune_and_append() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "recovery-empty-after-prune".into(),
                items_per_section: NZU64!(10),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)),
                write_buffer: NZUsize!(1024),
            };

            let journal = Journal::<_, u64>::init(context.with_label("first"), cfg.clone())
                .await
                .unwrap();

            // Exactly one full section (positions 0..10).
            for i in 0..10u64 {
                journal.append(&(i * 100)).await.unwrap();
            }
            let bounds = journal.bounds().await;
            assert_eq!(bounds.end, 10);
            assert_eq!(bounds.start, 0);

            // Prune everything: journal is empty but positioned at 10.
            journal.prune(10).await.unwrap();
            let bounds = journal.bounds().await;
            assert_eq!(bounds.end, 10);
            assert!(bounds.is_empty());

            // Write positions 10..20 into data section 1 only, then sync
            // just the data journal — the offsets journal stays empty.
            for i in 10..20u64 {
                journal.test_append_data(1, i * 100).await.unwrap();
            }
            journal.test_sync_data().await.unwrap();
            drop(journal);

            let journal = Journal::<_, u64>::init(context.with_label("second"), cfg.clone())
                .await
                .expect("Should recover from crash after data sync but before offsets sync");

            // The appended-but-unindexed items are recovered.
            let bounds = journal.bounds().await;
            assert_eq!(bounds.end, 20);
            assert_eq!(bounds.start, 10);

            for i in 10..20u64 {
                assert_eq!(journal.read(i).await.unwrap(), i * 100);
            }

            // The pruned first section stays pruned.
            for i in 0..10 {
                assert!(matches!(journal.read(i).await, Err(Error::ItemPruned(_))));
            }

            journal.destroy().await.unwrap();
        });
    }
1789
1790 #[test_traced]
1792 fn test_variable_concurrent_sync_recovery() {
1793 let executor = deterministic::Runner::default();
1794 executor.start(|context| async move {
1795 let cfg = Config {
1796 partition: "concurrent-sync-recovery".into(),
1797 items_per_section: NZU64!(10),
1798 compression: None,
1799 codec_config: (),
1800 page_cache: CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)),
1801 write_buffer: NZUsize!(1024),
1802 };
1803
1804 let journal = Journal::<_, u64>::init(context.with_label("first"), cfg.clone())
1805 .await
1806 .unwrap();
1807
1808 for i in 0..15u64 {
1810 journal.append(&(i * 100)).await.unwrap();
1811 }
1812
1813 journal.commit().await.unwrap();
1815
1816 drop(journal);
1818
1819 let journal = Journal::<_, u64>::init(context.with_label("second"), cfg.clone())
1820 .await
1821 .unwrap();
1822
1823 assert_eq!(journal.size().await, 15);
1825 for i in 0..15u64 {
1826 assert_eq!(journal.read(i).await.unwrap(), i * 100);
1827 }
1828
1829 journal.destroy().await.unwrap();
1830 });
1831 }
1832
1833 #[test_traced]
1834 fn test_init_at_size_zero() {
1835 let executor = deterministic::Runner::default();
1836 executor.start(|context| async move {
1837 let cfg = Config {
1838 partition: "init-at-size-zero".into(),
1839 items_per_section: NZU64!(5),
1840 compression: None,
1841 codec_config: (),
1842 page_cache: CacheRef::from_pooler(&context, SMALL_PAGE_SIZE, NZUsize!(2)),
1843 write_buffer: NZUsize!(1024),
1844 };
1845
1846 let journal = Journal::<_, u64>::init_at_size(context.clone(), cfg.clone(), 0)
1847 .await
1848 .unwrap();
1849
1850 assert_eq!(journal.size().await, 0);
1852
1853 assert!(journal.bounds().await.is_empty());
1855
1856 let pos = journal.append(&100).await.unwrap();
1858 assert_eq!(pos, 0);
1859 assert_eq!(journal.size().await, 1);
1860 assert_eq!(journal.read(0).await.unwrap(), 100);
1861
1862 journal.destroy().await.unwrap();
1863 });
1864 }
1865
    /// `init_at_size` at an exact section boundary (10 with 5 items per
    /// section) yields an empty journal whose appends continue from there.
    #[test_traced]
    fn test_init_at_size_section_boundary() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "init-at-size-boundary".into(),
                items_per_section: NZU64!(5),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, SMALL_PAGE_SIZE, NZUsize!(2)),
                write_buffer: NZUsize!(1024),
            };

            let journal = Journal::<_, u64>::init_at_size(context.clone(), cfg.clone(), 10)
                .await
                .unwrap();

            // Empty, but positioned at 10.
            let bounds = journal.bounds().await;
            assert_eq!(bounds.end, 10);

            assert!(bounds.is_empty());

            // Appends continue from position 10.
            let pos = journal.append(&1000).await.unwrap();
            assert_eq!(pos, 10);
            assert_eq!(journal.size().await, 11);
            assert_eq!(journal.read(10).await.unwrap(), 1000);

            let pos = journal.append(&1001).await.unwrap();
            assert_eq!(pos, 11);
            assert_eq!(journal.read(11).await.unwrap(), 1001);

            journal.destroy().await.unwrap();
        });
    }
1905
1906 #[test_traced]
1907 fn test_init_at_size_mid_section() {
1908 let executor = deterministic::Runner::default();
1909 executor.start(|context| async move {
1910 let cfg = Config {
1911 partition: "init-at-size-mid".into(),
1912 items_per_section: NZU64!(5),
1913 compression: None,
1914 codec_config: (),
1915 page_cache: CacheRef::from_pooler(&context, SMALL_PAGE_SIZE, NZUsize!(2)),
1916 write_buffer: NZUsize!(1024),
1917 };
1918
1919 let journal = Journal::<_, u64>::init_at_size(context.clone(), cfg.clone(), 7)
1921 .await
1922 .unwrap();
1923
1924 let bounds = journal.bounds().await;
1926 assert_eq!(bounds.end, 7);
1927
1928 assert!(bounds.is_empty());
1930
1931 let pos = journal.append(&700).await.unwrap();
1933 assert_eq!(pos, 7);
1934 assert_eq!(journal.size().await, 8);
1935 assert_eq!(journal.read(7).await.unwrap(), 700);
1936
1937 journal.destroy().await.unwrap();
1938 });
1939 }
1940
    /// A journal created with `init_at_size(15)` and appended to must
    /// reopen (via plain `init`) with the same bounds and contents.
    #[test_traced]
    fn test_init_at_size_persistence() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "init-at-size-persist".into(),
                items_per_section: NZU64!(5),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, SMALL_PAGE_SIZE, NZUsize!(2)),
                write_buffer: NZUsize!(1024),
            };

            let journal =
                Journal::<_, u64>::init_at_size(context.with_label("first"), cfg.clone(), 15)
                    .await
                    .unwrap();

            // Appends land at positions 15..20.
            for i in 0..5u64 {
                let pos = journal.append(&(1500 + i)).await.unwrap();
                assert_eq!(pos, 15 + i);
            }

            assert_eq!(journal.size().await, 20);

            // Persist and restart.
            journal.sync().await.unwrap();
            drop(journal);

            let journal = Journal::<_, u64>::init(context.with_label("second"), cfg.clone())
                .await
                .unwrap();

            // Bounds and contents survive the restart.
            let bounds = journal.bounds().await;
            assert_eq!(bounds.end, 20);
            assert_eq!(bounds.start, 15);

            for i in 0..5u64 {
                assert_eq!(journal.read(15 + i).await.unwrap(), 1500 + i);
            }

            // The journal remains appendable after reopen.
            let pos = journal.append(&9999).await.unwrap();
            assert_eq!(pos, 20);
            assert_eq!(journal.read(20).await.unwrap(), 9999);

            journal.destroy().await.unwrap();
        });
    }
1994
    /// `init_at_size(15)` with no appends (and no explicit sync before
    /// drop) still reopens empty at position 15 — the size survives the
    /// restart on its own.
    #[test_traced]
    fn test_init_at_size_persistence_without_data() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "init-at-size-persist-empty".into(),
                items_per_section: NZU64!(5),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, SMALL_PAGE_SIZE, NZUsize!(2)),
                write_buffer: NZUsize!(1024),
            };

            let journal =
                Journal::<_, u64>::init_at_size(context.with_label("first"), cfg.clone(), 15)
                    .await
                    .unwrap();

            let bounds = journal.bounds().await;
            assert_eq!(bounds.end, 15);
            assert!(bounds.is_empty());

            // Drop without syncing any data.
            drop(journal);

            let journal = Journal::<_, u64>::init(context.with_label("second"), cfg.clone())
                .await
                .unwrap();

            // Still empty at position 15 after restart.
            let bounds = journal.bounds().await;
            assert_eq!(bounds.end, 15);
            assert!(bounds.is_empty());

            // Appends continue from the persisted position.
            let pos = journal.append(&1500).await.unwrap();
            assert_eq!(pos, 15);
            assert_eq!(journal.read(15).await.unwrap(), 1500);

            journal.destroy().await.unwrap();
        });
    }
2038
    /// A journal started mid-section (`init_at_size(7)`) persists its
    /// mid-section pruning boundary across a restart: position 6 stays
    /// pruned while 7..10 remain readable.
    #[test_traced]
    fn test_init_at_size_mid_section_persistence() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "init-at-size-mid-section".into(),
                items_per_section: NZU64!(5),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, SMALL_PAGE_SIZE, NZUsize!(2)),
                write_buffer: NZUsize!(1024),
            };

            let journal =
                Journal::<_, u64>::init_at_size(context.with_label("first"), cfg.clone(), 7)
                    .await
                    .unwrap();

            // Appends land at positions 7..10.
            for i in 0..3u64 {
                let pos = journal.append(&(700 + i)).await.unwrap();
                assert_eq!(pos, 7 + i);
            }

            let bounds = journal.bounds().await;
            assert_eq!(bounds.end, 10);
            assert_eq!(bounds.start, 7);

            journal.sync().await.unwrap();
            drop(journal);

            let journal = Journal::<_, u64>::init(context.with_label("second"), cfg.clone())
                .await
                .unwrap();

            // Bounds survive restart, including the mid-section start.
            let bounds = journal.bounds().await;
            assert_eq!(bounds.end, 10);
            assert_eq!(bounds.start, 7);

            for i in 0..3u64 {
                assert_eq!(journal.read(7 + i).await.unwrap(), 700 + i);
            }

            // Below the boundary still reads as pruned.
            assert!(matches!(journal.read(6).await, Err(Error::ItemPruned(6))));

            journal.destroy().await.unwrap();
        });
    }
2094
    /// Like the mid-section persistence test, but the 8 appends starting at
    /// position 7 cross a section boundary (sections hold 5 items), so the
    /// restart must stitch together a partial and a full section.
    #[test_traced]
    fn test_init_at_size_mid_section_multi_section_persistence() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "init-at-size-multi-section".into(),
                items_per_section: NZU64!(5),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, SMALL_PAGE_SIZE, NZUsize!(2)),
                write_buffer: NZUsize!(1024),
            };

            let journal =
                Journal::<_, u64>::init_at_size(context.with_label("first"), cfg.clone(), 7)
                    .await
                    .unwrap();

            // Positions 7..15 span the section boundary at 10.
            for i in 0..8u64 {
                let pos = journal.append(&(700 + i)).await.unwrap();
                assert_eq!(pos, 7 + i);
            }

            let bounds = journal.bounds().await;
            assert_eq!(bounds.end, 15);
            assert_eq!(bounds.start, 7);

            journal.sync().await.unwrap();
            drop(journal);

            let journal = Journal::<_, u64>::init(context.with_label("second"), cfg.clone())
                .await
                .unwrap();

            // Bounds and every item survive the restart.
            let bounds = journal.bounds().await;
            assert_eq!(bounds.end, 15);
            assert_eq!(bounds.start, 7);

            for i in 0..8u64 {
                assert_eq!(journal.read(7 + i).await.unwrap(), 700 + i);
            }

            journal.destroy().await.unwrap();
        });
    }
2147
    /// White-box test of journal alignment when the data journal has been
    /// wiped underneath a non-empty offsets journal: recovery treats the
    /// journal as fully pruned at its old size (7, mid-section), and a
    /// subsequent append + data-only sync survives another restart.
    #[test_traced]
    fn test_align_journals_data_empty_mid_section_pruning_boundary() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "align-journals-mid-section-pruning-boundary".into(),
                items_per_section: NZU64!(5),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, SMALL_PAGE_SIZE, NZUsize!(2)),
                write_buffer: NZUsize!(1024),
            };

            let journal = Journal::<_, u64>::init(context.with_label("first"), cfg.clone())
                .await
                .unwrap();
            for i in 0..7u64 {
                journal.append(&(100 + i)).await.unwrap();
            }
            journal.sync().await.unwrap();

            // Wipe the underlying data journal directly, leaving the
            // offsets journal claiming 7 items, then "crash".
            journal.inner.write().await.data.clear().await.unwrap();
            drop(journal);

            let journal = Journal::<_, u64>::init(context.with_label("second"), cfg.clone())
                .await
                .unwrap();
            // Recovery: empty journal, positioned at the old size (7).
            let bounds = journal.bounds().await;
            assert_eq!(bounds.end, 7);
            assert!(bounds.is_empty());

            // Appending resumes at position 7.
            let pos = journal.append(&777).await.unwrap();
            assert_eq!(pos, 7);
            assert_eq!(journal.size().await, 8);
            assert_eq!(journal.read(7).await.unwrap(), 777);

            // Sync only the data section holding position 7, then "crash"
            // again before the offsets journal is synced.
            let section = 7 / cfg.items_per_section.get();
            journal
                .inner
                .write()
                .await
                .data
                .sync(section)
                .await
                .unwrap();
            drop(journal);

            // The appended item is recovered from the data journal alone.
            let journal = Journal::<_, u64>::init(context.with_label("third"), cfg.clone())
                .await
                .unwrap();
            let bounds = journal.bounds().await;
            assert_eq!(bounds.end, 8);
            assert_eq!(bounds.start, 7);
            assert_eq!(journal.read(7).await.unwrap(), 777);

            journal.destroy().await.unwrap();
        });
    }
2213
    /// Crash after `init_at_size(7)` + appends where only the data section
    /// was synced (offsets were not): recovery rebuilds the offsets and
    /// restores bounds 7..10.
    #[test_traced]
    fn test_init_at_size_crash_data_synced_offsets_not() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "init-at-size-crash-recovery".into(),
                items_per_section: NZU64!(5),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, SMALL_PAGE_SIZE, NZUsize!(2)),
                write_buffer: NZUsize!(1024),
            };

            let journal =
                Journal::<_, u64>::init_at_size(context.with_label("first"), cfg.clone(), 7)
                    .await
                    .unwrap();

            // Positions 7..10 — all in data section 1 (5 items/section).
            for i in 0..3u64 {
                journal.append(&(700 + i)).await.unwrap();
            }

            // Sync only the data section directly, then "crash" before
            // the offsets journal is synced.
            journal.inner.write().await.data.sync(1).await.unwrap();
            drop(journal);

            let journal = Journal::<_, u64>::init(context.with_label("second"), cfg.clone())
                .await
                .unwrap();

            // Everything is recovered from the synced data.
            let bounds = journal.bounds().await;
            assert_eq!(bounds.end, 10);
            assert_eq!(bounds.start, 7);

            for i in 0..3u64 {
                assert_eq!(journal.read(7 + i).await.unwrap(), 700 + i);
            }

            journal.destroy().await.unwrap();
        });
    }
2262
2263 #[test_traced]
2264 fn test_prune_does_not_move_oldest_retained_backwards() {
2265 let executor = deterministic::Runner::default();
2266 executor.start(|context| async move {
2267 let cfg = Config {
2268 partition: "prune-no-backwards".into(),
2269 items_per_section: NZU64!(5),
2270 compression: None,
2271 codec_config: (),
2272 page_cache: CacheRef::from_pooler(&context, SMALL_PAGE_SIZE, NZUsize!(2)),
2273 write_buffer: NZUsize!(1024),
2274 };
2275
2276 let journal =
2277 Journal::<_, u64>::init_at_size(context.with_label("first"), cfg.clone(), 7)
2278 .await
2279 .unwrap();
2280
2281 for i in 0..3u64 {
2283 let pos = journal.append(&(700 + i)).await.unwrap();
2284 assert_eq!(pos, 7 + i);
2285 }
2286 assert_eq!(journal.bounds().await.start, 7);
2287
2288 journal.prune(8).await.unwrap();
2290 assert_eq!(journal.bounds().await.start, 7);
2291 assert!(matches!(journal.read(6).await, Err(Error::ItemPruned(6))));
2292 assert_eq!(journal.read(7).await.unwrap(), 700);
2293
2294 journal.destroy().await.unwrap();
2295 });
2296 }
2297
2298 #[test_traced]
2299 fn test_init_at_size_large_offset() {
2300 let executor = deterministic::Runner::default();
2301 executor.start(|context| async move {
2302 let cfg = Config {
2303 partition: "init-at-size-large".into(),
2304 items_per_section: NZU64!(5),
2305 compression: None,
2306 codec_config: (),
2307 page_cache: CacheRef::from_pooler(&context, SMALL_PAGE_SIZE, NZUsize!(2)),
2308 write_buffer: NZUsize!(1024),
2309 };
2310
2311 let journal = Journal::<_, u64>::init_at_size(context.clone(), cfg.clone(), 1000)
2313 .await
2314 .unwrap();
2315
2316 let bounds = journal.bounds().await;
2317 assert_eq!(bounds.end, 1000);
2318 assert!(bounds.is_empty());
2320
2321 let pos = journal.append(&100000).await.unwrap();
2323 assert_eq!(pos, 1000);
2324 assert_eq!(journal.read(1000).await.unwrap(), 100000);
2325
2326 journal.destroy().await.unwrap();
2327 });
2328 }
2329
    /// Pruning and appending both work on a journal created with
    /// `init_at_size(20)`: after appending 20..30 and pruning to 25,
    /// bounds are 25..30 and appends continue at 30.
    #[test_traced]
    fn test_init_at_size_prune_and_append() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "init-at-size-prune".into(),
                items_per_section: NZU64!(5),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, SMALL_PAGE_SIZE, NZUsize!(2)),
                write_buffer: NZUsize!(1024),
            };

            let journal = Journal::<_, u64>::init_at_size(context.clone(), cfg.clone(), 20)
                .await
                .unwrap();

            // Positions 20..30 hold values 2000..2010.
            for i in 0..10u64 {
                journal.append(&(2000 + i)).await.unwrap();
            }

            assert_eq!(journal.size().await, 30);

            // 25 is a section boundary (5 items/section).
            journal.prune(25).await.unwrap();

            let bounds = journal.bounds().await;
            assert_eq!(bounds.end, 30);
            assert_eq!(bounds.start, 25);

            // Position i holds 2000 + (i - 20) from the append loop above.
            for i in 25..30u64 {
                assert_eq!(journal.read(i).await.unwrap(), 2000 + (i - 20));
            }

            // Still appendable after the prune.
            let pos = journal.append(&3000).await.unwrap();
            assert_eq!(pos, 30);

            journal.destroy().await.unwrap();
        });
    }
2374
    /// `init_sync` over an empty partition yields an empty journal
    /// positioned at the range's lower bound, ready for appends.
    #[test_traced]
    fn test_init_sync_no_existing_data() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "test-fresh-start".into(),
                items_per_section: NZU64!(5),
                compression: None,
                codec_config: (),
                write_buffer: NZUsize!(1024),
                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(PAGE_CACHE_SIZE)),
            };

            let lower_bound = 10;
            let upper_bound = 26;
            let journal =
                Journal::init_sync(context.clone(), cfg.clone(), lower_bound..upper_bound)
                    .await
                    .expect("Failed to initialize journal with sync boundaries");

            // Empty, positioned at the lower bound.
            let bounds = journal.bounds().await;
            assert_eq!(bounds.end, lower_bound);
            assert!(bounds.is_empty());

            // Appends continue from the lower bound.
            let pos1 = journal.append(&42u64).await.unwrap();
            assert_eq!(pos1, lower_bound);
            assert_eq!(journal.read(pos1).await.unwrap(), 42u64);

            let pos2 = journal.append(&43u64).await.unwrap();
            assert_eq!(pos2, lower_bound + 1);
            assert_eq!(journal.read(pos2).await.unwrap(), 43u64);

            journal.destroy().await.unwrap();
        });
    }
2413
    /// `init_sync` with a range overlapping existing data (8..31 over 20
    /// items) keeps the data and prunes to the section boundary at or
    /// below the lower bound (8 -> 5 with 5 items/section).
    #[test_traced]
    fn test_init_sync_existing_data_overlap() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "test-overlap".into(),
                items_per_section: NZU64!(5),
                compression: None,
                codec_config: (),
                write_buffer: NZUsize!(1024),
                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(PAGE_CACHE_SIZE)),
            };

            let journal =
                Journal::<deterministic::Context, u64>::init(context.clone(), cfg.clone())
                    .await
                    .expect("Failed to create initial journal");

            // Seed 20 items.
            for i in 0..20u64 {
                journal.append(&(i * 100)).await.unwrap();
            }
            journal.sync().await.unwrap();
            drop(journal);

            // Sync range overlaps the existing 0..20.
            let lower_bound = 8;
            let upper_bound = 31;
            let journal = Journal::<deterministic::Context, u64>::init_sync(
                context.clone(),
                cfg.clone(),
                lower_bound..upper_bound,
            )
            .await
            .expect("Failed to initialize journal with overlap");

            assert_eq!(journal.size().await, 20);

            // Pruned to the section floor of the lower bound (5), not 8.
            assert_eq!(journal.bounds().await.start, 5);
            assert!(matches!(journal.read(0).await, Err(Error::ItemPruned(_))));
            assert!(matches!(journal.read(4).await, Err(Error::ItemPruned(_))));

            // Everything from the section floor up remains readable.
            assert_eq!(journal.read(5).await.unwrap(), 500);
            assert_eq!(journal.read(8).await.unwrap(), 800);
            assert_eq!(journal.read(19).await.unwrap(), 1900);

            assert!(matches!(
                journal.read(20).await,
                Err(Error::ItemOutOfRange(_))
            ));

            // Still appendable at the current size.
            let pos = journal.append(&999).await.unwrap();
            assert_eq!(pos, 20);
            assert_eq!(journal.read(20).await.unwrap(), 999);

            journal.destroy().await.unwrap();
        });
    }
2481
2482 #[should_panic]
2484 #[test_traced]
2485 fn test_init_sync_invalid_parameters() {
2486 let executor = deterministic::Runner::default();
2487 executor.start(|context| async move {
2488 let cfg = Config {
2489 partition: "test-invalid".into(),
2490 items_per_section: NZU64!(5),
2491 compression: None,
2492 codec_config: (),
2493 write_buffer: NZUsize!(1024),
2494 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(PAGE_CACHE_SIZE)),
2495 };
2496
2497 #[allow(clippy::reversed_empty_ranges)]
2498 let _result = Journal::<deterministic::Context, u64>::init_sync(
2499 context.clone(),
2500 cfg,
2501 10..5, )
2503 .await;
2504 });
2505 }
2506
    /// `init_sync` whose range exactly matches the existing data (5..20
    /// over 20 items) keeps everything from the lower bound and prunes
    /// the rest.
    #[test_traced]
    fn test_init_sync_existing_data_exact_match() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let items_per_section = NZU64!(5);
            let cfg = Config {
                partition: "test-exact-match".into(),
                items_per_section,
                compression: None,
                codec_config: (),
                write_buffer: NZUsize!(1024),
                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(PAGE_CACHE_SIZE)),
            };

            let journal =
                Journal::<deterministic::Context, u64>::init(context.clone(), cfg.clone())
                    .await
                    .expect("Failed to create initial journal");

            // Seed 20 items.
            for i in 0..20u64 {
                journal.append(&(i * 100)).await.unwrap();
            }
            journal.sync().await.unwrap();
            drop(journal);

            // Lower bound 5 is itself a section boundary; upper bound
            // matches the existing size exactly.
            let lower_bound = 5;
            let upper_bound = 20;
            let journal = Journal::<deterministic::Context, u64>::init_sync(
                context.clone(),
                cfg.clone(),
                lower_bound..upper_bound,
            )
            .await
            .expect("Failed to initialize journal with exact match");

            assert_eq!(journal.size().await, 20);

            // Pruned exactly to the lower bound.
            assert_eq!(journal.bounds().await.start, 5);
            assert!(matches!(journal.read(0).await, Err(Error::ItemPruned(_))));
            assert!(matches!(journal.read(4).await, Err(Error::ItemPruned(_))));

            assert_eq!(journal.read(5).await.unwrap(), 500);
            assert_eq!(journal.read(10).await.unwrap(), 1000);
            assert_eq!(journal.read(19).await.unwrap(), 1900);

            assert!(matches!(
                journal.read(20).await,
                Err(Error::ItemOutOfRange(_))
            ));

            // Still appendable at the current size.
            let pos = journal.append(&999).await.unwrap();
            assert_eq!(pos, 20);
            assert_eq!(journal.read(20).await.unwrap(), 999);

            journal.destroy().await.unwrap();
        });
    }
2574
    /// If existing data (30 items) extends past the sync range's upper
    /// bound, `init_sync` must fail with `ItemOutOfRange` for every such
    /// upper bound rather than silently truncating.
    #[test_traced]
    fn test_init_sync_existing_data_exceeds_upper_bound() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let items_per_section = NZU64!(5);
            let cfg = Config {
                partition: "test-unexpected-data".into(),
                items_per_section,
                compression: None,
                codec_config: (),
                write_buffer: NZUsize!(1024),
                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(PAGE_CACHE_SIZE)),
            };

            let journal = Journal::<deterministic::Context, u64>::init(
                context.with_label("initial"),
                cfg.clone(),
            )
            .await
            .expect("Failed to create initial journal");

            // Seed 30 items.
            for i in 0..30u64 {
                journal.append(&(i * 1000)).await.unwrap();
            }
            journal.sync().await.unwrap();
            drop(journal);

            // Every upper bound strictly below the existing size must be
            // rejected (30 items > any upper bound in 9..29).
            let lower_bound = 8;
            for (i, upper_bound) in (9..29).enumerate() {
                let result = Journal::<deterministic::Context, u64>::init_sync(
                    context.with_label(&format!("sync_{i}")),
                    cfg.clone(),
                    lower_bound..upper_bound,
                )
                .await;

                assert!(matches!(result, Err(Error::ItemOutOfRange(_))));
            }
        });
    }
2621
    /// When all existing data (0..10) lies below the sync range (15..26),
    /// `init_sync` discards it: the journal comes up empty at position 15
    /// with every earlier position reading as pruned.
    #[test_traced]
    fn test_init_sync_existing_data_stale() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let items_per_section = NZU64!(5);
            let cfg = Config {
                partition: "test-stale".into(),
                items_per_section,
                compression: None,
                codec_config: (),
                write_buffer: NZUsize!(1024),
                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(PAGE_CACHE_SIZE)),
            };

            let journal = Journal::<deterministic::Context, u64>::init(
                context.with_label("first"),
                cfg.clone(),
            )
            .await
            .expect("Failed to create initial journal");

            // Seed 10 items — all below the eventual sync range.
            for i in 0..10u64 {
                journal.append(&(i * 100)).await.unwrap();
            }
            journal.sync().await.unwrap();
            drop(journal);

            // Sync range starts past all existing data.
            let lower_bound = 15;
            let upper_bound = 26;
            let journal = Journal::<deterministic::Context, u64>::init_sync(
                context.with_label("second"),
                cfg.clone(),
                lower_bound..upper_bound,
            )
            .await
            .expect("Failed to initialize journal with stale data");

            // Positioned at the lower bound with nothing retained.
            assert_eq!(journal.size().await, 15);

            assert!(journal.bounds().await.is_empty());

            // All stale positions read as pruned.
            assert!(matches!(journal.read(0).await, Err(Error::ItemPruned(_))));
            assert!(matches!(journal.read(9).await, Err(Error::ItemPruned(_))));
            assert!(matches!(journal.read(14).await, Err(Error::ItemPruned(_))));

            journal.destroy().await.unwrap();
        });
    }
2676
    /// `init_sync` with bounds falling exactly on section boundaries
    /// (15..25 with 5 items/section over 25 existing items) prunes to
    /// precisely the lower bound.
    #[test_traced]
    fn test_init_sync_section_boundaries() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let items_per_section = NZU64!(5);
            let cfg = Config {
                partition: "test-boundaries".into(),
                items_per_section,
                compression: None,
                codec_config: (),
                write_buffer: NZUsize!(1024),
                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(PAGE_CACHE_SIZE)),
            };

            let journal =
                Journal::<deterministic::Context, u64>::init(context.clone(), cfg.clone())
                    .await
                    .expect("Failed to create initial journal");

            // Seed five full sections.
            for i in 0..25u64 {
                journal.append(&(i * 100)).await.unwrap();
            }
            journal.sync().await.unwrap();
            drop(journal);

            // Both bounds sit on section boundaries.
            let lower_bound = 15;
            let upper_bound = 25;
            let journal = Journal::<deterministic::Context, u64>::init_sync(
                context.clone(),
                cfg.clone(),
                lower_bound..upper_bound,
            )
            .await
            .expect("Failed to initialize journal at boundaries");

            assert_eq!(journal.size().await, 25);

            // Pruned exactly at the lower bound.
            assert_eq!(journal.bounds().await.start, 15);

            assert!(matches!(journal.read(0).await, Err(Error::ItemPruned(_))));
            assert!(matches!(journal.read(14).await, Err(Error::ItemPruned(_))));

            assert_eq!(journal.read(15).await.unwrap(), 1500);
            assert_eq!(journal.read(20).await.unwrap(), 2000);
            assert_eq!(journal.read(24).await.unwrap(), 2400);

            assert!(matches!(
                journal.read(25).await,
                Err(Error::ItemOutOfRange(_))
            ));

            // Still appendable at the current size.
            let pos = journal.append(&999).await.unwrap();
            assert_eq!(pos, 25);
            assert_eq!(journal.read(25).await.unwrap(), 999);

            journal.destroy().await.unwrap();
        });
    }
2744
    /// `init_sync` whose entire range (10..15) falls within a single
    /// section keeps that section's items and prunes everything before
    /// the lower bound.
    #[test_traced]
    fn test_init_sync_same_section_bounds() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let items_per_section = NZU64!(5);
            let cfg = Config {
                partition: "test-same-section".into(),
                items_per_section,
                compression: None,
                codec_config: (),
                write_buffer: NZUsize!(1024),
                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(PAGE_CACHE_SIZE)),
            };

            let journal =
                Journal::<deterministic::Context, u64>::init(context.clone(), cfg.clone())
                    .await
                    .expect("Failed to create initial journal");

            // Seed three full sections.
            for i in 0..15u64 {
                journal.append(&(i * 100)).await.unwrap();
            }
            journal.sync().await.unwrap();
            drop(journal);

            // Range 10..15 lies entirely inside section 2.
            let lower_bound = 10;
            let upper_bound = 15;
            let journal = Journal::<deterministic::Context, u64>::init_sync(
                context.clone(),
                cfg.clone(),
                lower_bound..upper_bound,
            )
            .await
            .expect("Failed to initialize journal with same-section bounds");

            assert_eq!(journal.size().await, 15);

            // Pruned exactly at the lower bound.
            assert_eq!(journal.bounds().await.start, 10);

            assert!(matches!(journal.read(0).await, Err(Error::ItemPruned(_))));
            assert!(matches!(journal.read(9).await, Err(Error::ItemPruned(_))));

            assert_eq!(journal.read(10).await.unwrap(), 1000);
            assert_eq!(journal.read(11).await.unwrap(), 1100);
            assert_eq!(journal.read(14).await.unwrap(), 1400);

            assert!(matches!(
                journal.read(15).await,
                Err(Error::ItemOutOfRange(_))
            ));

            // Still appendable at the current size.
            let pos = journal.append(&999).await.unwrap();
            assert_eq!(pos, 15);
            assert_eq!(journal.read(15).await.unwrap(), 999);

            journal.destroy().await.unwrap();
        });
    }
2813
2814 #[test_traced]
2819 fn test_single_item_per_section() {
2820 let executor = deterministic::Runner::default();
2821 executor.start(|context| async move {
2822 let cfg = Config {
2823 partition: "single-item-per-section".into(),
2824 items_per_section: NZU64!(1),
2825 compression: None,
2826 codec_config: (),
2827 page_cache: CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)),
2828 write_buffer: NZUsize!(1024),
2829 };
2830
2831 let journal = Journal::<_, u64>::init(context.with_label("first"), cfg.clone())
2833 .await
2834 .unwrap();
2835
2836 let bounds = journal.bounds().await;
2838 assert_eq!(bounds.end, 0);
2839 assert!(bounds.is_empty());
2840
2841 let pos = journal.append(&0).await.unwrap();
2843 assert_eq!(pos, 0);
2844 assert_eq!(journal.size().await, 1);
2845
2846 journal.sync().await.unwrap();
2848
2849 let value = journal.read(journal.size().await - 1).await.unwrap();
2851 assert_eq!(value, 0);
2852
2853 for i in 1..10u64 {
2855 let pos = journal.append(&(i * 100)).await.unwrap();
2856 assert_eq!(pos, i);
2857 assert_eq!(journal.size().await, i + 1);
2858
2859 let value = journal.read(journal.size().await - 1).await.unwrap();
2861 assert_eq!(value, i * 100);
2862 }
2863
2864 for i in 0..10u64 {
2866 assert_eq!(journal.read(i).await.unwrap(), i * 100);
2867 }
2868
2869 journal.sync().await.unwrap();
2870
2871 let pruned = journal.prune(5).await.unwrap();
2874 assert!(pruned);
2875
2876 assert_eq!(journal.size().await, 10);
2878
2879 assert_eq!(journal.bounds().await.start, 5);
2881
2882 let value = journal.read(journal.size().await - 1).await.unwrap();
2884 assert_eq!(value, 900);
2885
2886 for i in 0..5 {
2888 assert!(matches!(
2889 journal.read(i).await,
2890 Err(crate::journal::Error::ItemPruned(_))
2891 ));
2892 }
2893
2894 for i in 5..10u64 {
2896 assert_eq!(journal.read(i).await.unwrap(), i * 100);
2897 }
2898
2899 for i in 10..15u64 {
2901 let pos = journal.append(&(i * 100)).await.unwrap();
2902 assert_eq!(pos, i);
2903
2904 let value = journal.read(journal.size().await - 1).await.unwrap();
2906 assert_eq!(value, i * 100);
2907 }
2908
2909 journal.sync().await.unwrap();
2910 drop(journal);
2911
2912 let journal = Journal::<_, u64>::init(context.with_label("second"), cfg.clone())
2914 .await
2915 .unwrap();
2916
2917 assert_eq!(journal.size().await, 15);
2919
2920 assert_eq!(journal.bounds().await.start, 5);
2922
2923 let value = journal.read(journal.size().await - 1).await.unwrap();
2925 assert_eq!(value, 1400);
2926
2927 for i in 5..15u64 {
2929 assert_eq!(journal.read(i).await.unwrap(), i * 100);
2930 }
2931
2932 journal.destroy().await.unwrap();
2933
2934 let journal = Journal::<_, u64>::init(context.with_label("third"), cfg.clone())
2937 .await
2938 .unwrap();
2939
2940 for i in 0..10u64 {
2942 journal.append(&(i * 1000)).await.unwrap();
2943 }
2944
2945 journal.prune(5).await.unwrap();
2947 let bounds = journal.bounds().await;
2948 assert_eq!(bounds.end, 10);
2949 assert_eq!(bounds.start, 5);
2950
2951 journal.sync().await.unwrap();
2953 drop(journal);
2954
2955 let journal = Journal::<_, u64>::init(context.with_label("fourth"), cfg.clone())
2957 .await
2958 .unwrap();
2959
2960 let bounds = journal.bounds().await;
2962 assert_eq!(bounds.end, 10);
2963 assert_eq!(bounds.start, 5);
2964
2965 let value = journal.read(journal.size().await - 1).await.unwrap();
2967 assert_eq!(value, 9000);
2968
2969 for i in 5..10u64 {
2971 assert_eq!(journal.read(i).await.unwrap(), i * 1000);
2972 }
2973
2974 journal.destroy().await.unwrap();
2975
2976 let journal = Journal::<_, u64>::init(context.with_label("fifth"), cfg.clone())
2980 .await
2981 .unwrap();
2982
2983 for i in 0..5u64 {
2984 journal.append(&(i * 100)).await.unwrap();
2985 }
2986 journal.sync().await.unwrap();
2987
2988 journal.prune(5).await.unwrap();
2990 let bounds = journal.bounds().await;
2991 assert_eq!(bounds.end, 5); assert!(bounds.is_empty()); let result = journal.read(journal.size().await - 1).await;
2996 assert!(matches!(result, Err(crate::journal::Error::ItemPruned(4))));
2997
2998 journal.append(&500).await.unwrap();
3000 let bounds = journal.bounds().await;
3001 assert_eq!(bounds.start, 5);
3002 assert_eq!(journal.read(bounds.end - 1).await.unwrap(), 500);
3003
3004 journal.destroy().await.unwrap();
3005 });
3006 }
3007
3008 #[test_traced]
3009 fn test_variable_journal_clear_to_size() {
3010 let executor = deterministic::Runner::default();
3011 executor.start(|context| async move {
3012 let cfg = Config {
3013 partition: "clear-test".into(),
3014 items_per_section: NZU64!(10),
3015 compression: None,
3016 codec_config: (),
3017 page_cache: CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)),
3018 write_buffer: NZUsize!(1024),
3019 };
3020
3021 let journal = Journal::<_, u64>::init(context.with_label("journal"), cfg.clone())
3022 .await
3023 .unwrap();
3024
3025 for i in 0..25u64 {
3027 journal.append(&(i * 100)).await.unwrap();
3028 }
3029 let bounds = journal.bounds().await;
3030 assert_eq!(bounds.end, 25);
3031 assert_eq!(bounds.start, 0);
3032 journal.sync().await.unwrap();
3033
3034 journal.clear_to_size(100).await.unwrap();
3036 let bounds = journal.bounds().await;
3037 assert_eq!(bounds.end, 100);
3038 assert!(bounds.is_empty());
3039
3040 for i in 0..25 {
3042 assert!(matches!(
3043 journal.read(i).await,
3044 Err(crate::journal::Error::ItemPruned(_))
3045 ));
3046 }
3047
3048 drop(journal);
3050 let journal =
3051 Journal::<_, u64>::init(context.with_label("journal_after_clear"), cfg.clone())
3052 .await
3053 .unwrap();
3054 let bounds = journal.bounds().await;
3055 assert_eq!(bounds.end, 100);
3056 assert!(bounds.is_empty());
3057
3058 for i in 100..105u64 {
3060 let pos = journal.append(&(i * 100)).await.unwrap();
3061 assert_eq!(pos, i);
3062 }
3063 let bounds = journal.bounds().await;
3064 assert_eq!(bounds.end, 105);
3065 assert_eq!(bounds.start, 100);
3066
3067 for i in 100..105u64 {
3069 assert_eq!(journal.read(i).await.unwrap(), i * 100);
3070 }
3071
3072 journal.sync().await.unwrap();
3074 drop(journal);
3075
3076 let journal = Journal::<_, u64>::init(context.with_label("journal_reopened"), cfg)
3077 .await
3078 .unwrap();
3079
3080 let bounds = journal.bounds().await;
3081 assert_eq!(bounds.end, 105);
3082 assert_eq!(bounds.start, 100);
3083 for i in 100..105u64 {
3084 assert_eq!(journal.read(i).await.unwrap(), i * 100);
3085 }
3086
3087 journal.destroy().await.unwrap();
3088 });
3089 }
3090}