1use super::Reader as _;
7use crate::{
8 journal::{
9 contiguous::{fixed, Contiguous, Many, Mutable},
10 segmented::variable,
11 Error,
12 },
13 Context, Persistable,
14};
15use commonware_codec::{Codec, CodecShared};
16use commonware_runtime::buffer::paged::CacheRef;
17use commonware_utils::{
18 sync::{AsyncRwLockReadGuard, UpgradableAsyncRwLock},
19 NZUsize,
20};
21#[commonware_macros::stability(ALPHA)]
22use core::ops::Range;
23use futures::{stream, Stream, StreamExt as _};
24use std::num::{NonZeroU64, NonZeroUsize};
25#[commonware_macros::stability(ALPHA)]
26use tracing::debug;
27use tracing::warn;
28
/// Number of items buffered at a time when replaying a journal section.
const REPLAY_BUFFER_SIZE: NonZeroUsize = NZUsize!(1024);

/// Suffix appended to the configured partition name for the data journal.
const DATA_SUFFIX: &str = "_data";

/// Suffix appended to the configured partition name for the offsets journal.
const OFFSETS_SUFFIX: &str = "_offsets";
36
/// Maps an item position to the data-journal section that stores it.
///
/// Each section holds `items_per_section` consecutive items, so this is plain
/// integer division. `items_per_section` must be non-zero (callers derive it
/// from a `NonZeroU64`).
const fn position_to_section(position: u64, items_per_section: u64) -> u64 {
    position.div_euclid(items_per_section)
}
61
/// Configuration for a contiguous variable-item [`Journal`].
#[derive(Clone)]
pub struct Config<C> {
    /// Base partition name; the data and offsets journals live in partitions
    /// derived from it via [`Config::data_partition`] / [`Config::offsets_partition`].
    pub partition: String,

    /// Number of items stored per data section (and per offsets blob).
    pub items_per_section: NonZeroU64,

    /// Optional compression level applied when encoding items.
    pub compression: Option<u8>,

    /// Codec configuration used to encode/decode items.
    pub codec_config: C,

    /// Page cache shared by both underlying journals.
    pub page_cache: CacheRef,

    /// Write buffer size for both underlying journals.
    pub write_buffer: NonZeroUsize,
}
86
87impl<C> Config<C> {
88 fn data_partition(&self) -> String {
90 format!("{}{}", self.partition, DATA_SUFFIX)
91 }
92
93 fn offsets_partition(&self) -> String {
95 format!("{}{}", self.partition, OFFSETS_SUFFIX)
96 }
97}
98
/// Journal state guarded by the outer read/write lock.
struct Inner<E: Context, V: Codec> {
    /// Section-keyed journal holding the encoded items.
    data: variable::Journal<E, V>,

    /// Total number of items ever appended (the next append position).
    size: u64,

    /// Positions below this boundary have been pruned and cannot be read.
    pruning_boundary: u64,
}
121
122impl<E: Context, V: CodecShared> Inner<E, V> {
123 async fn read(
131 &self,
132 position: u64,
133 items_per_section: u64,
134 offsets: &impl super::Reader<Item = u64>,
135 ) -> Result<V, Error> {
136 if position >= self.size {
137 return Err(Error::ItemOutOfRange(position));
138 }
139 if position < self.pruning_boundary {
140 return Err(Error::ItemPruned(position));
141 }
142
143 let offset = offsets.read(position).await?;
144 let section = position_to_section(position, items_per_section);
145
146 self.data.get(section, offset).await
147 }
148
149 fn try_read_sync(
151 &self,
152 position: u64,
153 items_per_section: u64,
154 offsets: &impl super::Reader<Item = u64>,
155 ) -> Option<V> {
156 if position >= self.size || position < self.pruning_boundary {
157 return None;
158 }
159 let offset = offsets.try_read_sync(position)?;
160 let section = position_to_section(position, items_per_section);
161 self.data.try_get_sync(section, offset)
162 }
163}
164
/// A contiguous journal of variable-size items built from two journals: a
/// segmented data journal holding the encoded items and a fixed journal
/// mapping each item position to its byte offset within a data section.
pub struct Journal<E: Context, V: Codec> {
    /// Mutable state (data journal, size, pruning boundary) behind an
    /// upgradable read/write lock.
    inner: UpgradableAsyncRwLock<Inner<E, V>>,

    /// Maps each item position to the item's offset within its data section.
    offsets: fixed::Journal<E, u64>,

    /// Number of items per data section (from `Config::items_per_section`).
    items_per_section: u64,

    /// Optional compression level applied when encoding appended items.
    compression: Option<u8>,
}
224
/// Read handle holding shared locks over the journal state, providing a
/// consistent view for reads and replays.
pub struct Reader<'a, E: Context, V: Codec> {
    // Shared guard over the journal's inner state.
    guard: AsyncRwLockReadGuard<'a, Inner<E, V>>,
    // Reader over the position -> offset journal.
    offsets: fixed::Reader<'a, E, u64>,
    // Copied from the owning journal for section computation.
    items_per_section: u64,
}
231
impl<E: Context, V: CodecShared> super::Reader for Reader<'_, E, V> {
    type Item = V;

    /// Readable positions: `[pruning_boundary, size)`.
    fn bounds(&self) -> std::ops::Range<u64> {
        self.guard.pruning_boundary..self.guard.size
    }

    async fn read(&self, position: u64) -> Result<V, Error> {
        self.guard
            .read(position, self.items_per_section, &self.offsets)
            .await
    }

    fn try_read_sync(&self, position: u64) -> Option<V> {
        self.guard
            .try_read_sync(position, self.items_per_section, &self.offsets)
    }

    /// Streams `(position, item)` pairs from `start_pos` to the end of the
    /// journal. `start_pos == size` yields an empty stream; positions below
    /// the pruning boundary or above `size` are rejected.
    async fn replay(
        &self,
        buffer_size: NonZeroUsize,
        start_pos: u64,
    ) -> Result<impl Stream<Item = Result<(u64, V), Error>> + Send, Error> {
        if start_pos < self.guard.pruning_boundary {
            return Err(Error::ItemPruned(start_pos));
        }
        if start_pos > self.guard.size {
            return Err(Error::ItemOutOfRange(start_pos));
        }

        // Resolve where the replay begins in the data journal. When
        // `start_pos == size` there is nothing to replay; `u64::MAX` is used
        // as a start section past all data so the inner stream is empty —
        // NOTE(review): relies on `variable::Journal::replay` treating a
        // section beyond the newest as empty; confirm against its contract.
        let (start_section, start_offset) = if start_pos < self.guard.size {
            let offset = self.offsets.read(start_pos).await?;
            let section = position_to_section(start_pos, self.items_per_section);
            (section, offset)
        } else {
            (u64::MAX, 0)
        };

        let inner_stream = self
            .guard
            .data
            .replay(start_section, start_offset, buffer_size)
            .await?;

        // The data journal yields items in position order starting at
        // `start_pos`, so zipping with a counter recovers each position.
        let stream = inner_stream
            .zip(stream::iter(start_pos..))
            .map(|(result, pos)| result.map(|(_section, _offset, _size, item)| (pos, item)));

        Ok(stream)
    }
}
287
288impl<E: Context, V: CodecShared> Journal<E, V> {
    /// Opens (or creates) the journal in the partitions derived from
    /// `cfg.partition`, repairing any crash-induced divergence between the
    /// data and offsets journals before returning.
    pub async fn init(context: E, cfg: Config<V::Cfg>) -> Result<Self, Error> {
        let items_per_section = cfg.items_per_section.get();
        let data_partition = cfg.data_partition();
        let offsets_partition = cfg.offsets_partition();

        let mut data = variable::Journal::init(
            context.with_label("data"),
            variable::Config {
                partition: data_partition,
                compression: cfg.compression,
                codec_config: cfg.codec_config,
                page_cache: cfg.page_cache.clone(),
                write_buffer: cfg.write_buffer,
            },
        )
        .await?;

        let mut offsets = fixed::Journal::init(
            context.with_label("offsets"),
            fixed::Config {
                partition: offsets_partition,
                items_per_blob: cfg.items_per_section,
                page_cache: cfg.page_cache,
                write_buffer: cfg.write_buffer,
            },
        )
        .await?;

        // Crash recovery: make the offsets journal agree with the data
        // journal and derive the journal's logical bounds.
        let (pruning_boundary, size) =
            Self::align_journals(&mut data, &mut offsets, items_per_section).await?;

        Ok(Self {
            inner: UpgradableAsyncRwLock::new(Inner {
                data,
                size,
                pruning_boundary,
            }),
            offsets,
            items_per_section,
            compression: cfg.compression,
        })
    }
340
    /// Initializes a journal that behaves as if `size` items had been
    /// appended and then all pruned: no position below `size` is readable.
    #[commonware_macros::stability(ALPHA)]
    pub async fn init_at_size(context: E, cfg: Config<V::Cfg>, size: u64) -> Result<Self, Error> {
        let data = variable::Journal::init(
            context.with_label("data"),
            variable::Config {
                partition: cfg.data_partition(),
                compression: cfg.compression,
                codec_config: cfg.codec_config.clone(),
                page_cache: cfg.page_cache.clone(),
                write_buffer: cfg.write_buffer,
            },
        )
        .await?;

        let offsets = fixed::Journal::init_at_size(
            context.with_label("offsets"),
            fixed::Config {
                partition: cfg.offsets_partition(),
                items_per_blob: cfg.items_per_section,
                page_cache: cfg.page_cache,
                write_buffer: cfg.write_buffer,
            },
            size,
        )
        .await?;

        Ok(Self {
            inner: UpgradableAsyncRwLock::new(Inner {
                data,
                size,
                // Boundary equals size: everything "before" is unreadable.
                pruning_boundary: size,
            }),
            offsets,
            items_per_section: cfg.items_per_section.get(),
            compression: cfg.compression,
        })
    }
384
    /// Initializes the journal for state sync over `range`.
    ///
    /// Existing on-disk data is reused when it overlaps the sync range;
    /// stale or absent data causes re-initialization at `range.start`.
    /// Data extending past `range.end` is rejected as `ItemOutOfRange`.
    #[commonware_macros::stability(ALPHA)]
    pub(crate) async fn init_sync(
        context: E,
        cfg: Config<V::Cfg>,
        range: Range<u64>,
    ) -> Result<Self, Error> {
        assert!(!range.is_empty(), "range must not be empty");

        debug!(
            range.start,
            range.end,
            items_per_section = cfg.items_per_section.get(),
            "initializing contiguous variable journal for sync"
        );

        let journal = Self::init(context.with_label("journal"), cfg.clone()).await?;

        let size = journal.size().await;

        // No existing data: start empty (range begins at 0) or jump straight
        // to the range start.
        if size == 0 {
            if range.start == 0 {
                debug!("no existing journal data, returning empty journal");
                return Ok(journal);
            } else {
                debug!(
                    range.start,
                    "no existing journal data, initializing at sync range start"
                );
                journal.destroy().await?;
                return Self::init_at_size(context, cfg, range.start).await;
            }
        }

        // Existing data extends beyond the sync range: refuse to discard it.
        if size > range.end {
            return Err(Error::ItemOutOfRange(size));
        }

        // Existing data ends before the range begins: it is useless here.
        if size <= range.start {
            debug!(
                size,
                range.start, "existing journal data is stale, re-initializing at start position"
            );
            journal.destroy().await?;
            return Self::init_at_size(context, cfg, range.start).await;
        }

        // Existing data overlaps the range; drop anything below its start.
        let bounds = journal.reader().await.bounds();
        if !bounds.is_empty() && bounds.start < range.start {
            debug!(
                oldest_pos = bounds.start,
                range.start, "pruning journal to sync range start"
            );
            journal.prune(range.start).await?;
        }

        Ok(journal)
    }
473
474 pub async fn rewind(&self, size: u64) -> Result<(), Error> {
488 let mut inner = self.inner.write().await;
489
490 match size.cmp(&inner.size) {
492 std::cmp::Ordering::Greater => return Err(Error::InvalidRewind(size)),
493 std::cmp::Ordering::Equal => return Ok(()), std::cmp::Ordering::Less => {}
495 }
496
497 if size < inner.pruning_boundary {
499 return Err(Error::ItemPruned(size));
500 }
501
502 let discard_offset = {
504 let offsets_reader = self.offsets.reader().await;
505 offsets_reader.read(size).await?
506 };
507 let discard_section = position_to_section(size, self.items_per_section);
508
509 inner
510 .data
511 .rewind_to_offset(discard_section, discard_offset)
512 .await?;
513 self.offsets.rewind(size).await?;
514
515 inner.size = size;
517
518 Ok(())
519 }
520
521 pub async fn append(&self, item: &V) -> Result<u64, Error> {
537 self.append_many(Many::Flat(std::slice::from_ref(item)))
538 .await
539 }
540
    /// Appends a batch of items, returning the position of the last one.
    ///
    /// Items are encoded before the write lock is taken. Whenever an append
    /// fills a section, both journals are synced; the write lock is
    /// downgraded to an upgradable read for the duration of the sync so
    /// concurrent readers are not blocked.
    ///
    /// Returns `EmptyAppend` if the batch contains no items.
    pub async fn append_many<'a>(&'a self, items: Many<'a, V>) -> Result<u64, Error> {
        if items.is_empty() {
            return Err(Error::EmptyAppend);
        }

        // Encode everything up front, outside the write lock.
        let encode = |item: &V| variable::Journal::<E, V>::encode_item(self.compression, item);
        let encoded: Vec<_> = match &items {
            Many::Flat(s) => s.iter().map(encode).collect::<Result<Vec<_>, _>>()?,
            Many::Nested(nested_items) => nested_items
                .iter()
                .flat_map(|items| items.iter())
                .map(encode)
                .collect::<Result<Vec<_>, _>>()?,
        };

        let mut inner = self.inner.write().await;

        let mut last_position = 0;
        for (index, (buf, _item_len)) in encoded.iter().enumerate() {
            let section = position_to_section(inner.size, self.items_per_section);

            let offset = inner.data.append_raw(section, buf).await?;

            // The offsets journal must mirror item positions exactly.
            let offsets_pos = self.offsets.append(&offset).await?;
            assert_eq!(offsets_pos, inner.size);

            last_position = inner.size;
            inner.size += 1;

            // Section boundary reached: sync both journals before continuing.
            if inner.size.is_multiple_of(self.items_per_section) {
                let inner_ref = inner.downgrade_to_upgradable();
                futures::try_join!(inner_ref.data.sync(section), self.offsets.sync())?;
                if index + 1 == encoded.len() {
                    return Ok(last_position);
                }
                inner = inner_ref.upgrade().await;
            }
        }

        Ok(last_position)
    }
594
595 pub async fn reader(&self) -> Reader<'_, E, V> {
597 Reader {
598 guard: self.inner.read().await,
599 offsets: self.offsets.reader().await,
600 items_per_section: self.items_per_section,
601 }
602 }
603
604 pub async fn size(&self) -> u64 {
607 self.inner.read().await.size
608 }
609
610 pub async fn prune(&self, min_position: u64) -> Result<bool, Error> {
621 let mut inner = self.inner.write().await;
622
623 if min_position <= inner.pruning_boundary {
624 return Ok(false);
625 }
626
627 let min_position = min_position.min(inner.size);
629
630 let min_section = position_to_section(min_position, self.items_per_section);
632
633 let pruned = inner.data.prune(min_section).await?;
634 if pruned {
635 let new_oldest = (min_section * self.items_per_section).max(inner.pruning_boundary);
636 inner.pruning_boundary = new_oldest;
637 self.offsets.prune(new_oldest).await?;
638 }
639 Ok(pruned)
640 }
641
642 pub async fn commit(&self) -> Result<(), Error> {
647 let inner = self.inner.upgradable_read().await;
650
651 let section = position_to_section(inner.size, self.items_per_section);
652 inner.data.sync(section).await
653 }
654
655 pub async fn sync(&self) -> Result<(), Error> {
659 let inner = self.inner.upgradable_read().await;
662
663 let section = position_to_section(inner.size, self.items_per_section);
666
667 futures::try_join!(inner.data.sync(section), self.offsets.sync())?;
670
671 Ok(())
672 }
673
674 pub async fn destroy(self) -> Result<(), Error> {
678 let inner = self.inner.into_inner();
679 inner.data.destroy().await?;
680 self.offsets.destroy().await
681 }
682
683 #[commonware_macros::stability(ALPHA)]
688 pub(crate) async fn clear_to_size(&self, new_size: u64) -> Result<(), Error> {
689 let mut inner = self.inner.write().await;
690 inner.data.clear().await?;
691
692 self.offsets.clear_to_size(new_size).await?;
693 inner.size = new_size;
694 inner.pruning_boundary = new_size;
695 Ok(())
696 }
697
    /// Reconciles the data and offsets journals after a restart.
    ///
    /// A crash can leave the two journals out of step (data appended but
    /// offsets not yet written, a prune applied to only one of them, an
    /// empty section created but never filled). This repairs the offsets
    /// journal to match the data journal and returns the reconciled
    /// `(pruning_boundary, size)`. Divergence that cannot be repaired (e.g.
    /// the offsets journal is a whole section behind the data) is reported
    /// as `Error::Corruption`.
    async fn align_journals(
        data: &mut variable::Journal<E, V>,
        offsets: &mut fixed::Journal<E, u64>,
        items_per_section: u64,
    ) -> Result<(u64, u64), Error> {
        // Count the items in the newest data section by replaying it.
        let items_in_last_section = match data.newest_section() {
            Some(last_section) => {
                let stream = data.replay(last_section, 0, REPLAY_BUFFER_SIZE).await?;
                futures::pin_mut!(stream);
                let mut count = 0u64;
                while let Some(result) = stream.next().await {
                    result?; count += 1;
                }
                count
            }
            None => 0,
        };

        // Treat a single empty section the same as no data at all (a crash
        // can leave behind a freshly-created, still-empty section).
        let data_empty =
            data.is_empty() || (data.num_sections() == 1 && items_in_last_section == 0);
        if data_empty {
            let offsets_bounds = {
                let offsets_reader = offsets.reader().await;
                offsets_reader.bounds()
            };
            let size = offsets_bounds.end;

            if !data.is_empty() {
                // An empty section exists: align offsets to that section's
                // first position (never below the current offsets start).
                let data_first_section = data.oldest_section().unwrap();
                let data_section_start = data_first_section * items_per_section;
                let target_pos = data_section_start.max(offsets_bounds.start);

                warn!("crash repair: clearing offsets to {target_pos} (empty section crash)");
                offsets.clear_to_size(target_pos).await?;
                return Ok((target_pos, target_pos));
            }

            // Data fully pruned but offsets still hold entries: drop them.
            if !offsets_bounds.is_empty() && offsets_bounds.start < size {
                warn!("crash repair: clearing offsets to {size} (prune-all crash)");
                offsets.clear_to_size(size).await?;
            }

            return Ok((size, size));
        }

        let data_first_section = data.oldest_section().unwrap();
        let data_last_section = data.newest_section().unwrap();

        // First position implied by the oldest surviving data section.
        let data_oldest_pos = data_first_section * items_per_section;

        // Phase 1: align the *start* of the offsets journal with the data.
        {
            let offsets_bounds = {
                let offsets_reader = offsets.reader().await;
                offsets_reader.bounds()
            };
            match (
                offsets_bounds.is_empty(),
                offsets_bounds.start.cmp(&data_oldest_pos),
            ) {
                (true, _) => {
                    // Offsets journal holds no entries; it must sit in the
                    // same section as the oldest data or a rebuild from the
                    // data journal is impossible.
                    let offsets_first_section = offsets_bounds.start / items_per_section;
                    if offsets_first_section != data_first_section {
                        return Err(Error::Corruption(format!(
                            "offsets journal empty at section {offsets_first_section} != data section {data_first_section}"
                        )));
                    }
                    warn!(
                        "crash repair: offsets journal empty at {}, will rebuild from data",
                        offsets_bounds.start
                    );
                }
                (false, std::cmp::Ordering::Less) => {
                    // Data was pruned further than offsets: catch offsets up.
                    warn!("crash repair: pruning offsets journal to {data_oldest_pos}");
                    offsets.prune(data_oldest_pos).await?;
                }
                (false, std::cmp::Ordering::Greater) => {
                    // Offsets ahead within the same section is tolerable;
                    // a whole section ahead is unrecoverable.
                    if offsets_bounds.start / items_per_section > data_first_section {
                        return Err(Error::Corruption(format!(
                            "offsets oldest pos ({}) > data oldest pos ({data_oldest_pos})",
                            offsets_bounds.start
                        )));
                    }
                }
                (false, std::cmp::Ordering::Equal) => {
                    // Starts already agree; nothing to repair.
                }
            }
        }

        // Phase 2: compute the expected total size from the data layout.
        let (offsets_bounds, data_size) = {
            let offsets_reader = offsets.reader().await;
            let offsets_bounds = offsets_reader.bounds();
            let data_size = if data_first_section == data_last_section {
                offsets_bounds.start + items_in_last_section
            } else {
                // Partial oldest section + full middle sections + counted
                // items in the newest section.
                let oldest_items =
                    (data_first_section + 1) * items_per_section - offsets_bounds.start;
                let middle_items = (data_last_section - data_first_section - 1) * items_per_section;
                offsets_bounds.start + oldest_items + middle_items + items_in_last_section
            };
            (offsets_bounds, data_size)
        };

        // Phase 3: align the *end* of the offsets journal with the data.
        let offsets_size = offsets_bounds.end;
        if offsets_size > data_size {
            warn!("crash repair: rewinding offsets from {offsets_size} to {data_size}");
            offsets.rewind(data_size).await?;
        } else if offsets_size < data_size {
            Self::add_missing_offsets(data, offsets, offsets_size, items_per_section).await?;
        }

        // Post-conditions: both journals must now agree on their bounds.
        let pruning_boundary = {
            let offsets_reader = offsets.reader().await;
            let offsets_bounds = offsets_reader.bounds();
            assert_eq!(offsets_bounds.end, data_size);

            assert!(
                !offsets_bounds.is_empty(),
                "offsets should have data after alignment"
            );
            assert_eq!(
                offsets_bounds.start / items_per_section,
                data_first_section,
                "offsets and data should be in same oldest section"
            );
            offsets_bounds.start
        };

        offsets.sync().await?;
        Ok((pruning_boundary, data_size))
    }
877
    /// Rebuilds offsets-journal entries that are missing relative to the
    /// data journal by replaying the data and appending one offset per item.
    ///
    /// `offsets_size` is the current end of the offsets journal; entries for
    /// all data items at or beyond it are appended.
    async fn add_missing_offsets(
        data: &variable::Journal<E, V>,
        offsets: &mut fixed::Journal<E, u64>,
        offsets_size: u64,
        items_per_section: u64,
    ) -> Result<(), Error> {
        assert!(
            !data.is_empty(),
            "rebuild_offsets called with empty data journal"
        );

        // Decide where in the data journal to resume the replay: from the
        // last item that already has an offset (and skip it), or — when no
        // usable offsets exist — from the oldest data section.
        let (start_section, resume_offset, skip_first) = {
            let offsets_reader = offsets.reader().await;
            let offsets_bounds = offsets_reader.bounds();
            if offsets_bounds.is_empty() {
                let first_section = data.oldest_section().unwrap();
                (first_section, 0, false)
            } else if offsets_bounds.start < offsets_size {
                // Resume from the last item already covered by offsets.
                let last_offset = offsets_reader.read(offsets_size - 1).await?;
                let last_section = position_to_section(offsets_size - 1, items_per_section);
                (last_section, last_offset, true)
            } else {
                // No unpruned offsets remain; rebuild from the oldest data.
                let first_section = data.oldest_section().unwrap();
                (first_section, 0, false)
            }
        };

        let stream = data
            .replay(start_section, resume_offset, REPLAY_BUFFER_SIZE)
            .await?;
        futures::pin_mut!(stream);

        let mut skipped_first = false;
        while let Some(result) = stream.next().await {
            let (_section, offset, _size, _item) = result?;

            // When resuming, the first replayed item already has an offset.
            if skip_first && !skipped_first {
                skipped_first = true;
                continue;
            }

            offsets.append(&offset).await?;
        }

        Ok(())
    }
944}
945
impl<E: Context, V: CodecShared> Contiguous for Journal<E, V> {
    type Item = V;

    // Delegate to the inherent methods of the same name.
    async fn reader(&self) -> impl super::Reader<Item = V> + '_ {
        Self::reader(self).await
    }

    async fn size(&self) -> u64 {
        Self::size(self).await
    }
}
958
impl<E: Context, V: CodecShared> Mutable for Journal<E, V> {
    // The trait takes `&mut self`, but the inherent methods only need
    // `&self` since all state is guarded by the internal lock.
    async fn append(&mut self, item: &Self::Item) -> Result<u64, Error> {
        Self::append(self, item).await
    }

    async fn append_many<'a>(&'a mut self, items: Many<'a, Self::Item>) -> Result<u64, Error> {
        Self::append_many(self, items).await
    }

    async fn prune(&mut self, min_position: u64) -> Result<bool, Error> {
        Self::prune(self, min_position).await
    }

    async fn rewind(&mut self, size: u64) -> Result<(), Error> {
        Self::rewind(self, size).await
    }
}
976
impl<E: Context, V: CodecShared> Persistable for Journal<E, V> {
    type Error = Error;

    // Delegates resolve to the inherent methods of the same name (inherent
    // methods take precedence over trait methods, so this does not recurse).
    async fn commit(&self) -> Result<(), Error> {
        self.commit().await
    }

    async fn sync(&self) -> Result<(), Error> {
        self.sync().await
    }

    async fn destroy(self) -> Result<(), Error> {
        self.destroy().await
    }
}
992
#[commonware_macros::stability(ALPHA)]
impl<E: Context, V: CodecShared> crate::journal::authenticated::Inner<E> for Journal<E, V> {
    type Config = Config<V::Cfg>;

    /// Wraps this journal type in an authenticated journal backed by a
    /// journaled merkle tree.
    async fn init<F: crate::merkle::Family, H: commonware_cryptography::Hasher>(
        context: E,
        merkle_cfg: crate::merkle::journaled::Config,
        journal_cfg: Self::Config,
        rewind_predicate: fn(&V) -> bool,
    ) -> Result<
        crate::journal::authenticated::Journal<F, E, Self, H>,
        crate::journal::authenticated::Error<F>,
    > {
        crate::journal::authenticated::Journal::<F, E, Self, H>::new(
            context,
            merkle_cfg,
            journal_cfg,
            rewind_predicate,
        )
        .await
    }
}
1015
/// Test-only helpers that poke at the journal's internals directly, mostly
/// to simulate partial (crash-like) updates of one underlying journal.
#[cfg(test)]
impl<E: Context, V: CodecShared> Journal<E, V> {
    // Convenience read through a freshly-acquired reader.
    pub(crate) async fn read(&self, position: u64) -> Result<V, Error> {
        self.reader().await.read(position).await
    }

    // Convenience bounds through a freshly-acquired reader.
    pub(crate) async fn bounds(&self) -> std::ops::Range<u64> {
        self.reader().await.bounds()
    }

    // Prunes only the data journal (offsets untouched).
    pub(crate) async fn test_prune_data(&self, section: u64) -> Result<bool, Error> {
        let mut inner = self.inner.write().await;
        inner.data.prune(section).await
    }

    // Prunes only the offsets journal (data untouched).
    pub(crate) async fn test_prune_offsets(&self, position: u64) -> Result<bool, Error> {
        self.offsets.prune(position).await
    }

    // Rewinds only the offsets journal (data untouched).
    pub(crate) async fn test_rewind_offsets(&self, position: u64) -> Result<(), Error> {
        self.offsets.rewind(position).await
    }

    // Size of the offsets journal alone.
    pub(crate) async fn test_offsets_size(&self) -> u64 {
        self.offsets.size().await
    }

    // Appends directly to a data section, bypassing the offsets journal.
    pub(crate) async fn test_append_data(
        &self,
        section: u64,
        item: V,
    ) -> Result<(u64, u32), Error> {
        let mut inner = self.inner.write().await;
        inner.data.append(section, &item).await
    }

    // Syncs only the data journal's newest section.
    pub(crate) async fn test_sync_data(&self) -> Result<(), Error> {
        let inner = self.inner.read().await;
        inner
            .data
            .sync(inner.data.newest_section().unwrap_or(0))
            .await
    }
}
1068
1069#[cfg(test)]
1070mod tests {
1071 use super::*;
1072 use crate::journal::contiguous::tests::run_contiguous_tests;
1073 use commonware_macros::test_traced;
1074 use commonware_runtime::{buffer::paged::CacheRef, deterministic, Metrics, Runner, Storage};
1075 use commonware_utils::{NZUsize, NZU16, NZU64};
1076 use futures::FutureExt as _;
1077 use std::num::NonZeroU16;
1078
    // Page sizes and cache capacity used by tests in this module; some are
    // presumably referenced by tests beyond this view — TODO confirm.
    const PAGE_SIZE: NonZeroU16 = NZU16!(101);
    const PAGE_CACHE_SIZE: usize = 2;
    const LARGE_PAGE_SIZE: NonZeroU16 = NZU16!(1024);
    const SMALL_PAGE_SIZE: NonZeroU16 = NZU16!(512);
1085
    /// Losing the offsets partition after data has been pruned must surface
    /// as `Error::Corruption` on re-init: the offsets journal re-initializes
    /// empty at position 0, in a different section than the pruned data.
    #[test_traced]
    fn test_variable_offsets_partition_loss_after_prune_unrecoverable() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "offsets-loss-after-prune".into(),
                items_per_section: NZU64!(10),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)),
                write_buffer: NZUsize!(1024),
            };

            let journal = Journal::<_, u64>::init(context.with_label("first"), cfg.clone())
                .await
                .unwrap();

            for i in 0..40u64 {
                journal.append(&(i * 100)).await.unwrap();
            }

            journal.prune(20).await.unwrap();
            let bounds = journal.bounds().await;
            assert_eq!(bounds.start, 20);
            assert_eq!(bounds.end, 40);

            journal.sync().await.unwrap();
            drop(journal);

            // Remove both storage partitions backing the offsets journal.
            context
                .remove(&format!("{}-blobs", cfg.offsets_partition()), None)
                .await
                .expect("Failed to remove offsets blobs partition");
            context
                .remove(&format!("{}-metadata", cfg.offsets_partition()), None)
                .await
                .expect("Failed to remove offsets metadata partition");

            let result = Journal::<_, u64>::init(context.with_label("second"), cfg.clone()).await;
            assert!(matches!(result, Err(Error::Corruption(_))));
        });
    }
1139
    /// Losing the data partition is recoverable: alignment clears the
    /// offsets journal to its previous size, leaving an empty journal that
    /// treats all prior positions as pruned but can accept new appends.
    #[test_traced]
    fn test_variable_align_data_offsets_mismatch() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "data-loss-test".into(),
                items_per_section: NZU64!(10),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)),
                write_buffer: NZUsize!(1024),
            };

            let variable = Journal::<_, u64>::init(context.with_label("first"), cfg.clone())
                .await
                .unwrap();

            for i in 0..20u64 {
                variable.append(&(i * 100)).await.unwrap();
            }

            variable.sync().await.unwrap();
            drop(variable);

            // Simulate data loss by deleting the data partition only.
            context
                .remove(&cfg.data_partition(), None)
                .await
                .expect("Failed to remove data partition");

            let journal = Journal::<_, u64>::init(context.with_label("second"), cfg.clone())
                .await
                .expect("Should align offsets to match empty data");

            // Size is preserved but every existing position reads as pruned.
            assert_eq!(journal.size().await, 20);

            assert!(journal.bounds().await.is_empty());

            for i in 0..20 {
                assert!(matches!(
                    journal.read(i).await,
                    Err(crate::journal::Error::ItemPruned(_))
                ));
            }

            // New appends continue from the preserved size.
            let pos = journal.append(&999).await.unwrap();
            assert_eq!(pos, 20);
            assert_eq!(journal.read(20).await.unwrap(), 999);

            journal.destroy().await.unwrap();
        });
    }
1206
    /// Replays from various start positions, before and after pruning,
    /// verifying positions, items, and the error cases at the boundaries.
    #[test_traced]
    fn test_variable_replay() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "replay".into(),
                items_per_section: NZU64!(10),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)),
                write_buffer: NZUsize!(1024),
            };

            let journal = Journal::<_, u64>::init(context, cfg).await.unwrap();

            for i in 0..40u64 {
                journal.append(&(i * 100)).await.unwrap();
            }

            // Full replay from the beginning.
            {
                let reader = journal.reader().await;
                let stream = reader.replay(NZUsize!(20), 0).await.unwrap();
                futures::pin_mut!(stream);
                for i in 0..40u64 {
                    let (pos, item) = stream.next().await.unwrap().unwrap();
                    assert_eq!(pos, i);
                    assert_eq!(item, i * 100);
                }
                assert!(stream.next().await.is_none());
            }

            // Replay starting mid-section.
            {
                let reader = journal.reader().await;
                let stream = reader.replay(NZUsize!(20), 15).await.unwrap();
                futures::pin_mut!(stream);
                for i in 15..40u64 {
                    let (pos, item) = stream.next().await.unwrap().unwrap();
                    assert_eq!(pos, i);
                    assert_eq!(item, i * 100);
                }
                assert!(stream.next().await.is_none());
            }

            // Replay starting exactly on a section boundary.
            {
                let reader = journal.reader().await;
                let stream = reader.replay(NZUsize!(20), 20).await.unwrap();
                futures::pin_mut!(stream);
                for i in 20..40u64 {
                    let (pos, item) = stream.next().await.unwrap().unwrap();
                    assert_eq!(pos, i);
                    assert_eq!(item, i * 100);
                }
                assert!(stream.next().await.is_none());
            }

            // After pruning, replays from pruned positions must fail.
            journal.prune(20).await.unwrap();
            {
                let reader = journal.reader().await;
                let res = reader.replay(NZUsize!(20), 0).await;
                assert!(matches!(res, Err(crate::journal::Error::ItemPruned(_))));
            }
            {
                let reader = journal.reader().await;
                let res = reader.replay(NZUsize!(20), 19).await;
                assert!(matches!(res, Err(crate::journal::Error::ItemPruned(_))));
            }

            // Replaying from the pruning boundary itself still works.
            {
                let reader = journal.reader().await;
                let stream = reader.replay(NZUsize!(20), 20).await.unwrap();
                futures::pin_mut!(stream);
                for i in 20..40u64 {
                    let (pos, item) = stream.next().await.unwrap().unwrap();
                    assert_eq!(pos, i);
                    assert_eq!(item, i * 100);
                }
                assert!(stream.next().await.is_none());
            }

            // Replaying from exactly `size` yields an empty stream.
            {
                let reader = journal.reader().await;
                let stream = reader.replay(NZUsize!(20), 40).await.unwrap();
                futures::pin_mut!(stream);
                assert!(stream.next().await.is_none());
            }

            // Replaying past `size` is an error.
            {
                let reader = journal.reader().await;
                let res = reader.replay(NZUsize!(20), 41).await;
                assert!(matches!(
                    res,
                    Err(crate::journal::Error::ItemOutOfRange(41))
                ));
            }

            journal.destroy().await.unwrap();
        });
    }
1315
    /// Runs the shared contiguous-journal test suite against this
    /// implementation, giving each sub-test its own partition and label.
    #[test_traced]
    fn test_variable_contiguous() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            run_contiguous_tests(move |test_name: String, idx: usize| {
                let label = test_name.replace('-', "_");
                let context = context.with_label(&format!("{label}_{idx}"));
                async move {
                    let cfg = Config {
                        partition: format!("generic-test-{test_name}"),
                        items_per_section: NZU64!(10),
                        compression: None,
                        codec_config: (),
                        page_cache: CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)),
                        write_buffer: NZUsize!(1024),
                    };
                    Journal::<_, u64>::init(context, cfg).await
                }
                .boxed()
            })
            .await;
        });
    }
1339
    /// Prunes in several steps (10, 20, 30) and checks after each step that
    /// the boundary advances, pruned reads fail, surviving reads succeed,
    /// and the overall size is unaffected.
    #[test_traced]
    fn test_variable_multiple_sequential_prunes() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "sequential-prunes".into(),
                items_per_section: NZU64!(10),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)),
                write_buffer: NZUsize!(1024),
            };

            let journal = Journal::<_, u64>::init(context, cfg).await.unwrap();

            for i in 0..40u64 {
                journal.append(&(i * 100)).await.unwrap();
            }

            let bounds = journal.bounds().await;
            assert_eq!(bounds.start, 0);
            assert_eq!(bounds.end, 40);

            // First prune: drop [0, 10).
            let pruned = journal.prune(10).await.unwrap();
            assert!(pruned);

            assert_eq!(journal.bounds().await.start, 10);

            assert!(matches!(
                journal.read(0).await,
                Err(crate::journal::Error::ItemPruned(_))
            ));
            assert_eq!(journal.read(10).await.unwrap(), 1000);
            assert_eq!(journal.read(19).await.unwrap(), 1900);

            // Second prune: drop [10, 20).
            let pruned = journal.prune(20).await.unwrap();
            assert!(pruned);

            assert_eq!(journal.bounds().await.start, 20);

            assert!(matches!(
                journal.read(10).await,
                Err(crate::journal::Error::ItemPruned(_))
            ));
            assert!(matches!(
                journal.read(19).await,
                Err(crate::journal::Error::ItemPruned(_))
            ));
            assert_eq!(journal.read(20).await.unwrap(), 2000);
            assert_eq!(journal.read(29).await.unwrap(), 2900);

            // Third prune: drop [20, 30).
            let pruned = journal.prune(30).await.unwrap();
            assert!(pruned);

            assert_eq!(journal.bounds().await.start, 30);

            assert!(matches!(
                journal.read(20).await,
                Err(crate::journal::Error::ItemPruned(_))
            ));
            assert!(matches!(
                journal.read(29).await,
                Err(crate::journal::Error::ItemPruned(_))
            ));
            assert_eq!(journal.read(30).await.unwrap(), 3000);
            assert_eq!(journal.read(39).await.unwrap(), 3900);

            // Pruning never shrinks the logical size.
            assert_eq!(journal.size().await, 40);

            journal.destroy().await.unwrap();
        });
    }
1425
    /// Prunes the entire journal, re-initializes it, and verifies the
    /// fully-pruned state survives restart and appends resume at `size`.
    #[test_traced]
    fn test_variable_prune_all_then_reinit() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "prune-all-reinit".into(),
                items_per_section: NZU64!(10),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)),
                write_buffer: NZUsize!(1024),
            };

            let journal = Journal::<_, u64>::init(context.with_label("first"), cfg.clone())
                .await
                .unwrap();

            for i in 0..100u64 {
                journal.append(&(i * 100)).await.unwrap();
            }

            let bounds = journal.bounds().await;
            assert_eq!(bounds.end, 100);
            assert_eq!(bounds.start, 0);

            // Prune everything.
            let pruned = journal.prune(100).await.unwrap();
            assert!(pruned);

            let bounds = journal.bounds().await;
            assert_eq!(bounds.end, 100);
            assert!(bounds.is_empty());

            for i in 0..100 {
                assert!(matches!(
                    journal.read(i).await,
                    Err(crate::journal::Error::ItemPruned(_))
                ));
            }

            journal.sync().await.unwrap();
            drop(journal);

            // The fully-pruned state must survive re-initialization.
            let journal = Journal::<_, u64>::init(context.with_label("second"), cfg.clone())
                .await
                .unwrap();

            let bounds = journal.bounds().await;
            assert_eq!(bounds.end, 100);
            assert!(bounds.is_empty());

            for i in 0..100 {
                assert!(matches!(
                    journal.read(i).await,
                    Err(crate::journal::Error::ItemPruned(_))
                ));
            }

            // Appending resumes at position 100.
            journal.append(&10000).await.unwrap();
            let bounds = journal.bounds().await;
            assert_eq!(bounds.end, 101);
            assert_eq!(bounds.start, 100);

            assert_eq!(journal.read(100).await.unwrap(), 10000);

            assert!(matches!(
                journal.read(99).await,
                Err(crate::journal::Error::ItemPruned(_))
            ));

            journal.destroy().await.unwrap();
        });
    }
1511
    #[test_traced]
    fn test_variable_recovery_prune_crash_offsets_behind() {
        // Simulates a crash where the data journal was pruned further ahead
        // than the offsets journal: on restart the recovered pruning boundary
        // must match the data journal's (the further-ahead of the two).
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "recovery-prune-crash".into(),
                items_per_section: NZU64!(10),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)),
                write_buffer: NZUsize!(1024),
            };

            let variable = Journal::<_, u64>::init(context.with_label("first"), cfg.clone())
                .await
                .unwrap();

            for i in 0..40u64 {
                variable.append(&(i * 100)).await.unwrap();
            }

            // Normal prune to position 10 (start of section 1).
            variable.prune(10).await.unwrap();
            assert_eq!(variable.bounds().await.start, 10);

            // Crash simulation: prune only the data journal through section 1
            // (positions < 20), leaving the offsets journal behind.
            variable.test_prune_data(2).await.unwrap();
            variable.sync().await.unwrap();
            drop(variable);

            let variable = Journal::<_, u64>::init(context.with_label("second"), cfg.clone())
                .await
                .unwrap();

            // Recovery adopts the data journal's boundary (position 20).
            let bounds = variable.bounds().await;
            assert_eq!(bounds.start, 20);
            assert_eq!(bounds.end, 40);

            assert!(matches!(
                variable.read(10).await,
                Err(crate::journal::Error::ItemPruned(_))
            ));

            // Items at and after the recovered boundary are still readable.
            assert_eq!(variable.read(20).await.unwrap(), 2000);
            assert_eq!(variable.read(39).await.unwrap(), 3900);

            variable.destroy().await.unwrap();
        });
    }
1571
    #[test_traced]
    fn test_variable_recovery_offsets_ahead_corruption() {
        // Simulates an invalid on-disk state: the offsets journal was pruned
        // ahead (to position 20) while the data journal was only pruned through
        // section 0. Re-initialization must fail with Corruption rather than
        // silently reconcile the mismatch.
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "recovery-offsets-ahead".into(),
                items_per_section: NZU64!(10),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)),
                write_buffer: NZUsize!(1024),
            };

            let variable = Journal::<_, u64>::init(context.with_label("first"), cfg.clone())
                .await
                .unwrap();

            for i in 0..40u64 {
                variable.append(&(i * 100)).await.unwrap();
            }

            // Prune offsets further than data, then persist the mismatch.
            variable.test_prune_offsets(20).await.unwrap();
            variable.test_prune_data(1).await.unwrap();
            variable.sync().await.unwrap();
            drop(variable);

            // Recovery must detect offsets pruned beyond the data journal.
            let result = Journal::<_, u64>::init(context.with_label("second"), cfg.clone()).await;
            assert!(matches!(result, Err(Error::Corruption(_))));
        });
    }
1611
    #[test_traced]
    fn test_variable_recovery_append_crash_offsets_behind() {
        // Simulates a crash where items were appended (and synced) to the data
        // journal but the offsets journal was never updated: recovery must
        // rebuild the missing offsets from the data journal.
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "recovery-append-crash".into(),
                items_per_section: NZU64!(10),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)),
                write_buffer: NZUsize!(1024),
            };

            let variable = Journal::<_, u64>::init(context.with_label("first"), cfg.clone())
                .await
                .unwrap();

            for i in 0..15u64 {
                variable.append(&(i * 100)).await.unwrap();
            }

            assert_eq!(variable.size().await, 15);

            // Crash simulation: write items 15..20 directly into data section
            // 1, bypassing the offsets journal.
            for i in 15..20u64 {
                variable.test_append_data(1, i * 100).await.unwrap();
            }
            variable.sync().await.unwrap();
            drop(variable);

            let variable = Journal::<_, u64>::init(context.with_label("second"), cfg.clone())
                .await
                .unwrap();

            // Recovery extends the journal to include the orphaned appends.
            let bounds = variable.bounds().await;
            assert_eq!(bounds.end, 20);
            assert_eq!(bounds.start, 0);

            for i in 0..20u64 {
                assert_eq!(variable.read(i).await.unwrap(), i * 100);
            }

            // The offsets journal was rebuilt to cover all 20 items.
            assert_eq!(variable.test_offsets_size().await, 20);

            variable.destroy().await.unwrap();
        });
    }
1668
    #[test_traced]
    fn test_variable_recovery_multiple_prunes_crash() {
        // Simulates a crash after the data journal was pruned multiple sections
        // ahead of the offsets journal: recovery must advance the boundary to
        // the data journal's position (30), skipping intermediate sections.
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "recovery-multiple-prunes".into(),
                items_per_section: NZU64!(10),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)),
                write_buffer: NZUsize!(1024),
            };

            let variable = Journal::<_, u64>::init(context.with_label("first"), cfg.clone())
                .await
                .unwrap();

            for i in 0..50u64 {
                variable.append(&(i * 100)).await.unwrap();
            }

            // Normal prune to position 10.
            variable.prune(10).await.unwrap();
            assert_eq!(variable.bounds().await.start, 10);

            // Crash simulation: prune the data journal through section 2
            // (positions < 30) without updating the offsets journal.
            variable.test_prune_data(3).await.unwrap();
            variable.sync().await.unwrap();
            drop(variable);

            let variable = Journal::<_, u64>::init(context.with_label("second"), cfg.clone())
                .await
                .unwrap();

            // Recovery adopts the data journal's boundary (position 30).
            let bounds = variable.bounds().await;
            assert_eq!(bounds.start, 30);
            assert_eq!(bounds.end, 50);

            // Both the originally-pruned and crash-pruned ranges are gone.
            assert!(matches!(
                variable.read(10).await,
                Err(crate::journal::Error::ItemPruned(_))
            ));
            assert!(matches!(
                variable.read(20).await,
                Err(crate::journal::Error::ItemPruned(_))
            ));

            assert_eq!(variable.read(30).await.unwrap(), 3000);
            assert_eq!(variable.read(49).await.unwrap(), 4900);

            variable.destroy().await.unwrap();
        });
    }
1732
    #[test_traced]
    fn test_variable_recovery_rewind_crash_multi_section() {
        // Simulates a crash where the offsets journal was rewound (falling
        // behind the data journal, which still holds all 25 items): recovery
        // must rebuild the offsets back up to the data journal's size.
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "recovery-rewind-crash".into(),
                items_per_section: NZU64!(10),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)),
                write_buffer: NZUsize!(1024),
            };

            let variable = Journal::<_, u64>::init(context.with_label("first"), cfg.clone())
                .await
                .unwrap();

            for i in 0..25u64 {
                variable.append(&(i * 100)).await.unwrap();
            }

            assert_eq!(variable.size().await, 25);

            // Crash simulation: rewind the offsets journal so it lags the data
            // journal (NOTE(review): argument presumably a rewind target/count
            // — confirm against the test helper's definition).
            variable.test_rewind_offsets(5).await.unwrap();
            variable.sync().await.unwrap();
            drop(variable);

            let variable = Journal::<_, u64>::init(context.with_label("second"), cfg.clone())
                .await
                .unwrap();

            // Recovery restores the full range from the data journal.
            let bounds = variable.bounds().await;
            assert_eq!(bounds.end, 25);
            assert_eq!(bounds.start, 0);

            for i in 0..25u64 {
                assert_eq!(variable.read(i).await.unwrap(), i * 100);
            }

            // The offsets journal was rebuilt to cover all 25 items.
            assert_eq!(variable.test_offsets_size().await, 25);

            // The journal remains appendable at the correct next position.
            let pos = variable.append(&2500).await.unwrap();
            assert_eq!(pos, 25);
            assert_eq!(variable.read(25).await.unwrap(), 2500);

            variable.destroy().await.unwrap();
        });
    }
1798
    #[test_traced]
    fn test_variable_recovery_empty_offsets_after_prune_and_append() {
        // Crash scenario: all items pruned (offsets journal empty), then new
        // items appended directly to the data journal and synced — but the
        // offsets journal never updated. Recovery must rebuild offsets for the
        // new items while keeping the pruned range pruned.
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "recovery-empty-after-prune".into(),
                items_per_section: NZU64!(10),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)),
                write_buffer: NZUsize!(1024),
            };

            let journal = Journal::<_, u64>::init(context.with_label("first"), cfg.clone())
                .await
                .unwrap();

            for i in 0..10u64 {
                journal.append(&(i * 100)).await.unwrap();
            }
            let bounds = journal.bounds().await;
            assert_eq!(bounds.end, 10);
            assert_eq!(bounds.start, 0);

            // Prune everything so the retained range is empty.
            journal.prune(10).await.unwrap();
            let bounds = journal.bounds().await;
            assert_eq!(bounds.end, 10);
            assert!(bounds.is_empty());
            // Crash simulation: append items 10..20 straight into data section
            // 1 and sync only the data journal.
            for i in 10..20u64 {
                journal.test_append_data(1, i * 100).await.unwrap();
            }
            journal.test_sync_data().await.unwrap();
            drop(journal);

            let journal = Journal::<_, u64>::init(context.with_label("second"), cfg.clone())
                .await
                .expect("Should recover from crash after data sync but before offsets sync");

            // Recovery picks up the orphaned appends: retained range is 10..20.
            let bounds = journal.bounds().await;
            assert_eq!(bounds.end, 20);
            assert_eq!(bounds.start, 10);

            for i in 10..20u64 {
                assert_eq!(journal.read(i).await.unwrap(), i * 100);
            }

            // The originally-pruned range stays pruned.
            for i in 0..10 {
                assert!(matches!(journal.read(i).await, Err(Error::ItemPruned(_))));
            }

            journal.destroy().await.unwrap();
        });
    }
1869
1870 #[test_traced]
1872 fn test_variable_concurrent_sync_recovery() {
1873 let executor = deterministic::Runner::default();
1874 executor.start(|context| async move {
1875 let cfg = Config {
1876 partition: "concurrent-sync-recovery".into(),
1877 items_per_section: NZU64!(10),
1878 compression: None,
1879 codec_config: (),
1880 page_cache: CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)),
1881 write_buffer: NZUsize!(1024),
1882 };
1883
1884 let journal = Journal::<_, u64>::init(context.with_label("first"), cfg.clone())
1885 .await
1886 .unwrap();
1887
1888 for i in 0..15u64 {
1890 journal.append(&(i * 100)).await.unwrap();
1891 }
1892
1893 journal.commit().await.unwrap();
1895
1896 drop(journal);
1898
1899 let journal = Journal::<_, u64>::init(context.with_label("second"), cfg.clone())
1900 .await
1901 .unwrap();
1902
1903 assert_eq!(journal.size().await, 15);
1905 for i in 0..15u64 {
1906 assert_eq!(journal.read(i).await.unwrap(), i * 100);
1907 }
1908
1909 journal.destroy().await.unwrap();
1910 });
1911 }
1912
1913 #[test_traced]
1914 fn test_init_at_size_zero() {
1915 let executor = deterministic::Runner::default();
1916 executor.start(|context| async move {
1917 let cfg = Config {
1918 partition: "init-at-size-zero".into(),
1919 items_per_section: NZU64!(5),
1920 compression: None,
1921 codec_config: (),
1922 page_cache: CacheRef::from_pooler(&context, SMALL_PAGE_SIZE, NZUsize!(2)),
1923 write_buffer: NZUsize!(1024),
1924 };
1925
1926 let journal = Journal::<_, u64>::init_at_size(context.clone(), cfg.clone(), 0)
1927 .await
1928 .unwrap();
1929
1930 assert_eq!(journal.size().await, 0);
1932
1933 assert!(journal.bounds().await.is_empty());
1935
1936 let pos = journal.append(&100).await.unwrap();
1938 assert_eq!(pos, 0);
1939 assert_eq!(journal.size().await, 1);
1940 assert_eq!(journal.read(0).await.unwrap(), 100);
1941
1942 journal.destroy().await.unwrap();
1943 });
1944 }
1945
1946 #[test_traced]
1947 fn test_init_at_size_section_boundary() {
1948 let executor = deterministic::Runner::default();
1949 executor.start(|context| async move {
1950 let cfg = Config {
1951 partition: "init-at-size-boundary".into(),
1952 items_per_section: NZU64!(5),
1953 compression: None,
1954 codec_config: (),
1955 page_cache: CacheRef::from_pooler(&context, SMALL_PAGE_SIZE, NZUsize!(2)),
1956 write_buffer: NZUsize!(1024),
1957 };
1958
1959 let journal = Journal::<_, u64>::init_at_size(context.clone(), cfg.clone(), 10)
1961 .await
1962 .unwrap();
1963
1964 let bounds = journal.bounds().await;
1966 assert_eq!(bounds.end, 10);
1967
1968 assert!(bounds.is_empty());
1970
1971 let pos = journal.append(&1000).await.unwrap();
1973 assert_eq!(pos, 10);
1974 assert_eq!(journal.size().await, 11);
1975 assert_eq!(journal.read(10).await.unwrap(), 1000);
1976
1977 let pos = journal.append(&1001).await.unwrap();
1979 assert_eq!(pos, 11);
1980 assert_eq!(journal.read(11).await.unwrap(), 1001);
1981
1982 journal.destroy().await.unwrap();
1983 });
1984 }
1985
1986 #[test_traced]
1987 fn test_init_at_size_mid_section() {
1988 let executor = deterministic::Runner::default();
1989 executor.start(|context| async move {
1990 let cfg = Config {
1991 partition: "init-at-size-mid".into(),
1992 items_per_section: NZU64!(5),
1993 compression: None,
1994 codec_config: (),
1995 page_cache: CacheRef::from_pooler(&context, SMALL_PAGE_SIZE, NZUsize!(2)),
1996 write_buffer: NZUsize!(1024),
1997 };
1998
1999 let journal = Journal::<_, u64>::init_at_size(context.clone(), cfg.clone(), 7)
2001 .await
2002 .unwrap();
2003
2004 let bounds = journal.bounds().await;
2006 assert_eq!(bounds.end, 7);
2007
2008 assert!(bounds.is_empty());
2010
2011 let pos = journal.append(&700).await.unwrap();
2013 assert_eq!(pos, 7);
2014 assert_eq!(journal.size().await, 8);
2015 assert_eq!(journal.read(7).await.unwrap(), 700);
2016
2017 journal.destroy().await.unwrap();
2018 });
2019 }
2020
    #[test_traced]
    fn test_init_at_size_persistence() {
        // A journal created at size 15 and extended with appends must restore
        // its bounds (15..20) and contents after sync + restart, and keep
        // accepting appends at the next position.
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "init-at-size-persist".into(),
                items_per_section: NZU64!(5),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, SMALL_PAGE_SIZE, NZUsize!(2)),
                write_buffer: NZUsize!(1024),
            };

            let journal =
                Journal::<_, u64>::init_at_size(context.with_label("first"), cfg.clone(), 15)
                    .await
                    .unwrap();

            // Appends continue from the starting position 15.
            for i in 0..5u64 {
                let pos = journal.append(&(1500 + i)).await.unwrap();
                assert_eq!(pos, 15 + i);
            }

            assert_eq!(journal.size().await, 20);

            journal.sync().await.unwrap();
            drop(journal);

            // Reopen with a plain init: state must be recovered.
            let journal = Journal::<_, u64>::init(context.with_label("second"), cfg.clone())
                .await
                .unwrap();

            let bounds = journal.bounds().await;
            assert_eq!(bounds.end, 20);
            assert_eq!(bounds.start, 15);

            for i in 0..5u64 {
                assert_eq!(journal.read(15 + i).await.unwrap(), 1500 + i);
            }

            // The journal remains appendable at position 20.
            let pos = journal.append(&9999).await.unwrap();
            assert_eq!(pos, 20);
            assert_eq!(journal.read(20).await.unwrap(), 9999);

            journal.destroy().await.unwrap();
        });
    }
2074
2075 #[test_traced]
2076 fn test_init_at_size_persistence_without_data() {
2077 let executor = deterministic::Runner::default();
2078 executor.start(|context| async move {
2079 let cfg = Config {
2080 partition: "init-at-size-persist-empty".into(),
2081 items_per_section: NZU64!(5),
2082 compression: None,
2083 codec_config: (),
2084 page_cache: CacheRef::from_pooler(&context, SMALL_PAGE_SIZE, NZUsize!(2)),
2085 write_buffer: NZUsize!(1024),
2086 };
2087
2088 let journal =
2090 Journal::<_, u64>::init_at_size(context.with_label("first"), cfg.clone(), 15)
2091 .await
2092 .unwrap();
2093
2094 let bounds = journal.bounds().await;
2095 assert_eq!(bounds.end, 15);
2096 assert!(bounds.is_empty());
2097
2098 drop(journal);
2100
2101 let journal = Journal::<_, u64>::init(context.with_label("second"), cfg.clone())
2103 .await
2104 .unwrap();
2105
2106 let bounds = journal.bounds().await;
2107 assert_eq!(bounds.end, 15);
2108 assert!(bounds.is_empty());
2109
2110 let pos = journal.append(&1500).await.unwrap();
2112 assert_eq!(pos, 15);
2113 assert_eq!(journal.read(15).await.unwrap(), 1500);
2114
2115 journal.destroy().await.unwrap();
2116 });
2117 }
2118
    #[test_traced]
    fn test_init_at_size_mid_section_persistence() {
        // A journal created mid-section (at 7, with 5 items per section) and
        // filled to the section boundary must restore bounds 7..10 after a
        // restart, with positions before 7 reported as pruned.
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "init-at-size-mid-section".into(),
                items_per_section: NZU64!(5),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, SMALL_PAGE_SIZE, NZUsize!(2)),
                write_buffer: NZUsize!(1024),
            };

            let journal =
                Journal::<_, u64>::init_at_size(context.with_label("first"), cfg.clone(), 7)
                    .await
                    .unwrap();

            // Fill positions 7..10 (up to the next section boundary).
            for i in 0..3u64 {
                let pos = journal.append(&(700 + i)).await.unwrap();
                assert_eq!(pos, 7 + i);
            }

            let bounds = journal.bounds().await;
            assert_eq!(bounds.end, 10);
            assert_eq!(bounds.start, 7);

            journal.sync().await.unwrap();
            drop(journal);

            let journal = Journal::<_, u64>::init(context.with_label("second"), cfg.clone())
                .await
                .unwrap();

            // Bounds and contents survive the restart.
            let bounds = journal.bounds().await;
            assert_eq!(bounds.end, 10);
            assert_eq!(bounds.start, 7);

            for i in 0..3u64 {
                assert_eq!(journal.read(7 + i).await.unwrap(), 700 + i);
            }

            // Positions below the starting size behave as pruned.
            assert!(matches!(journal.read(6).await, Err(Error::ItemPruned(6))));

            journal.destroy().await.unwrap();
        });
    }
2174
    #[test_traced]
    fn test_init_at_size_mid_section_multi_section_persistence() {
        // Like the mid-section persistence test, but the appends (7..15) span
        // multiple sections before the sync + restart.
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "init-at-size-multi-section".into(),
                items_per_section: NZU64!(5),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, SMALL_PAGE_SIZE, NZUsize!(2)),
                write_buffer: NZUsize!(1024),
            };

            let journal =
                Journal::<_, u64>::init_at_size(context.with_label("first"), cfg.clone(), 7)
                    .await
                    .unwrap();

            // Fill positions 7..15, crossing the boundaries at 10.
            for i in 0..8u64 {
                let pos = journal.append(&(700 + i)).await.unwrap();
                assert_eq!(pos, 7 + i);
            }

            let bounds = journal.bounds().await;
            assert_eq!(bounds.end, 15);
            assert_eq!(bounds.start, 7);

            journal.sync().await.unwrap();
            drop(journal);

            let journal = Journal::<_, u64>::init(context.with_label("second"), cfg.clone())
                .await
                .unwrap();

            // Bounds and contents survive the restart.
            let bounds = journal.bounds().await;
            assert_eq!(bounds.end, 15);
            assert_eq!(bounds.start, 7);

            for i in 0..8u64 {
                assert_eq!(journal.read(7 + i).await.unwrap(), 700 + i);
            }

            journal.destroy().await.unwrap();
        });
    }
2227
    #[test_traced]
    fn test_align_journals_data_empty_mid_section_pruning_boundary() {
        // Simulates a completely-lost data journal while the offsets journal
        // survives: recovery must align by treating everything as pruned (an
        // empty journal ending at 7, a mid-section position), then accept new
        // appends and recover them after a data-only sync.
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "align-journals-mid-section-pruning-boundary".into(),
                items_per_section: NZU64!(5),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, SMALL_PAGE_SIZE, NZUsize!(2)),
                write_buffer: NZUsize!(1024),
            };

            let journal = Journal::<_, u64>::init(context.with_label("first"), cfg.clone())
                .await
                .unwrap();
            for i in 0..7u64 {
                journal.append(&(100 + i)).await.unwrap();
            }
            journal.sync().await.unwrap();

            // Wipe the data journal entirely, leaving only the offsets journal.
            journal.inner.write().await.data.clear().await.unwrap();
            drop(journal);

            let journal = Journal::<_, u64>::init(context.with_label("second"), cfg.clone())
                .await
                .unwrap();
            // Recovery: size preserved at 7, but nothing retained.
            let bounds = journal.bounds().await;
            assert_eq!(bounds.end, 7);
            assert!(bounds.is_empty());

            // New appends resume at the mid-section pruning boundary.
            let pos = journal.append(&777).await.unwrap();
            assert_eq!(pos, 7);
            assert_eq!(journal.size().await, 8);
            assert_eq!(journal.read(7).await.unwrap(), 777);

            // Crash simulation: sync only the data section holding position 7,
            // not the offsets journal.
            let section = 7 / cfg.items_per_section.get();
            journal
                .inner
                .write()
                .await
                .data
                .sync(section)
                .await
                .unwrap();
            drop(journal);

            let journal = Journal::<_, u64>::init(context.with_label("third"), cfg.clone())
                .await
                .unwrap();
            // The data-synced append is recovered; the boundary stays at 7.
            let bounds = journal.bounds().await;
            assert_eq!(bounds.end, 8);
            assert_eq!(bounds.start, 7);
            assert_eq!(journal.read(7).await.unwrap(), 777);

            journal.destroy().await.unwrap();
        });
    }
2293
    #[test_traced]
    fn test_init_at_size_crash_data_synced_offsets_not() {
        // Crash scenario after init_at_size: appends reached the data journal
        // (section 1 synced) but the offsets journal was never synced before
        // the crash. Recovery must restore all appended items.
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "init-at-size-crash-recovery".into(),
                items_per_section: NZU64!(5),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, SMALL_PAGE_SIZE, NZUsize!(2)),
                write_buffer: NZUsize!(1024),
            };

            let journal =
                Journal::<_, u64>::init_at_size(context.with_label("first"), cfg.clone(), 7)
                    .await
                    .unwrap();

            // Append positions 7..10.
            for i in 0..3u64 {
                journal.append(&(700 + i)).await.unwrap();
            }

            // Crash simulation: sync only data section 1 (positions 5..10),
            // bypassing the offsets journal, then drop without a full sync.
            journal.inner.write().await.data.sync(1).await.unwrap();
            drop(journal);

            let journal = Journal::<_, u64>::init(context.with_label("second"), cfg.clone())
                .await
                .unwrap();

            // Recovery restores bounds 7..10 from the synced data.
            let bounds = journal.bounds().await;
            assert_eq!(bounds.end, 10);
            assert_eq!(bounds.start, 7);

            for i in 0..3u64 {
                assert_eq!(journal.read(7 + i).await.unwrap(), 700 + i);
            }

            journal.destroy().await.unwrap();
        });
    }
2342
2343 #[test_traced]
2344 fn test_prune_does_not_move_oldest_retained_backwards() {
2345 let executor = deterministic::Runner::default();
2346 executor.start(|context| async move {
2347 let cfg = Config {
2348 partition: "prune-no-backwards".into(),
2349 items_per_section: NZU64!(5),
2350 compression: None,
2351 codec_config: (),
2352 page_cache: CacheRef::from_pooler(&context, SMALL_PAGE_SIZE, NZUsize!(2)),
2353 write_buffer: NZUsize!(1024),
2354 };
2355
2356 let journal =
2357 Journal::<_, u64>::init_at_size(context.with_label("first"), cfg.clone(), 7)
2358 .await
2359 .unwrap();
2360
2361 for i in 0..3u64 {
2363 let pos = journal.append(&(700 + i)).await.unwrap();
2364 assert_eq!(pos, 7 + i);
2365 }
2366 assert_eq!(journal.bounds().await.start, 7);
2367
2368 journal.prune(8).await.unwrap();
2370 assert_eq!(journal.bounds().await.start, 7);
2371 assert!(matches!(journal.read(6).await, Err(Error::ItemPruned(6))));
2372 assert_eq!(journal.read(7).await.unwrap(), 700);
2373
2374 journal.destroy().await.unwrap();
2375 });
2376 }
2377
2378 #[test_traced]
2379 fn test_init_at_size_large_offset() {
2380 let executor = deterministic::Runner::default();
2381 executor.start(|context| async move {
2382 let cfg = Config {
2383 partition: "init-at-size-large".into(),
2384 items_per_section: NZU64!(5),
2385 compression: None,
2386 codec_config: (),
2387 page_cache: CacheRef::from_pooler(&context, SMALL_PAGE_SIZE, NZUsize!(2)),
2388 write_buffer: NZUsize!(1024),
2389 };
2390
2391 let journal = Journal::<_, u64>::init_at_size(context.clone(), cfg.clone(), 1000)
2393 .await
2394 .unwrap();
2395
2396 let bounds = journal.bounds().await;
2397 assert_eq!(bounds.end, 1000);
2398 assert!(bounds.is_empty());
2400
2401 let pos = journal.append(&100000).await.unwrap();
2403 assert_eq!(pos, 1000);
2404 assert_eq!(journal.read(1000).await.unwrap(), 100000);
2405
2406 journal.destroy().await.unwrap();
2407 });
2408 }
2409
    #[test_traced]
    fn test_init_at_size_prune_and_append() {
        // A journal created at size 20 must support the full prune/append
        // lifecycle: append to 30, prune to 25, read the retained tail, and
        // continue appending at 30.
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "init-at-size-prune".into(),
                items_per_section: NZU64!(5),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, SMALL_PAGE_SIZE, NZUsize!(2)),
                write_buffer: NZUsize!(1024),
            };

            let journal = Journal::<_, u64>::init_at_size(context.clone(), cfg.clone(), 20)
                .await
                .unwrap();

            // Fill positions 20..30 with values 2000..2010.
            for i in 0..10u64 {
                journal.append(&(2000 + i)).await.unwrap();
            }

            assert_eq!(journal.size().await, 30);

            journal.prune(25).await.unwrap();

            let bounds = journal.bounds().await;
            assert_eq!(bounds.end, 30);
            assert_eq!(bounds.start, 25);

            // Position p holds value 2000 + (p - 20).
            for i in 25..30u64 {
                assert_eq!(journal.read(i).await.unwrap(), 2000 + (i - 20));
            }

            // Appending continues at position 30.
            let pos = journal.append(&3000).await.unwrap();
            assert_eq!(pos, 30);

            journal.destroy().await.unwrap();
        });
    }
2454
2455 #[test_traced]
2457 fn test_init_sync_no_existing_data() {
2458 let executor = deterministic::Runner::default();
2459 executor.start(|context| async move {
2460 let cfg = Config {
2461 partition: "test-fresh-start".into(),
2462 items_per_section: NZU64!(5),
2463 compression: None,
2464 codec_config: (),
2465 write_buffer: NZUsize!(1024),
2466 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(PAGE_CACHE_SIZE)),
2467 };
2468
2469 let lower_bound = 10;
2471 let upper_bound = 26;
2472 let journal =
2473 Journal::init_sync(context.clone(), cfg.clone(), lower_bound..upper_bound)
2474 .await
2475 .expect("Failed to initialize journal with sync boundaries");
2476
2477 let bounds = journal.bounds().await;
2478 assert_eq!(bounds.end, lower_bound);
2479 assert!(bounds.is_empty());
2480
2481 let pos1 = journal.append(&42u64).await.unwrap();
2483 assert_eq!(pos1, lower_bound);
2484 assert_eq!(journal.read(pos1).await.unwrap(), 42u64);
2485
2486 let pos2 = journal.append(&43u64).await.unwrap();
2487 assert_eq!(pos2, lower_bound + 1);
2488 assert_eq!(journal.read(pos2).await.unwrap(), 43u64);
2489
2490 journal.destroy().await.unwrap();
2491 });
2492 }
2493
    #[test_traced]
    fn test_init_sync_existing_data_overlap() {
        // init_sync over existing data (0..20) with a range (8..31) that
        // overlaps it: the journal keeps its size, prunes up to the section
        // boundary at/below the lower bound, and stays appendable.
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "test-overlap".into(),
                items_per_section: NZU64!(5),
                compression: None,
                codec_config: (),
                write_buffer: NZUsize!(1024),
                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(PAGE_CACHE_SIZE)),
            };

            // Seed the partition with 20 items, then close it.
            let journal =
                Journal::<deterministic::Context, u64>::init(context.clone(), cfg.clone())
                    .await
                    .expect("Failed to create initial journal");

            for i in 0..20u64 {
                journal.append(&(i * 100)).await.unwrap();
            }
            journal.sync().await.unwrap();
            drop(journal);

            // Reopen for sync with an overlapping range.
            let lower_bound = 8;
            let upper_bound = 31;
            let journal = Journal::<deterministic::Context, u64>::init_sync(
                context.clone(),
                cfg.clone(),
                lower_bound..upper_bound,
            )
            .await
            .expect("Failed to initialize journal with overlap");

            assert_eq!(journal.size().await, 20);

            // Pruning is section-aligned: lower bound 8 prunes only up to the
            // section boundary at 5, so start is 5 (not 8).
            assert_eq!(journal.bounds().await.start, 5);
            assert!(matches!(journal.read(0).await, Err(Error::ItemPruned(_))));
            assert!(matches!(journal.read(4).await, Err(Error::ItemPruned(_))));

            // Items from the section boundary onward remain readable.
            assert_eq!(journal.read(5).await.unwrap(), 500);
            assert_eq!(journal.read(8).await.unwrap(), 800);
            assert_eq!(journal.read(19).await.unwrap(), 1900);

            // Beyond the current size is out of range (not pruned).
            assert!(matches!(
                journal.read(20).await,
                Err(Error::ItemOutOfRange(_))
            ));

            // Appending continues at position 20.
            let pos = journal.append(&999).await.unwrap();
            assert_eq!(pos, 20);
            assert_eq!(journal.read(20).await.unwrap(), 999);

            journal.destroy().await.unwrap();
        });
    }
2561
2562 #[should_panic]
2564 #[test_traced]
2565 fn test_init_sync_invalid_parameters() {
2566 let executor = deterministic::Runner::default();
2567 executor.start(|context| async move {
2568 let cfg = Config {
2569 partition: "test-invalid".into(),
2570 items_per_section: NZU64!(5),
2571 compression: None,
2572 codec_config: (),
2573 write_buffer: NZUsize!(1024),
2574 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(PAGE_CACHE_SIZE)),
2575 };
2576
2577 #[allow(clippy::reversed_empty_ranges)]
2578 let _result = Journal::<deterministic::Context, u64>::init_sync(
2579 context.clone(),
2580 cfg,
2581 10..5, )
2583 .await;
2584 });
2585 }
2586
    #[test_traced]
    fn test_init_sync_existing_data_exact_match() {
        // init_sync where the range (5..20) exactly matches the existing data's
        // useful extent: size is preserved, the head is pruned to the lower
        // bound, and the journal stays appendable.
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let items_per_section = NZU64!(5);
            let cfg = Config {
                partition: "test-exact-match".into(),
                items_per_section,
                compression: None,
                codec_config: (),
                write_buffer: NZUsize!(1024),
                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(PAGE_CACHE_SIZE)),
            };

            // Seed the partition with 20 items, then close it.
            let journal =
                Journal::<deterministic::Context, u64>::init(context.clone(), cfg.clone())
                    .await
                    .expect("Failed to create initial journal");

            for i in 0..20u64 {
                journal.append(&(i * 100)).await.unwrap();
            }
            journal.sync().await.unwrap();
            drop(journal);

            // Reopen for sync; 5 is also a section boundary here.
            let lower_bound = 5;
            let upper_bound = 20;
            let journal = Journal::<deterministic::Context, u64>::init_sync(
                context.clone(),
                cfg.clone(),
                lower_bound..upper_bound,
            )
            .await
            .expect("Failed to initialize journal with exact match");

            assert_eq!(journal.size().await, 20);

            // Head pruned exactly to the lower bound.
            assert_eq!(journal.bounds().await.start, 5);
            assert!(matches!(journal.read(0).await, Err(Error::ItemPruned(_))));
            assert!(matches!(journal.read(4).await, Err(Error::ItemPruned(_))));

            // Retained items remain readable.
            assert_eq!(journal.read(5).await.unwrap(), 500);
            assert_eq!(journal.read(10).await.unwrap(), 1000);
            assert_eq!(journal.read(19).await.unwrap(), 1900);

            // Beyond the current size is out of range (not pruned).
            assert!(matches!(
                journal.read(20).await,
                Err(Error::ItemOutOfRange(_))
            ));

            // Appending continues at position 20.
            let pos = journal.append(&999).await.unwrap();
            assert_eq!(pos, 20);
            assert_eq!(journal.read(20).await.unwrap(), 999);

            journal.destroy().await.unwrap();
        });
    }
2654
    #[test_traced]
    fn test_init_sync_existing_data_exceeds_upper_bound() {
        // init_sync must reject any range whose upper bound is smaller than
        // the amount of data already on disk (30 items) with ItemOutOfRange.
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let items_per_section = NZU64!(5);
            let cfg = Config {
                partition: "test-unexpected-data".into(),
                items_per_section,
                compression: None,
                codec_config: (),
                write_buffer: NZUsize!(1024),
                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(PAGE_CACHE_SIZE)),
            };

            // Seed the partition with 30 items, then close it.
            let journal = Journal::<deterministic::Context, u64>::init(
                context.with_label("initial"),
                cfg.clone(),
            )
            .await
            .expect("Failed to create initial journal");

            for i in 0..30u64 {
                journal.append(&(i * 1000)).await.unwrap();
            }
            journal.sync().await.unwrap();
            drop(journal);

            // Every upper bound below the existing size must be rejected.
            let lower_bound = 8;
            for (i, upper_bound) in (9..29).enumerate() {
                let result = Journal::<deterministic::Context, u64>::init_sync(
                    context.with_label(&format!("sync_{i}")),
                    cfg.clone(),
                    lower_bound..upper_bound,
                )
                .await;

                assert!(matches!(result, Err(Error::ItemOutOfRange(_))));
            }
        });
    }
2701
    #[test_traced]
    fn test_init_sync_existing_data_stale() {
        // init_sync with a range (15..26) entirely beyond the existing data
        // (0..10): the stale items are all pruned and the journal ends up
        // empty at the lower bound.
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let items_per_section = NZU64!(5);
            let cfg = Config {
                partition: "test-stale".into(),
                items_per_section,
                compression: None,
                codec_config: (),
                write_buffer: NZUsize!(1024),
                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(PAGE_CACHE_SIZE)),
            };

            // Seed the partition with 10 items, then close it.
            let journal = Journal::<deterministic::Context, u64>::init(
                context.with_label("first"),
                cfg.clone(),
            )
            .await
            .expect("Failed to create initial journal");

            for i in 0..10u64 {
                journal.append(&(i * 100)).await.unwrap();
            }
            journal.sync().await.unwrap();
            drop(journal);

            // Reopen for sync with a range past all existing data.
            let lower_bound = 15;
            let upper_bound = 26;
            let journal = Journal::<deterministic::Context, u64>::init_sync(
                context.with_label("second"),
                cfg.clone(),
                lower_bound..upper_bound,
            )
            .await
            .expect("Failed to initialize journal with stale data");

            // Positioned at the lower bound with nothing retained.
            assert_eq!(journal.size().await, 15);
            assert!(journal.bounds().await.is_empty());

            // Everything below the lower bound reads as pruned.
            assert!(matches!(journal.read(0).await, Err(Error::ItemPruned(_))));
            assert!(matches!(journal.read(9).await, Err(Error::ItemPruned(_))));
            assert!(matches!(journal.read(14).await, Err(Error::ItemPruned(_))));

            journal.destroy().await.unwrap();
        });
    }
2756
    #[test_traced]
    fn test_init_sync_section_boundaries() {
        // init_sync with both bounds (15..25) landing exactly on section
        // boundaries: the head prunes precisely to the lower bound and the
        // journal stays appendable at the upper bound.
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let items_per_section = NZU64!(5);
            let cfg = Config {
                partition: "test-boundaries".into(),
                items_per_section,
                compression: None,
                codec_config: (),
                write_buffer: NZUsize!(1024),
                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(PAGE_CACHE_SIZE)),
            };

            // Seed the partition with 25 items, then close it.
            let journal =
                Journal::<deterministic::Context, u64>::init(context.clone(), cfg.clone())
                    .await
                    .expect("Failed to create initial journal");

            for i in 0..25u64 {
                journal.append(&(i * 100)).await.unwrap();
            }
            journal.sync().await.unwrap();
            drop(journal);

            // Reopen for sync on exact section boundaries.
            let lower_bound = 15;
            let upper_bound = 25;
            let journal = Journal::<deterministic::Context, u64>::init_sync(
                context.clone(),
                cfg.clone(),
                lower_bound..upper_bound,
            )
            .await
            .expect("Failed to initialize journal at boundaries");

            assert_eq!(journal.size().await, 25);

            // Head pruned exactly to the lower bound (a section boundary).
            assert_eq!(journal.bounds().await.start, 15);

            assert!(matches!(journal.read(0).await, Err(Error::ItemPruned(_))));
            assert!(matches!(journal.read(14).await, Err(Error::ItemPruned(_))));

            // Retained items remain readable.
            assert_eq!(journal.read(15).await.unwrap(), 1500);
            assert_eq!(journal.read(20).await.unwrap(), 2000);
            assert_eq!(journal.read(24).await.unwrap(), 2400);

            // Beyond the current size is out of range (not pruned).
            assert!(matches!(
                journal.read(25).await,
                Err(Error::ItemOutOfRange(_))
            ));

            // Appending continues at position 25.
            let pos = journal.append(&999).await.unwrap();
            assert_eq!(pos, 25);
            assert_eq!(journal.read(25).await.unwrap(), 999);

            journal.destroy().await.unwrap();
        });
    }
2824
2825 #[test_traced]
2827 fn test_init_sync_same_section_bounds() {
2828 let executor = deterministic::Runner::default();
2829 executor.start(|context| async move {
2830 let items_per_section = NZU64!(5);
2831 let cfg = Config {
2832 partition: "test-same-section".into(),
2833 items_per_section,
2834 compression: None,
2835 codec_config: (),
2836 write_buffer: NZUsize!(1024),
2837 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(PAGE_CACHE_SIZE)),
2838 };
2839
2840 let journal =
2842 Journal::<deterministic::Context, u64>::init(context.clone(), cfg.clone())
2843 .await
2844 .expect("Failed to create initial journal");
2845
2846 for i in 0..15u64 {
2848 journal.append(&(i * 100)).await.unwrap();
2849 }
2850 journal.sync().await.unwrap();
2851 drop(journal);
2852
2853 let lower_bound = 10; let upper_bound = 15; let journal = Journal::<deterministic::Context, u64>::init_sync(
2857 context.clone(),
2858 cfg.clone(),
2859 lower_bound..upper_bound,
2860 )
2861 .await
2862 .expect("Failed to initialize journal with same-section bounds");
2863
2864 assert_eq!(journal.size().await, 15);
2865
2866 assert_eq!(journal.bounds().await.start, 10);
2869
2870 assert!(matches!(journal.read(0).await, Err(Error::ItemPruned(_))));
2872 assert!(matches!(journal.read(9).await, Err(Error::ItemPruned(_))));
2873
2874 assert_eq!(journal.read(10).await.unwrap(), 1000);
2876 assert_eq!(journal.read(11).await.unwrap(), 1100);
2877 assert_eq!(journal.read(14).await.unwrap(), 1400);
2878
2879 assert!(matches!(
2881 journal.read(15).await,
2882 Err(Error::ItemOutOfRange(_))
2883 ));
2884
2885 let pos = journal.append(&999).await.unwrap();
2887 assert_eq!(pos, 15);
2888 assert_eq!(journal.read(15).await.unwrap(), 999);
2889
2890 journal.destroy().await.unwrap();
2891 });
2892 }
2893
2894 #[test_traced]
2899 fn test_single_item_per_section() {
2900 let executor = deterministic::Runner::default();
2901 executor.start(|context| async move {
2902 let cfg = Config {
2903 partition: "single-item-per-section".into(),
2904 items_per_section: NZU64!(1),
2905 compression: None,
2906 codec_config: (),
2907 page_cache: CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)),
2908 write_buffer: NZUsize!(1024),
2909 };
2910
2911 let journal = Journal::<_, u64>::init(context.with_label("first"), cfg.clone())
2913 .await
2914 .unwrap();
2915
2916 let bounds = journal.bounds().await;
2918 assert_eq!(bounds.end, 0);
2919 assert!(bounds.is_empty());
2920
2921 let pos = journal.append(&0).await.unwrap();
2923 assert_eq!(pos, 0);
2924 assert_eq!(journal.size().await, 1);
2925
2926 journal.sync().await.unwrap();
2928
2929 let value = journal.read(journal.size().await - 1).await.unwrap();
2931 assert_eq!(value, 0);
2932
2933 for i in 1..10u64 {
2935 let pos = journal.append(&(i * 100)).await.unwrap();
2936 assert_eq!(pos, i);
2937 assert_eq!(journal.size().await, i + 1);
2938
2939 let value = journal.read(journal.size().await - 1).await.unwrap();
2941 assert_eq!(value, i * 100);
2942 }
2943
2944 for i in 0..10u64 {
2946 assert_eq!(journal.read(i).await.unwrap(), i * 100);
2947 }
2948
2949 journal.sync().await.unwrap();
2950
2951 let pruned = journal.prune(5).await.unwrap();
2954 assert!(pruned);
2955
2956 assert_eq!(journal.size().await, 10);
2958
2959 assert_eq!(journal.bounds().await.start, 5);
2961
2962 let value = journal.read(journal.size().await - 1).await.unwrap();
2964 assert_eq!(value, 900);
2965
2966 for i in 0..5 {
2968 assert!(matches!(
2969 journal.read(i).await,
2970 Err(crate::journal::Error::ItemPruned(_))
2971 ));
2972 }
2973
2974 for i in 5..10u64 {
2976 assert_eq!(journal.read(i).await.unwrap(), i * 100);
2977 }
2978
2979 for i in 10..15u64 {
2981 let pos = journal.append(&(i * 100)).await.unwrap();
2982 assert_eq!(pos, i);
2983
2984 let value = journal.read(journal.size().await - 1).await.unwrap();
2986 assert_eq!(value, i * 100);
2987 }
2988
2989 journal.sync().await.unwrap();
2990 drop(journal);
2991
2992 let journal = Journal::<_, u64>::init(context.with_label("second"), cfg.clone())
2994 .await
2995 .unwrap();
2996
2997 assert_eq!(journal.size().await, 15);
2999
3000 assert_eq!(journal.bounds().await.start, 5);
3002
3003 let value = journal.read(journal.size().await - 1).await.unwrap();
3005 assert_eq!(value, 1400);
3006
3007 for i in 5..15u64 {
3009 assert_eq!(journal.read(i).await.unwrap(), i * 100);
3010 }
3011
3012 journal.destroy().await.unwrap();
3013
3014 let journal = Journal::<_, u64>::init(context.with_label("third"), cfg.clone())
3017 .await
3018 .unwrap();
3019
3020 for i in 0..10u64 {
3022 journal.append(&(i * 1000)).await.unwrap();
3023 }
3024
3025 journal.prune(5).await.unwrap();
3027 let bounds = journal.bounds().await;
3028 assert_eq!(bounds.end, 10);
3029 assert_eq!(bounds.start, 5);
3030
3031 journal.sync().await.unwrap();
3033 drop(journal);
3034
3035 let journal = Journal::<_, u64>::init(context.with_label("fourth"), cfg.clone())
3037 .await
3038 .unwrap();
3039
3040 let bounds = journal.bounds().await;
3042 assert_eq!(bounds.end, 10);
3043 assert_eq!(bounds.start, 5);
3044
3045 let value = journal.read(journal.size().await - 1).await.unwrap();
3047 assert_eq!(value, 9000);
3048
3049 for i in 5..10u64 {
3051 assert_eq!(journal.read(i).await.unwrap(), i * 1000);
3052 }
3053
3054 journal.destroy().await.unwrap();
3055
3056 let journal = Journal::<_, u64>::init(context.with_label("fifth"), cfg.clone())
3060 .await
3061 .unwrap();
3062
3063 for i in 0..5u64 {
3064 journal.append(&(i * 100)).await.unwrap();
3065 }
3066 journal.sync().await.unwrap();
3067
3068 journal.prune(5).await.unwrap();
3070 let bounds = journal.bounds().await;
3071 assert_eq!(bounds.end, 5); assert!(bounds.is_empty()); let result = journal.read(journal.size().await - 1).await;
3076 assert!(matches!(result, Err(crate::journal::Error::ItemPruned(4))));
3077
3078 journal.append(&500).await.unwrap();
3080 let bounds = journal.bounds().await;
3081 assert_eq!(bounds.start, 5);
3082 assert_eq!(journal.read(bounds.end - 1).await.unwrap(), 500);
3083
3084 journal.destroy().await.unwrap();
3085 });
3086 }
3087
3088 #[test_traced]
3089 fn test_variable_journal_clear_to_size() {
3090 let executor = deterministic::Runner::default();
3091 executor.start(|context| async move {
3092 let cfg = Config {
3093 partition: "clear-test".into(),
3094 items_per_section: NZU64!(10),
3095 compression: None,
3096 codec_config: (),
3097 page_cache: CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)),
3098 write_buffer: NZUsize!(1024),
3099 };
3100
3101 let journal = Journal::<_, u64>::init(context.with_label("journal"), cfg.clone())
3102 .await
3103 .unwrap();
3104
3105 for i in 0..25u64 {
3107 journal.append(&(i * 100)).await.unwrap();
3108 }
3109 let bounds = journal.bounds().await;
3110 assert_eq!(bounds.end, 25);
3111 assert_eq!(bounds.start, 0);
3112 journal.sync().await.unwrap();
3113
3114 journal.clear_to_size(100).await.unwrap();
3116 let bounds = journal.bounds().await;
3117 assert_eq!(bounds.end, 100);
3118 assert!(bounds.is_empty());
3119
3120 for i in 0..25 {
3122 assert!(matches!(
3123 journal.read(i).await,
3124 Err(crate::journal::Error::ItemPruned(_))
3125 ));
3126 }
3127
3128 drop(journal);
3130 let journal =
3131 Journal::<_, u64>::init(context.with_label("journal_after_clear"), cfg.clone())
3132 .await
3133 .unwrap();
3134 let bounds = journal.bounds().await;
3135 assert_eq!(bounds.end, 100);
3136 assert!(bounds.is_empty());
3137
3138 for i in 100..105u64 {
3140 let pos = journal.append(&(i * 100)).await.unwrap();
3141 assert_eq!(pos, i);
3142 }
3143 let bounds = journal.bounds().await;
3144 assert_eq!(bounds.end, 105);
3145 assert_eq!(bounds.start, 100);
3146
3147 for i in 100..105u64 {
3149 assert_eq!(journal.read(i).await.unwrap(), i * 100);
3150 }
3151
3152 journal.sync().await.unwrap();
3154 drop(journal);
3155
3156 let journal = Journal::<_, u64>::init(context.with_label("journal_reopened"), cfg)
3157 .await
3158 .unwrap();
3159
3160 let bounds = journal.bounds().await;
3161 assert_eq!(bounds.end, 105);
3162 assert_eq!(bounds.start, 100);
3163 for i in 100..105u64 {
3164 assert_eq!(journal.read(i).await.unwrap(), i * 100);
3165 }
3166
3167 journal.destroy().await.unwrap();
3168 });
3169 }
3170}