1use super::Reader as _;
7use crate::{
8 journal::{
9 contiguous::{fixed, metrics::VariableMetrics as Metrics, Contiguous, Many, Mutable},
10 segmented::variable,
11 Error,
12 },
13 Context, Persistable,
14};
15use commonware_codec::{Codec, CodecShared};
16use commonware_runtime::buffer::paged::CacheRef;
17use commonware_utils::{
18 sync::{AsyncRwLockReadGuard, UpgradableAsyncRwLock},
19 NZUsize,
20};
21#[commonware_macros::stability(ALPHA)]
22use core::ops::Range;
23use futures::{stream, Stream, StreamExt as _};
24use std::num::{NonZeroU64, NonZeroUsize};
25#[commonware_macros::stability(ALPHA)]
26use tracing::debug;
27use tracing::warn;
28
/// Buffer size passed to the data journal's `replay` when this module scans data during
/// startup alignment and offset rebuilding.
const REPLAY_BUFFER_SIZE: NonZeroUsize = NZUsize!(1024);

/// Suffix appended to [`Config::partition`] to name the data journal's partition.
const DATA_SUFFIX: &str = "_data";

/// Suffix appended to [`Config::partition`] to name the offsets journal's partition.
const OFFSETS_SUFFIX: &str = "_offsets";
36
/// Maps an item `position` to the index of the data section that holds it.
///
/// Sections hold exactly `items_per_section` positions each, so the section index is the
/// integer quotient of the two.
///
/// # Panics
///
/// Panics if `items_per_section` is zero.
const fn position_to_section(position: u64, items_per_section: u64) -> u64 {
    // For unsigned integers Euclidean division is plain truncating division.
    position.div_euclid(items_per_section)
}
61
/// Configuration for a contiguous variable-item [`Journal`].
#[derive(Clone)]
pub struct Config<C> {
    /// Base partition name. The data and offsets journals use this name with the
    /// `_data` and `_offsets` suffixes appended, respectively.
    pub partition: String,

    /// Number of items stored in each data section. The same value is used as the
    /// offsets journal's `items_per_blob`.
    pub items_per_section: NonZeroU64,

    /// Compression setting forwarded to the data journal and to the item encoder on append.
    // NOTE(review): presumably `None` disables compression and `Some(level)` selects a level —
    // confirm against `variable::Config`.
    pub compression: Option<u8>,

    /// Codec configuration forwarded to the data journal.
    pub codec_config: C,

    /// Page cache handle given to both the data journal and the offsets journal.
    pub page_cache: CacheRef,

    /// Write buffer size given to both the data journal and the offsets journal.
    pub write_buffer: NonZeroUsize,
}
86
87impl<C> Config<C> {
88 fn data_partition(&self) -> String {
90 format!("{}{}", self.partition, DATA_SUFFIX)
91 }
92
93 fn offsets_partition(&self) -> String {
95 format!("{}{}", self.partition, OFFSETS_SUFFIX)
96 }
97}
98
/// State of a [`Journal`] that is kept behind its lock.
struct Inner<E: Context, V: Codec> {
    /// Sectioned journal that stores the encoded items.
    data: variable::Journal<E, V>,

    /// Exclusive upper bound on readable positions; reads at `position >= size` fail with
    /// `Error::ItemOutOfRange`. Appends advance it and rewinds lower it.
    size: u64,

    /// Inclusive lower bound on readable positions; reads at `position < pruning_boundary`
    /// fail with `Error::ItemPruned`.
    pruning_boundary: u64,
}
121
122impl<E: Context, V: CodecShared> Inner<E, V> {
123 async fn read(
131 &self,
132 position: u64,
133 items_per_section: u64,
134 offsets: &impl super::Reader<Item = u64>,
135 ) -> Result<V, Error> {
136 if position >= self.size {
137 return Err(Error::ItemOutOfRange(position));
138 }
139 if position < self.pruning_boundary {
140 return Err(Error::ItemPruned(position));
141 }
142
143 let offset = offsets.read(position).await?;
144 let section = position_to_section(position, items_per_section);
145
146 self.data.get(section, offset).await
147 }
148
149 fn try_read_sync(
151 &self,
152 position: u64,
153 items_per_section: u64,
154 offsets: &impl super::Reader<Item = u64>,
155 ) -> Option<V> {
156 let mut buf = Vec::new();
157 self.try_read_sync_into(position, items_per_section, offsets, &mut buf)
158 }
159
160 fn try_read_sync_into(
162 &self,
163 position: u64,
164 items_per_section: u64,
165 offsets: &impl super::Reader<Item = u64>,
166 buf: &mut Vec<u8>,
167 ) -> Option<V> {
168 if position >= self.size || position < self.pruning_boundary {
169 return None;
170 }
171 let offset = offsets.try_read_sync(position)?;
172 let section = position_to_section(position, items_per_section);
173 self.data.try_get_sync_into(section, offset, buf)
174 }
175}
176
/// A contiguous journal of variable-size items addressed by position.
///
/// Items live in a sectioned `variable::Journal`; a companion `fixed::Journal` records, for
/// every position, the offset of that item inside its data section.
pub struct Journal<E: Context, V: Codec> {
    /// Data journal together with the size and pruning boundary, guarded by an upgradable
    /// async read-write lock.
    inner: UpgradableAsyncRwLock<Inner<E, V>>,

    /// Maps each position to the item's offset within its data section.
    offsets: fixed::Journal<E, u64>,

    /// Number of items per data section (taken from `Config::items_per_section`).
    items_per_section: u64,

    /// Compression setting handed to the item encoder when appending.
    compression: Option<u8>,

    /// Metrics recorded by journal operations.
    metrics: Metrics<E>,
}
239
/// A read handle over a [`Journal`].
///
/// Holds a read guard on the journal's inner state for as long as the reader is alive,
/// together with a reader over the offsets journal.
pub struct Reader<'a, E: Context, V: Codec> {
    /// Read guard over the journal's data, size, and pruning boundary.
    guard: AsyncRwLockReadGuard<'a, Inner<E, V>>,
    /// Reader over the offsets journal used to resolve positions to data offsets.
    offsets: fixed::Reader<'a, E, u64>,
    /// Number of items per data section.
    items_per_section: u64,
    /// Metrics of the journal this reader was created from.
    metrics: &'a Metrics<E>,
}
247
248impl<E: Context, V: CodecShared> super::Reader for Reader<'_, E, V> {
249 type Item = V;
250
251 fn bounds(&self) -> std::ops::Range<u64> {
252 self.guard.pruning_boundary..self.guard.size
253 }
254
255 async fn read(&self, position: u64) -> Result<V, Error> {
256 let _timer = self.metrics.read_timer();
257 self.metrics.read_calls.inc();
258 let result = match self
259 .guard
260 .read(position, self.items_per_section, &self.offsets)
261 .await
262 {
263 Ok(item) => {
264 self.metrics.items_read.inc();
265 Ok(item)
266 }
267 Err(error) => Err(error),
268 };
269 result
270 }
271
/// Reads the items at `positions`, returning them in the same order.
///
/// An empty `positions` returns an empty vector without touching metrics. Otherwise each
/// position is first attempted synchronously; the positions that miss are resolved through
/// the offsets journal and fetched from the data journal in batches of consecutive
/// positions within a single section.
///
/// # Panics
///
/// Panics if `positions` is not strictly increasing.
///
/// # Errors
///
/// - `Error::ItemPruned` if the first position is below the pruning boundary.
/// - `Error::ItemOutOfRange` if the last position is at or beyond the journal size.
/// - `Error::Corruption` if the offsets journal reports a validated position as pruned or
///   out of range.
/// - Any other error from the offsets or data journals.
async fn read_many(&self, positions: &[u64]) -> Result<Vec<V>, Error> {
    if positions.is_empty() {
        return Ok(Vec::new());
    }
    let _timer = self.metrics.read_many_timer();
    self.metrics.read_many_calls.inc();
    assert!(
        positions.windows(2).all(|w| w[0] < w[1]),
        "positions must be strictly increasing"
    );
    // Because positions are strictly increasing, checking the first and last entries
    // validates the whole slice against the readable bounds.
    if positions[0] < self.guard.pruning_boundary {
        return Err(Error::ItemPruned(positions[0]));
    }
    let last_position = *positions.last().expect("positions is not empty");
    if last_position >= self.guard.size {
        return Err(Error::ItemOutOfRange(last_position));
    }

    // First pass: try every position synchronously, remembering which ones missed.
    // `miss_indices[k]` is the slot in `result` that `miss_positions[k]` must fill.
    let mut result: Vec<Option<V>> = Vec::with_capacity(positions.len());
    let mut miss_indices = Vec::with_capacity(positions.len());
    let mut miss_positions = Vec::with_capacity(positions.len());
    let mut buf = Vec::new();
    for (i, &position) in positions.iter().enumerate() {
        if let Some(item) = self.guard.try_read_sync_into(
            position,
            self.items_per_section,
            &self.offsets,
            &mut buf,
        ) {
            result.push(Some(item));
        } else {
            result.push(None);
            miss_indices.push(i);
            miss_positions.push(position);
        }
    }

    // Everything was served synchronously.
    if miss_positions.is_empty() {
        self.metrics.items_read.inc_by(positions.len() as u64);
        return Ok(result.into_iter().map(|r| r.unwrap()).collect());
    }

    // Resolve the data offsets of all misses in one call. The positions were already
    // bounds-checked above, so a range/pruned error here is reported as corruption.
    let miss_offsets = self
        .offsets
        .read_many(&miss_positions)
        .await
        .map_err(|e| match e {
            Error::ItemOutOfRange(e) | Error::ItemPruned(e) => {
                Error::Corruption(format!("section/item should be found, but got: {e}"))
            }
            other => other,
        })?;

    // Second pass: walk the misses grouped by section (`group_start..group_end`), and
    // within each section by runs of consecutive positions (`run_start..run_end`).
    let mut group_start = 0;
    while group_start < miss_positions.len() {
        let section = position_to_section(miss_positions[group_start], self.items_per_section);
        let mut group_end = group_start + 1;
        while group_end < miss_positions.len()
            && position_to_section(miss_positions[group_end], self.items_per_section) == section
        {
            group_end += 1;
        }

        let mut run_start = group_start;
        while run_start < group_end {
            // Extend the run while each position is exactly one past the previous one.
            let mut run_end = run_start + 1;
            while run_end < group_end
                && miss_positions[run_end - 1].checked_add(1) == Some(miss_positions[run_end])
            {
                run_end += 1;
            }

            let items = self
                .guard
                .data
                .get_many_consecutive(section, &miss_offsets[run_start..run_end])
                .await?;

            // Place each fetched item into the slot of the position it belongs to.
            for (item, &miss_idx) in items.into_iter().zip(&miss_indices[run_start..run_end]) {
                result[miss_idx] = Some(item);
            }
            run_start = run_end;
        }
        group_start = group_end;
    }

    self.metrics.items_read.inc_by(positions.len() as u64);
    // NOTE(review): the unwrap assumes `get_many_consecutive` returns one item per offset —
    // confirm against the data journal.
    Ok(result.into_iter().map(|r| r.unwrap()).collect())
}
365
366 fn try_read_sync(&self, position: u64) -> Option<V> {
367 let item = self
368 .guard
369 .try_read_sync(position, self.items_per_section, &self.offsets)?;
370 self.metrics.try_read_sync_hits.inc();
371 self.metrics.items_read.inc();
372 Some(item)
373 }
374
375 async fn replay(
376 &self,
377 buffer_size: NonZeroUsize,
378 start_pos: u64,
379 ) -> Result<impl Stream<Item = Result<(u64, V), Error>> + Send, Error> {
380 if start_pos < self.guard.pruning_boundary {
382 return Err(Error::ItemPruned(start_pos));
383 }
384 if start_pos > self.guard.size {
385 return Err(Error::ItemOutOfRange(start_pos));
386 }
387
388 let (start_section, start_offset) = if start_pos < self.guard.size {
391 let offset = self.offsets.read(start_pos).await?;
392 let section = position_to_section(start_pos, self.items_per_section);
393 (section, offset)
394 } else {
395 (u64::MAX, 0)
396 };
397
398 let inner_stream = self
399 .guard
400 .data
401 .replay(start_section, start_offset, buffer_size)
402 .await?;
403
404 let stream = inner_stream
406 .zip(stream::iter(start_pos..))
407 .map(|(result, pos)| result.map(|(_section, _offset, _size, item)| (pos, item)));
408
409 Ok(stream)
410 }
411}
412
413impl<E: Context, V: CodecShared> Journal<E, V> {
414 pub async fn init(context: E, cfg: Config<V::Cfg>) -> Result<Self, Error> {
421 let items_per_section = cfg.items_per_section.get();
422 let data_partition = cfg.data_partition();
423 let offsets_partition = cfg.offsets_partition();
424
425 let mut data = variable::Journal::init(
427 context.child("data"),
428 variable::Config {
429 partition: data_partition,
430 compression: cfg.compression,
431 codec_config: cfg.codec_config,
432 page_cache: cfg.page_cache.clone(),
433 write_buffer: cfg.write_buffer,
434 },
435 )
436 .await?;
437
438 let mut offsets = fixed::Journal::init(
440 context.child("offsets"),
441 fixed::Config {
442 partition: offsets_partition,
443 items_per_blob: cfg.items_per_section,
444 page_cache: cfg.page_cache,
445 write_buffer: cfg.write_buffer,
446 },
447 )
448 .await?;
449
450 let (pruning_boundary, size) =
452 Self::align_journals(&mut data, &mut offsets, items_per_section).await?;
453
454 let metrics = Metrics::new(context);
455 metrics.update(size, pruning_boundary, items_per_section);
456
457 Ok(Self {
458 inner: UpgradableAsyncRwLock::new(Inner {
459 data,
460 size,
461 pruning_boundary,
462 }),
463 offsets,
464 items_per_section,
465 compression: cfg.compression,
466 metrics,
467 })
468 }
469
470 #[commonware_macros::stability(ALPHA)]
475 pub async fn init_at_size(context: E, cfg: Config<V::Cfg>, size: u64) -> Result<Self, Error> {
476 let data = variable::Journal::init(
478 context.child("data"),
479 variable::Config {
480 partition: cfg.data_partition(),
481 compression: cfg.compression,
482 codec_config: cfg.codec_config.clone(),
483 page_cache: cfg.page_cache.clone(),
484 write_buffer: cfg.write_buffer,
485 },
486 )
487 .await?;
488
489 let offsets = fixed::Journal::init_at_size(
491 context.child("offsets"),
492 fixed::Config {
493 partition: cfg.offsets_partition(),
494 items_per_blob: cfg.items_per_section,
495 page_cache: cfg.page_cache,
496 write_buffer: cfg.write_buffer,
497 },
498 size,
499 )
500 .await?;
501
502 let items_per_section = cfg.items_per_section.get();
503 let metrics = Metrics::new(context);
504 metrics.update(size, size, items_per_section);
505
506 Ok(Self {
507 inner: UpgradableAsyncRwLock::new(Inner {
508 data,
509 size,
510 pruning_boundary: size,
511 }),
512 offsets,
513 items_per_section,
514 compression: cfg.compression,
515 metrics,
516 })
517 }
518
/// Opens the journal for a sync over `range`, reusing, pruning, or discarding existing
/// data depending on how it relates to `range`.
///
/// After opening the journal normally, the outcome is decided in this order:
/// 1. Size is 0: return the journal as-is if `range.start == 0`, otherwise destroy it and
///    re-initialize at size `range.start`.
/// 2. No readable items and the bounds start beyond `range.start`: clear to `range.start`.
/// 3. Size exceeds `range.end`: fail with `Error::ItemOutOfRange(size)`.
/// 4. Size is at or below `range.start`: destroy and re-initialize at size `range.start`.
/// 5. Otherwise keep the data, pruning to `range.start` if older items exist.
///
/// # Panics
///
/// Panics if `range` is empty.
///
/// # Errors
///
/// Returns `Error::ItemOutOfRange` as described above, or any error from the underlying
/// init, destroy, clear, or prune operations.
#[commonware_macros::stability(ALPHA)]
pub(crate) async fn init_sync(
    context: E,
    cfg: Config<V::Cfg>,
    range: Range<u64>,
) -> Result<Self, Error> {
    assert!(!range.is_empty(), "range must not be empty");

    debug!(
        range.start,
        range.end,
        items_per_section = cfg.items_per_section.get(),
        "initializing contiguous variable journal for sync"
    );

    // Open whatever is on disk first; `cfg` is cloned because it may be needed again below
    // for re-initialization.
    let journal = Self::init(context.child("journal"), cfg.clone()).await?;

    let size = journal.size().await;

    // Case 1: nothing has ever been stored.
    if size == 0 {
        if range.start == 0 {
            debug!("no existing journal data, returning empty journal");
            return Ok(journal);
        } else {
            debug!(
                range.start,
                "no existing journal data, initializing at sync range start"
            );
            journal.destroy().await?;
            return Self::init_at_size(context, cfg, range.start).await;
        }
    }

    // Case 2: the journal has a non-zero size but no readable items, and its bounds lie
    // beyond the start of the sync range.
    let bounds = journal.reader().await.bounds();
    if bounds.is_empty() && bounds.start > range.start {
        journal.clear_to_size(range.start).await?;
        return Ok(journal);
    }

    // Case 3: existing data extends past the end of the sync range.
    if size > range.end {
        return Err(Error::ItemOutOfRange(size));
    }

    // Case 4: existing data ends at or before the sync range starts, so none of it is kept.
    if size <= range.start {
        debug!(
            size,
            range.start, "existing journal data is stale, re-initializing at start position"
        );
        journal.destroy().await?;
        return Self::init_at_size(context, cfg, range.start).await;
    }

    // Case 5: existing data overlaps the range; drop anything older than its start.
    if !bounds.is_empty() && bounds.start < range.start {
        debug!(
            oldest_pos = bounds.start,
            range.start, "pruning journal to sync range start"
        );
        journal.prune(range.start).await?;
    }

    Ok(journal)
}
614
615 pub async fn rewind(&self, size: u64) -> Result<(), Error> {
629 let mut inner = self.inner.write().await;
630
631 match size.cmp(&inner.size) {
633 std::cmp::Ordering::Greater => return Err(Error::InvalidRewind(size)),
634 std::cmp::Ordering::Equal => return Ok(()), std::cmp::Ordering::Less => {}
636 }
637
638 if size < inner.pruning_boundary {
640 return Err(Error::ItemPruned(size));
641 }
642
643 let discard_offset = {
645 let offsets_reader = self.offsets.reader().await;
646 offsets_reader.read(size).await?
647 };
648 let discard_section = position_to_section(size, self.items_per_section);
649
650 inner
651 .data
652 .rewind_to_offset(discard_section, discard_offset)
653 .await?;
654 self.offsets.rewind(size).await?;
655
656 inner.size = size;
658 self.metrics
659 .update(inner.size, inner.pruning_boundary, self.items_per_section);
660
661 Ok(())
662 }
663
664 pub async fn append(&self, item: &V) -> Result<u64, Error> {
680 let _timer = self.metrics.append_timer();
681 self.metrics.append_calls.inc();
682 self.append_many_inner(Many::Flat(std::slice::from_ref(item)))
683 .await
684 }
685
686 pub async fn append_many<'a>(&'a self, items: Many<'a, V>) -> Result<u64, Error> {
691 let _timer = self.metrics.append_many_timer();
692 self.metrics.append_many_calls.inc();
693 self.append_many_inner(items).await
694 }
695
/// Shared implementation of `append` and `append_many`.
///
/// Encodes all items into one buffer before taking the write lock, then writes them
/// section by section, recording each item's offset in the offsets journal. Whenever a
/// section becomes full, that data section and the offsets journal are synced.
///
/// Returns the position of the last appended item.
///
/// # Errors
///
/// - `Error::EmptyAppend` if `items` is empty.
/// - `Error::OffsetOverflow` if an item's absolute offset does not fit in a `u64`.
/// - Any error from encoding, appending, or syncing.
///
/// # Panics
///
/// Panics if the offsets journal reports a last position that disagrees with the journal's
/// own size accounting.
async fn append_many_inner<'a>(&'a self, items: Many<'a, V>) -> Result<u64, Error> {
    if items.is_empty() {
        return Err(Error::EmptyAppend);
    }
    let items_count = items.len();

    // Encode everything up front. `item_starts[i]` is the byte index in `encoded` at which
    // item `i` begins.
    let mut encoded = Vec::new();
    let mut item_starts = Vec::with_capacity(items_count);
    let mut encode = |item: &V| {
        item_starts.push(encoded.len());
        variable::Journal::<E, V>::encode_item_into(self.compression, item, &mut encoded)
    };
    match &items {
        Many::Flat(items) => {
            for item in *items {
                encode(item)?;
            }
        }
        Many::Nested(nested_items) => {
            for items in *nested_items {
                for item in *items {
                    encode(item)?;
                }
            }
        }
    }

    // The write lock is only taken once encoding is done.
    let mut inner = self.inner.write().await;

    let mut written = 0;
    while written < items_count {
        // Write as many items as still fit into the current section.
        let section = position_to_section(inner.size, self.items_per_section);
        let pos_in_section = inner.size % self.items_per_section;
        let remaining_space = (self.items_per_section - pos_in_section) as usize;
        let batch_count = remaining_space.min(items_count - written);
        let batch_start = item_starts[written];
        // The batch ends where the next unwritten item starts, or at the end of the buffer
        // if this batch contains the final item.
        let batch_end = item_starts
            .get(written + batch_count)
            .copied()
            .unwrap_or(encoded.len());

        let base_offset = inner
            .data
            .append_raw(section, &encoded[batch_start..batch_end])
            .await?;

        // Translate each item's start within the batch into an offset relative to where the
        // batch landed in the section.
        let absolute_offsets = item_starts[written..written + batch_count]
            .iter()
            .map(|&start| {
                base_offset
                    .checked_add((start - batch_start) as u64)
                    .ok_or(Error::OffsetOverflow)
            })
            .collect::<Result<Vec<u64>, _>>()?;

        let last_offsets_pos = self
            .offsets
            .append_many(Many::Flat(&absolute_offsets))
            .await?;
        assert_eq!(last_offsets_pos, inner.size + batch_count as u64 - 1);

        inner.size += batch_count as u64;
        written += batch_count;

        // The section just became full: sync it together with the offsets journal. The
        // write lock is downgraded for the duration of the sync and upgraded again only if
        // more items remain.
        if inner.size.is_multiple_of(self.items_per_section) {
            let inner_ref = inner.downgrade_to_upgradable();
            futures::try_join!(inner_ref.data.sync(section), self.offsets.sync())?;
            if written == items_count {
                self.metrics.update(
                    inner_ref.size,
                    inner_ref.pruning_boundary,
                    self.items_per_section,
                );
                return Ok(inner_ref.size - 1);
            }
            inner = inner_ref.upgrade().await;
        }
    }

    self.metrics
        .update(inner.size, inner.pruning_boundary, self.items_per_section);
    Ok(inner.size - 1)
}
786
787 pub async fn reader(&self) -> Reader<'_, E, V> {
789 Reader {
790 guard: self.inner.read().await,
791 offsets: self.offsets.reader().await,
792 items_per_section: self.items_per_section,
793 metrics: &self.metrics,
794 }
795 }
796
797 pub async fn size(&self) -> u64 {
800 self.inner.read().await.size
801 }
802
803 pub async fn prune(&self, min_position: u64) -> Result<bool, Error> {
814 let mut inner = self.inner.write().await;
815
816 if min_position <= inner.pruning_boundary {
817 return Ok(false);
818 }
819
820 let min_position = min_position.min(inner.size);
822
823 let min_section = position_to_section(min_position, self.items_per_section);
825
826 let pruned = inner.data.prune(min_section).await?;
827 if pruned {
828 let new_oldest = (min_section * self.items_per_section).max(inner.pruning_boundary);
829 inner.pruning_boundary = new_oldest;
830 self.offsets.prune(new_oldest).await?;
831 self.metrics
832 .update(inner.size, inner.pruning_boundary, self.items_per_section);
833 }
834 Ok(pruned)
835 }
836
837 pub async fn commit(&self) -> Result<(), Error> {
842 let _timer = self.metrics.commit_timer();
843 self.metrics.record_commit();
844 let inner = self.inner.upgradable_read().await;
847
848 let section = position_to_section(inner.size, self.items_per_section);
849 inner.data.sync(section).await?;
850 Ok(())
851 }
852
853 pub async fn sync(&self) -> Result<(), Error> {
857 let _timer = self.metrics.sync_timer();
858 self.metrics.sync_calls.inc();
859 let inner = self.inner.upgradable_read().await;
862
863 let section = position_to_section(inner.size, self.items_per_section);
866
867 futures::try_join!(inner.data.sync(section), self.offsets.sync())?;
870
871 Ok(())
872 }
873
874 pub async fn destroy(self) -> Result<(), Error> {
878 let inner = self.inner.into_inner();
879 inner.data.destroy().await?;
880 self.offsets.destroy().await
881 }
882
883 #[commonware_macros::stability(ALPHA)]
888 pub(crate) async fn clear_to_size(&self, new_size: u64) -> Result<(), Error> {
889 let mut inner = self.inner.write().await;
890 inner.data.clear().await?;
891
892 self.offsets.clear_to_size(new_size).await?;
893 inner.size = new_size;
894 inner.pruning_boundary = new_size;
895 self.metrics
896 .update(inner.size, inner.pruning_boundary, self.items_per_section);
897 Ok(())
898 }
899
/// Reconciles the data journal and the offsets journal at startup and returns
/// `(pruning_boundary, size)` for the combined journal.
///
/// The data journal is treated as the source of truth for which items exist. The offsets
/// journal is cleared, pruned, rewound, or extended so that it covers exactly the items
/// found in data, and is synced before returning on the main path.
///
/// # Errors
///
/// - `Error::Corruption` if the offsets journal's oldest position belongs to a different
///   section than the data journal's oldest section in a way that cannot be repaired.
/// - Any error from replaying data or modifying the offsets journal.
///
/// # Panics
///
/// Panics if, after alignment, the offsets journal is empty, ends at a different position
/// than the computed data size, or starts in a different section than the data journal.
async fn align_journals(
    data: &mut variable::Journal<E, V>,
    offsets: &mut fixed::Journal<E, u64>,
    items_per_section: u64,
) -> Result<(u64, u64), Error> {
    // Count the items in the newest data section by replaying it. Any replay error aborts
    // alignment.
    let items_in_last_section = match data.newest_section() {
        Some(last_section) => {
            let stream = data.replay(last_section, 0, REPLAY_BUFFER_SIZE).await?;
            futures::pin_mut!(stream);
            let mut count = 0u64;
            while let Some(result) = stream.next().await {
                result?; count += 1;
            }
            count
        }
        None => 0,
    };

    // Data counts as empty if it has no sections at all, or a single section with no items.
    let data_empty =
        data.is_empty() || (data.num_sections() == 1 && items_in_last_section == 0);
    if data_empty {
        let offsets_bounds = {
            let offsets_reader = offsets.reader().await;
            offsets_reader.bounds()
        };
        let size = offsets_bounds.end;

        // A section exists but holds no items: reset offsets to the start of that section
        // (or to the offsets journal's own start, whichever is later).
        if !data.is_empty() {
            let data_first_section = data.oldest_section().unwrap();
            let data_section_start = data_first_section * items_per_section;
            let target_pos = data_section_start.max(offsets_bounds.start);

            warn!("crash repair: clearing offsets to {target_pos} (empty section crash)");
            offsets.clear_to_size(target_pos).await?;
            return Ok((target_pos, target_pos));
        }

        // No data sections at all, but offsets still has entries: drop them while keeping
        // the size.
        if !offsets_bounds.is_empty() && offsets_bounds.start < size {
            warn!("crash repair: clearing offsets to {size} (prune-all crash)");
            offsets.clear_to_size(size).await?;
        }

        return Ok((size, size));
    }

    let data_first_section = data.oldest_section().unwrap();
    let data_last_section = data.newest_section().unwrap();

    // First position of the oldest data section.
    let data_oldest_pos = data_first_section * items_per_section;

    // Step 1: make the start of the offsets journal agree with the oldest data section.
    {
        let offsets_bounds = {
            let offsets_reader = offsets.reader().await;
            offsets_reader.bounds()
        };
        match (
            offsets_bounds.is_empty(),
            offsets_bounds.start.cmp(&data_oldest_pos),
        ) {
            // Offsets has no entries: acceptable only if it sits in the same section as the
            // oldest data; the entries are rebuilt from data in step 2.
            (true, _) => {
                let offsets_first_section = offsets_bounds.start / items_per_section;
                if offsets_first_section != data_first_section {
                    return Err(Error::Corruption(format!(
                        "offsets journal empty at section {offsets_first_section} != data section {data_first_section}"
                    )));
                }
                warn!(
                    "crash repair: offsets journal empty at {}, will rebuild from data",
                    offsets_bounds.start
                );
            }
            // Offsets still covers positions older than the oldest data: prune them.
            (false, std::cmp::Ordering::Less) => {
                warn!("crash repair: pruning offsets journal to {data_oldest_pos}");
                offsets.prune(data_oldest_pos).await?;
            }
            // Offsets starts after the oldest data position: tolerated only while it is
            // still within the oldest data section.
            (false, std::cmp::Ordering::Greater) => {
                if offsets_bounds.start / items_per_section > data_first_section {
                    return Err(Error::Corruption(format!(
                        "offsets oldest pos ({}) > data oldest pos ({data_oldest_pos})",
                        offsets_bounds.start
                    )));
                }
            }
            // Already aligned.
            (false, std::cmp::Ordering::Equal) => {
            }
        }
    }

    // Step 2: compute the journal size implied by data. Positions are counted from the
    // offsets journal's start; every section between the oldest and newest is taken to be
    // full, and the newest contributes the item count measured above.
    let (offsets_bounds, data_size) = {
        let offsets_reader = offsets.reader().await;
        let offsets_bounds = offsets_reader.bounds();
        let data_size = if data_first_section == data_last_section {
            offsets_bounds.start + items_in_last_section
        } else {
            let oldest_items =
                (data_first_section + 1) * items_per_section - offsets_bounds.start;
            let middle_items = (data_last_section - data_first_section - 1) * items_per_section;
            offsets_bounds.start + oldest_items + middle_items + items_in_last_section
        };
        (offsets_bounds, data_size)
    };

    // Step 3: make the end of the offsets journal agree with the data size, either by
    // dropping surplus entries or by rebuilding the missing ones from data.
    let offsets_size = offsets_bounds.end;
    if offsets_size > data_size {
        warn!("crash repair: rewinding offsets from {offsets_size} to {data_size}");
        offsets.rewind(data_size).await?;
    } else if offsets_size < data_size {
        Self::add_missing_offsets(data, offsets, offsets_size, items_per_section).await?;
    }

    // Step 4: verify the result; the offsets journal's start becomes the pruning boundary.
    let pruning_boundary = {
        let offsets_reader = offsets.reader().await;
        let offsets_bounds = offsets_reader.bounds();
        assert_eq!(offsets_bounds.end, data_size);

        assert!(
            !offsets_bounds.is_empty(),
            "offsets should have data after alignment"
        );
        assert_eq!(
            offsets_bounds.start / items_per_section,
            data_first_section,
            "offsets and data should be in same oldest section"
        );
        offsets_bounds.start
    };

    offsets.sync().await?;
    Ok((pruning_boundary, data_size))
}
1079
1080 async fn add_missing_offsets(
1091 data: &variable::Journal<E, V>,
1092 offsets: &mut fixed::Journal<E, u64>,
1093 offsets_size: u64,
1094 items_per_section: u64,
1095 ) -> Result<(), Error> {
1096 assert!(
1097 !data.is_empty(),
1098 "rebuild_offsets called with empty data journal"
1099 );
1100
1101 let (start_section, resume_offset, skip_first) = {
1103 let offsets_reader = offsets.reader().await;
1104 let offsets_bounds = offsets_reader.bounds();
1105 if offsets_bounds.is_empty() {
1106 let first_section = data.oldest_section().unwrap();
1109 (first_section, 0, false)
1110 } else if offsets_bounds.start < offsets_size {
1111 let last_offset = offsets_reader.read(offsets_size - 1).await?;
1113 let last_section = position_to_section(offsets_size - 1, items_per_section);
1114 (last_section, last_offset, true)
1115 } else {
1116 let first_section = data.oldest_section().unwrap();
1119 (first_section, 0, false)
1120 }
1121 };
1122
1123 let stream = data
1127 .replay(start_section, resume_offset, REPLAY_BUFFER_SIZE)
1128 .await?;
1129 futures::pin_mut!(stream);
1130
1131 let mut skipped_first = false;
1132 while let Some(result) = stream.next().await {
1133 let (_section, offset, _size, _item) = result?;
1134
1135 if skip_first && !skipped_first {
1137 skipped_first = true;
1138 continue;
1139 }
1140
1141 offsets.append(&offset).await?;
1142 }
1143
1144 Ok(())
1145 }
1146}
1147
1148impl<E: Context, V: CodecShared> Contiguous for Journal<E, V> {
1150 type Item = V;
1151
1152 async fn reader(&self) -> impl super::Reader<Item = V> + '_ {
1153 Self::reader(self).await
1154 }
1155
1156 async fn size(&self) -> u64 {
1157 Self::size(self).await
1158 }
1159}
1160
1161impl<E: Context, V: CodecShared> Mutable for Journal<E, V> {
1162 async fn append(&mut self, item: &Self::Item) -> Result<u64, Error> {
1163 Self::append(self, item).await
1164 }
1165
1166 async fn append_many<'a>(&'a mut self, items: Many<'a, Self::Item>) -> Result<u64, Error> {
1167 Self::append_many(self, items).await
1168 }
1169
1170 async fn prune(&mut self, min_position: u64) -> Result<bool, Error> {
1171 Self::prune(self, min_position).await
1172 }
1173
1174 async fn rewind(&mut self, size: u64) -> Result<(), Error> {
1175 Self::rewind(self, size).await
1176 }
1177}
1178
1179impl<E: Context, V: CodecShared> Persistable for Journal<E, V> {
1180 type Error = Error;
1181
1182 async fn commit(&self) -> Result<(), Error> {
1183 self.commit().await
1184 }
1185
1186 async fn sync(&self) -> Result<(), Error> {
1187 self.sync().await
1188 }
1189
1190 async fn destroy(self) -> Result<(), Error> {
1191 self.destroy().await
1192 }
1193}
1194
/// Allows this journal to serve as the inner journal of an authenticated journal.
#[commonware_macros::stability(ALPHA)]
impl<E: Context, V: CodecShared> crate::journal::authenticated::Inner<E> for Journal<E, V> {
    /// The inner journal is configured with this module's [`Config`].
    type Config = Config<V::Cfg>;

    /// Builds an authenticated journal on top of this journal type by forwarding every
    /// argument, unchanged and in order, to `authenticated::Journal::new`.
    async fn init<
        F: crate::merkle::Family,
        H: commonware_cryptography::Hasher,
        S: commonware_parallel::Strategy,
    >(
        context: E,
        merkle_cfg: crate::merkle::full::Config<S>,
        journal_cfg: Self::Config,
        rewind_predicate: fn(&V) -> bool,
        bagging: crate::merkle::Bagging,
    ) -> Result<
        crate::journal::authenticated::Journal<F, E, Self, H, S>,
        crate::journal::authenticated::Error<F>,
    > {
        crate::journal::authenticated::Journal::<F, E, Self, H, S>::new(
            context,
            merkle_cfg,
            journal_cfg,
            rewind_predicate,
            bagging,
        )
        .await
    }
}
1223
/// Test-only helpers that read through a short-lived reader or poke at the data and
/// offsets journals individually, so tests can put them out of step with each other.
#[cfg(test)]
impl<E: Context, V: CodecShared> Journal<E, V> {
    /// Reads the item at `position` through a temporary reader.
    pub(crate) async fn read(&self, position: u64) -> Result<V, Error> {
        let reader = self.reader().await;
        reader.read(position).await
    }

    /// Returns the readable bounds through a temporary reader.
    pub(crate) async fn bounds(&self) -> std::ops::Range<u64> {
        let reader = self.reader().await;
        reader.bounds()
    }

    /// Prunes only the data journal to `section`, leaving offsets and bookkeeping untouched.
    pub(crate) async fn test_prune_data(&self, section: u64) -> Result<bool, Error> {
        let mut guard = self.inner.write().await;
        guard.data.prune(section).await
    }

    /// Prunes only the offsets journal to `position`.
    pub(crate) async fn test_prune_offsets(&self, position: u64) -> Result<bool, Error> {
        let offsets = &self.offsets;
        offsets.prune(position).await
    }

    /// Rewinds only the offsets journal to `position`.
    pub(crate) async fn test_rewind_offsets(&self, position: u64) -> Result<(), Error> {
        let offsets = &self.offsets;
        offsets.rewind(position).await
    }

    /// Returns the size reported by the offsets journal.
    pub(crate) async fn test_offsets_size(&self) -> u64 {
        let offsets = &self.offsets;
        offsets.size().await
    }

    /// Appends `item` directly to `section` of the data journal, bypassing the offsets
    /// journal and the size bookkeeping.
    pub(crate) async fn test_append_data(
        &self,
        section: u64,
        item: V,
    ) -> Result<(u64, u32), Error> {
        let mut guard = self.inner.write().await;
        guard.data.append(section, &item).await
    }

    /// Syncs the newest section of the data journal (section 0 if there is none).
    pub(crate) async fn test_sync_data(&self) -> Result<(), Error> {
        let guard = self.inner.read().await;
        let newest = guard.data.newest_section().unwrap_or(0);
        guard.data.sync(newest).await
    }
}
1276
1277#[cfg(test)]
1278mod tests {
1279 use super::*;
1280 use crate::journal::contiguous::tests::run_contiguous_tests;
1281 use commonware_macros::test_traced;
1282 use commonware_runtime::{
1283 buffer::paged::CacheRef, deterministic, Metrics as _, Runner, Storage, Supervisor as _,
1284 };
1285 use commonware_utils::{sequence::FixedBytes, NZUsize, NZU16, NZU64};
1286 use futures::FutureExt as _;
1287 use std::num::NonZeroU16;
1288
/// Page size (101) for page caches built by tests.
// NOTE(review): no use of this constant is visible in this part of the file.
const PAGE_SIZE: NonZeroU16 = NZU16!(101);
/// Page cache capacity for tests.
// NOTE(review): no use of this constant is visible in this part of the file.
const PAGE_CACHE_SIZE: usize = 2;
/// Page size (1024) used by the crash-recovery tests in this module.
const LARGE_PAGE_SIZE: NonZeroU16 = NZU16!(1024);
/// Page size (512) used by the append/read-many tests in this module.
const SMALL_PAGE_SIZE: NonZeroU16 = NZU16!(512);
1295
1296 #[test_traced]
1297 fn test_variable_append_many_compressed() {
1298 let executor = deterministic::Runner::default();
1299 executor.start(|context| async move {
1300 let cfg = Config {
1301 partition: "append-many-compressed".into(),
1302 items_per_section: NZU64!(3),
1303 compression: Some(1),
1304 codec_config: (),
1305 page_cache: CacheRef::from_pooler(&context, SMALL_PAGE_SIZE, NZUsize!(2)),
1306 write_buffer: NZUsize!(1024),
1307 };
1308 let journal = Journal::<_, FixedBytes<32>>::init(context.child("journal"), cfg)
1309 .await
1310 .unwrap();
1311 let items = [
1312 FixedBytes::new([0; 32]),
1313 FixedBytes::new([1; 32]),
1314 FixedBytes::new([2; 32]),
1315 FixedBytes::new([3; 32]),
1316 FixedBytes::new([4; 32]),
1317 ];
1318
1319 let last = journal.append_many(Many::Flat(&items)).await.unwrap();
1320 assert_eq!(last, 4);
1321 for (pos, item) in items.iter().enumerate() {
1322 assert_eq!(journal.read(pos as u64).await.unwrap(), *item);
1323 }
1324
1325 journal.destroy().await.unwrap();
1326 });
1327 }
1328
/// Writes 20 items across four sections, reopens the journal with a fresh page cache, and
/// checks that `read_many` returns the right items for a mix of consecutive and
/// non-consecutive positions spanning several sections.
#[test_traced]
fn test_variable_read_many_after_reopen() {
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        let cfg = Config {
            partition: "read-many-after-reopen".into(),
            items_per_section: NZU64!(5),
            compression: None,
            codec_config: (),
            page_cache: CacheRef::from_pooler(&context, SMALL_PAGE_SIZE, NZUsize!(2)),
            write_buffer: NZUsize!(1024),
        };

        // Populate: the item at position i has value i * 100.
        let journal = Journal::<_, u64>::init(context.child("first"), cfg.clone())
            .await
            .unwrap();
        for i in 0..20u64 {
            journal.append(&(i * 100)).await.unwrap();
        }
        journal.sync().await.unwrap();
        drop(journal);

        // Reopen with a new page cache instance.
        let cfg = Config {
            page_cache: CacheRef::from_pooler(&context, SMALL_PAGE_SIZE, NZUsize!(2)),
            ..cfg
        };
        let journal = Journal::<_, u64>::init(context.child("second"), cfg)
            .await
            .unwrap();
        let reader = journal.reader().await;
        let items = reader.read_many(&[1, 2, 3, 7, 8, 12, 18]).await.unwrap();
        assert_eq!(items, vec![100, 200, 300, 700, 800, 1200, 1800]);
        // The reader holds the journal's read lock; release it before destroying.
        drop(reader);

        journal.destroy().await.unwrap();
    });
}
1366
/// Writes 20 items into a single section, reopens the journal with a fresh page cache, and
/// checks that `read_many` returns the right items for one run of consecutive positions.
#[test_traced]
fn test_variable_read_many_consecutive_after_reopen() {
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        let cfg = Config {
            partition: "read-many-consecutive-after-reopen".into(),
            items_per_section: NZU64!(20),
            compression: None,
            codec_config: (),
            page_cache: CacheRef::from_pooler(&context, SMALL_PAGE_SIZE, NZUsize!(2)),
            write_buffer: NZUsize!(1024),
        };

        // Populate: the item at position i has value i * 100.
        let journal = Journal::<_, u64>::init(context.child("first"), cfg.clone())
            .await
            .unwrap();
        for i in 0..20u64 {
            journal.append(&(i * 100)).await.unwrap();
        }
        journal.sync().await.unwrap();
        drop(journal);

        // Reopen with a new page cache instance.
        let cfg = Config {
            page_cache: CacheRef::from_pooler(&context, SMALL_PAGE_SIZE, NZUsize!(2)),
            ..cfg
        };
        let journal = Journal::<_, u64>::init(context.child("second"), cfg)
            .await
            .unwrap();
        let reader = journal.reader().await;
        let positions: Vec<u64> = (3..10).collect();
        let items = reader.read_many(&positions).await.unwrap();
        assert_eq!(items, vec![300, 400, 500, 600, 700, 800, 900]);
        // The reader holds the journal's read lock; release it before destroying.
        drop(reader);

        journal.destroy().await.unwrap();
    });
}
1405
/// Checks that losing the offsets partition after the journal has been pruned is reported
/// as corruption on the next `init`, rather than being silently repaired.
#[test_traced]
fn test_variable_offsets_partition_loss_after_prune_unrecoverable() {
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        let cfg = Config {
            partition: "offsets-loss-after-prune".into(),
            items_per_section: NZU64!(10),
            compression: None,
            codec_config: (),
            page_cache: CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)),
            write_buffer: NZUsize!(1024),
        };

        let journal = Journal::<_, u64>::init(context.child("first"), cfg.clone())
            .await
            .unwrap();

        // Fill four sections of ten items each.
        for i in 0..40u64 {
            journal.append(&(i * 100)).await.unwrap();
        }

        // Prune the first two sections so the oldest retained position is 20.
        journal.prune(20).await.unwrap();
        let bounds = journal.bounds().await;
        assert_eq!(bounds.start, 20);
        assert_eq!(bounds.end, 40);

        journal.sync().await.unwrap();
        drop(journal);

        // Simulate loss of the offsets journal by removing both of its storage partitions.
        context
            .remove(&format!("{}-blobs", cfg.offsets_partition()), None)
            .await
            .expect("Failed to remove offsets blobs partition");
        context
            .remove(&format!("{}-metadata", cfg.offsets_partition()), None)
            .await
            .expect("Failed to remove offsets metadata partition");

        // Re-initialization must fail with a corruption error.
        let result = Journal::<_, u64>::init(context.child("second"), cfg.clone()).await;
        assert!(matches!(result, Err(Error::Corruption(_))));
    });
}
1459
1460 #[test_traced]
1468 fn test_variable_align_data_offsets_mismatch() {
1469 let executor = deterministic::Runner::default();
1470 executor.start(|context| async move {
1471 let cfg = Config {
1472 partition: "data-loss-test".into(),
1473 items_per_section: NZU64!(10),
1474 compression: None,
1475 codec_config: (),
1476 page_cache: CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)),
1477 write_buffer: NZUsize!(1024),
1478 };
1479
1480 let variable = Journal::<_, u64>::init(context.child("first"), cfg.clone())
1482 .await
1483 .unwrap();
1484
1485 for i in 0..20u64 {
1487 variable.append(&(i * 100)).await.unwrap();
1488 }
1489
1490 variable.sync().await.unwrap();
1491 drop(variable);
1492
1493 context
1495 .remove(&cfg.data_partition(), None)
1496 .await
1497 .expect("Failed to remove data partition");
1498
1499 let journal = Journal::<_, u64>::init(context.child("second"), cfg.clone())
1501 .await
1502 .expect("Should align offsets to match empty data");
1503
1504 assert_eq!(journal.size().await, 20);
1506
1507 assert!(journal.bounds().await.is_empty());
1509
1510 for i in 0..20 {
1512 assert!(matches!(
1513 journal.read(i).await,
1514 Err(crate::journal::Error::ItemPruned(_))
1515 ));
1516 }
1517
1518 let pos = journal.append(&999).await.unwrap();
1520 assert_eq!(pos, 20);
1521 assert_eq!(journal.read(20).await.unwrap(), 999);
1522
1523 journal.destroy().await.unwrap();
1524 });
1525 }
1526
1527 #[test_traced]
1529 fn test_variable_replay() {
1530 let executor = deterministic::Runner::default();
1531 executor.start(|context| async move {
1532 let cfg = Config {
1533 partition: "replay".into(),
1534 items_per_section: NZU64!(10),
1535 compression: None,
1536 codec_config: (),
1537 page_cache: CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)),
1538 write_buffer: NZUsize!(1024),
1539 };
1540
1541 let journal = Journal::<_, u64>::init(context, cfg).await.unwrap();
1543
1544 for i in 0..40u64 {
1546 journal.append(&(i * 100)).await.unwrap();
1547 }
1548
1549 {
1551 let reader = journal.reader().await;
1552 let stream = reader.replay(NZUsize!(20), 0).await.unwrap();
1553 futures::pin_mut!(stream);
1554 for i in 0..40u64 {
1555 let (pos, item) = stream.next().await.unwrap().unwrap();
1556 assert_eq!(pos, i);
1557 assert_eq!(item, i * 100);
1558 }
1559 assert!(stream.next().await.is_none());
1560 }
1561
1562 {
1564 let reader = journal.reader().await;
1565 let stream = reader.replay(NZUsize!(20), 15).await.unwrap();
1566 futures::pin_mut!(stream);
1567 for i in 15..40u64 {
1568 let (pos, item) = stream.next().await.unwrap().unwrap();
1569 assert_eq!(pos, i);
1570 assert_eq!(item, i * 100);
1571 }
1572 assert!(stream.next().await.is_none());
1573 }
1574
1575 {
1577 let reader = journal.reader().await;
1578 let stream = reader.replay(NZUsize!(20), 20).await.unwrap();
1579 futures::pin_mut!(stream);
1580 for i in 20..40u64 {
1581 let (pos, item) = stream.next().await.unwrap().unwrap();
1582 assert_eq!(pos, i);
1583 assert_eq!(item, i * 100);
1584 }
1585 assert!(stream.next().await.is_none());
1586 }
1587
1588 journal.prune(20).await.unwrap();
1590 {
1591 let reader = journal.reader().await;
1592 let res = reader.replay(NZUsize!(20), 0).await;
1593 assert!(matches!(res, Err(crate::journal::Error::ItemPruned(_))));
1594 }
1595 {
1596 let reader = journal.reader().await;
1597 let res = reader.replay(NZUsize!(20), 19).await;
1598 assert!(matches!(res, Err(crate::journal::Error::ItemPruned(_))));
1599 }
1600
1601 {
1603 let reader = journal.reader().await;
1604 let stream = reader.replay(NZUsize!(20), 20).await.unwrap();
1605 futures::pin_mut!(stream);
1606 for i in 20..40u64 {
1607 let (pos, item) = stream.next().await.unwrap().unwrap();
1608 assert_eq!(pos, i);
1609 assert_eq!(item, i * 100);
1610 }
1611 assert!(stream.next().await.is_none());
1612 }
1613
1614 {
1616 let reader = journal.reader().await;
1617 let stream = reader.replay(NZUsize!(20), 40).await.unwrap();
1618 futures::pin_mut!(stream);
1619 assert!(stream.next().await.is_none());
1620 }
1621
1622 {
1624 let reader = journal.reader().await;
1625 let res = reader.replay(NZUsize!(20), 41).await;
1626 assert!(matches!(
1627 res,
1628 Err(crate::journal::Error::ItemOutOfRange(41))
1629 ));
1630 }
1631
1632 journal.destroy().await.unwrap();
1633 });
1634 }
1635
1636 #[test_traced]
1637 fn test_variable_contiguous() {
1638 let executor = deterministic::Runner::default();
1639 executor.start(|context| async move {
1640 run_contiguous_tests(move |test_name: String, idx: usize| {
1641 let label = test_name.replace('-', "_");
1642 let context = context
1643 .child("test")
1644 .with_attribute("name", &label)
1645 .with_attribute("index", idx);
1646 async move {
1647 let cfg = Config {
1648 partition: format!("generic-test-{test_name}"),
1649 items_per_section: NZU64!(10),
1650 compression: None,
1651 codec_config: (),
1652 page_cache: CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)),
1653 write_buffer: NZUsize!(1024),
1654 };
1655 Journal::<_, u64>::init(context, cfg).await
1656 }
1657 .boxed()
1658 })
1659 .await;
1660 });
1661 }
1662
1663 #[test_traced]
1665 fn test_variable_multiple_sequential_prunes() {
1666 let executor = deterministic::Runner::default();
1667 executor.start(|context| async move {
1668 let cfg = Config {
1669 partition: "sequential-prunes".into(),
1670 items_per_section: NZU64!(10),
1671 compression: None,
1672 codec_config: (),
1673 page_cache: CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)),
1674 write_buffer: NZUsize!(1024),
1675 };
1676
1677 let journal = Journal::<_, u64>::init(context, cfg).await.unwrap();
1678
1679 for i in 0..40u64 {
1681 journal.append(&(i * 100)).await.unwrap();
1682 }
1683
1684 let bounds = journal.bounds().await;
1686 assert_eq!(bounds.start, 0);
1687 assert_eq!(bounds.end, 40);
1688
1689 let pruned = journal.prune(10).await.unwrap();
1691 assert!(pruned);
1692
1693 assert_eq!(journal.bounds().await.start, 10);
1695
1696 assert!(matches!(
1698 journal.read(0).await,
1699 Err(crate::journal::Error::ItemPruned(_))
1700 ));
1701 assert_eq!(journal.read(10).await.unwrap(), 1000);
1702 assert_eq!(journal.read(19).await.unwrap(), 1900);
1703
1704 let pruned = journal.prune(20).await.unwrap();
1706 assert!(pruned);
1707
1708 assert_eq!(journal.bounds().await.start, 20);
1710
1711 assert!(matches!(
1713 journal.read(10).await,
1714 Err(crate::journal::Error::ItemPruned(_))
1715 ));
1716 assert!(matches!(
1717 journal.read(19).await,
1718 Err(crate::journal::Error::ItemPruned(_))
1719 ));
1720 assert_eq!(journal.read(20).await.unwrap(), 2000);
1721 assert_eq!(journal.read(29).await.unwrap(), 2900);
1722
1723 let pruned = journal.prune(30).await.unwrap();
1725 assert!(pruned);
1726
1727 assert_eq!(journal.bounds().await.start, 30);
1729
1730 assert!(matches!(
1732 journal.read(20).await,
1733 Err(crate::journal::Error::ItemPruned(_))
1734 ));
1735 assert!(matches!(
1736 journal.read(29).await,
1737 Err(crate::journal::Error::ItemPruned(_))
1738 ));
1739 assert_eq!(journal.read(30).await.unwrap(), 3000);
1740 assert_eq!(journal.read(39).await.unwrap(), 3900);
1741
1742 assert_eq!(journal.size().await, 40);
1744
1745 journal.destroy().await.unwrap();
1746 });
1747 }
1748
1749 #[test_traced]
1751 fn test_variable_prune_all_then_reinit() {
1752 let executor = deterministic::Runner::default();
1753 executor.start(|context| async move {
1754 let cfg = Config {
1755 partition: "prune-all-reinit".into(),
1756 items_per_section: NZU64!(10),
1757 compression: None,
1758 codec_config: (),
1759 page_cache: CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)),
1760 write_buffer: NZUsize!(1024),
1761 };
1762
1763 let journal = Journal::<_, u64>::init(context.child("first"), cfg.clone())
1765 .await
1766 .unwrap();
1767
1768 for i in 0..100u64 {
1769 journal.append(&(i * 100)).await.unwrap();
1770 }
1771
1772 let bounds = journal.bounds().await;
1773 assert_eq!(bounds.end, 100);
1774 assert_eq!(bounds.start, 0);
1775
1776 let pruned = journal.prune(100).await.unwrap();
1778 assert!(pruned);
1779
1780 let bounds = journal.bounds().await;
1782 assert_eq!(bounds.end, 100);
1783 assert!(bounds.is_empty());
1784
1785 for i in 0..100 {
1787 assert!(matches!(
1788 journal.read(i).await,
1789 Err(crate::journal::Error::ItemPruned(_))
1790 ));
1791 }
1792
1793 journal.sync().await.unwrap();
1794 drop(journal);
1795
1796 let journal = Journal::<_, u64>::init(context.child("second"), cfg.clone())
1798 .await
1799 .unwrap();
1800
1801 let bounds = journal.bounds().await;
1803 assert_eq!(bounds.end, 100);
1804 assert!(bounds.is_empty());
1805
1806 for i in 0..100 {
1808 assert!(matches!(
1809 journal.read(i).await,
1810 Err(crate::journal::Error::ItemPruned(_))
1811 ));
1812 }
1813
1814 journal.append(&10000).await.unwrap();
1817 let bounds = journal.bounds().await;
1818 assert_eq!(bounds.end, 101);
1819 assert_eq!(bounds.start, 100);
1821
1822 assert_eq!(journal.read(100).await.unwrap(), 10000);
1824
1825 assert!(matches!(
1827 journal.read(99).await,
1828 Err(crate::journal::Error::ItemPruned(_))
1829 ));
1830
1831 journal.destroy().await.unwrap();
1832 });
1833 }
1834
1835 #[test_traced]
1837 fn test_variable_recovery_prune_crash_offsets_behind() {
1838 let executor = deterministic::Runner::default();
1839 executor.start(|context| async move {
1840 let cfg = Config {
1842 partition: "recovery-prune-crash".into(),
1843 items_per_section: NZU64!(10),
1844 compression: None,
1845 codec_config: (),
1846 page_cache: CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)),
1847 write_buffer: NZUsize!(1024),
1848 };
1849
1850 let variable = Journal::<_, u64>::init(context.child("first"), cfg.clone())
1851 .await
1852 .unwrap();
1853
1854 for i in 0..40u64 {
1856 variable.append(&(i * 100)).await.unwrap();
1857 }
1858
1859 variable.prune(10).await.unwrap();
1861 assert_eq!(variable.bounds().await.start, 10);
1862
1863 variable.test_prune_data(2).await.unwrap();
1866 variable.sync().await.unwrap();
1869 drop(variable);
1870
1871 let variable = Journal::<_, u64>::init(context.child("second"), cfg.clone())
1873 .await
1874 .unwrap();
1875
1876 let bounds = variable.bounds().await;
1878 assert_eq!(bounds.start, 20);
1879 assert_eq!(bounds.end, 40);
1880
1881 assert!(matches!(
1883 variable.read(10).await,
1884 Err(crate::journal::Error::ItemPruned(_))
1885 ));
1886
1887 assert_eq!(variable.read(20).await.unwrap(), 2000);
1889 assert_eq!(variable.read(39).await.unwrap(), 3900);
1890
1891 variable.destroy().await.unwrap();
1892 });
1893 }
1894
1895 #[test_traced]
1900 fn test_variable_recovery_offsets_ahead_corruption() {
1901 let executor = deterministic::Runner::default();
1902 executor.start(|context| async move {
1903 let cfg = Config {
1905 partition: "recovery-offsets-ahead".into(),
1906 items_per_section: NZU64!(10),
1907 compression: None,
1908 codec_config: (),
1909 page_cache: CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)),
1910 write_buffer: NZUsize!(1024),
1911 };
1912
1913 let variable = Journal::<_, u64>::init(context.child("first"), cfg.clone())
1914 .await
1915 .unwrap();
1916
1917 for i in 0..40u64 {
1919 variable.append(&(i * 100)).await.unwrap();
1920 }
1921
1922 variable.test_prune_offsets(20).await.unwrap(); variable.test_prune_data(1).await.unwrap(); variable.sync().await.unwrap();
1927 drop(variable);
1928
1929 let result = Journal::<_, u64>::init(context.child("second"), cfg.clone()).await;
1931 assert!(matches!(result, Err(Error::Corruption(_))));
1932 });
1933 }
1934
1935 #[test_traced]
1937 fn test_variable_recovery_append_crash_offsets_behind() {
1938 let executor = deterministic::Runner::default();
1939 executor.start(|context| async move {
1940 let cfg = Config {
1942 partition: "recovery-append-crash".into(),
1943 items_per_section: NZU64!(10),
1944 compression: None,
1945 codec_config: (),
1946 page_cache: CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)),
1947 write_buffer: NZUsize!(1024),
1948 };
1949
1950 let variable = Journal::<_, u64>::init(context.child("first"), cfg.clone())
1951 .await
1952 .unwrap();
1953
1954 for i in 0..15u64 {
1956 variable.append(&(i * 100)).await.unwrap();
1957 }
1958
1959 assert_eq!(variable.size().await, 15);
1960
1961 for i in 15..20u64 {
1963 variable.test_append_data(1, i * 100).await.unwrap();
1964 }
1965 variable.sync().await.unwrap();
1968 drop(variable);
1969
1970 let variable = Journal::<_, u64>::init(context.child("second"), cfg.clone())
1972 .await
1973 .unwrap();
1974
1975 let bounds = variable.bounds().await;
1977 assert_eq!(bounds.end, 20);
1978 assert_eq!(bounds.start, 0);
1979
1980 for i in 0..20u64 {
1982 assert_eq!(variable.read(i).await.unwrap(), i * 100);
1983 }
1984
1985 assert_eq!(variable.test_offsets_size().await, 20);
1987
1988 variable.destroy().await.unwrap();
1989 });
1990 }
1991
1992 #[test_traced]
1994 fn test_variable_recovery_multiple_prunes_crash() {
1995 let executor = deterministic::Runner::default();
1996 executor.start(|context| async move {
1997 let cfg = Config {
1999 partition: "recovery-multiple-prunes".into(),
2000 items_per_section: NZU64!(10),
2001 compression: None,
2002 codec_config: (),
2003 page_cache: CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)),
2004 write_buffer: NZUsize!(1024),
2005 };
2006
2007 let variable = Journal::<_, u64>::init(context.child("first"), cfg.clone())
2008 .await
2009 .unwrap();
2010
2011 for i in 0..50u64 {
2013 variable.append(&(i * 100)).await.unwrap();
2014 }
2015
2016 variable.prune(10).await.unwrap();
2018 assert_eq!(variable.bounds().await.start, 10);
2019
2020 variable.test_prune_data(3).await.unwrap();
2023 variable.sync().await.unwrap();
2026 drop(variable);
2027
2028 let variable = Journal::<_, u64>::init(context.child("second"), cfg.clone())
2030 .await
2031 .unwrap();
2032
2033 let bounds = variable.bounds().await;
2035 assert_eq!(bounds.start, 30);
2036 assert_eq!(bounds.end, 50);
2037
2038 assert!(matches!(
2040 variable.read(10).await,
2041 Err(crate::journal::Error::ItemPruned(_))
2042 ));
2043 assert!(matches!(
2044 variable.read(20).await,
2045 Err(crate::journal::Error::ItemPruned(_))
2046 ));
2047
2048 assert_eq!(variable.read(30).await.unwrap(), 3000);
2050 assert_eq!(variable.read(49).await.unwrap(), 4900);
2051
2052 variable.destroy().await.unwrap();
2053 });
2054 }
2055
2056 #[test_traced]
2063 fn test_variable_recovery_rewind_crash_multi_section() {
2064 let executor = deterministic::Runner::default();
2065 executor.start(|context| async move {
2066 let cfg = Config {
2068 partition: "recovery-rewind-crash".into(),
2069 items_per_section: NZU64!(10),
2070 compression: None,
2071 codec_config: (),
2072 page_cache: CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)),
2073 write_buffer: NZUsize!(1024),
2074 };
2075
2076 let variable = Journal::<_, u64>::init(context.child("first"), cfg.clone())
2077 .await
2078 .unwrap();
2079
2080 for i in 0..25u64 {
2082 variable.append(&(i * 100)).await.unwrap();
2083 }
2084
2085 assert_eq!(variable.size().await, 25);
2086
2087 variable.test_rewind_offsets(5).await.unwrap();
2090 variable.sync().await.unwrap();
2093 drop(variable);
2094
2095 let variable = Journal::<_, u64>::init(context.child("second"), cfg.clone())
2097 .await
2098 .unwrap();
2099
2100 let bounds = variable.bounds().await;
2102 assert_eq!(bounds.end, 25);
2103 assert_eq!(bounds.start, 0);
2104
2105 for i in 0..25u64 {
2107 assert_eq!(variable.read(i).await.unwrap(), i * 100);
2108 }
2109
2110 assert_eq!(variable.test_offsets_size().await, 25);
2112
2113 let pos = variable.append(&2500).await.unwrap();
2115 assert_eq!(pos, 25);
2116 assert_eq!(variable.read(25).await.unwrap(), 2500);
2117
2118 variable.destroy().await.unwrap();
2119 });
2120 }
2121
2122 #[test_traced]
2125 fn test_variable_recovery_empty_offsets_after_prune_and_append() {
2126 let executor = deterministic::Runner::default();
2127 executor.start(|context| async move {
2128 let cfg = Config {
2129 partition: "recovery-empty-after-prune".into(),
2130 items_per_section: NZU64!(10),
2131 compression: None,
2132 codec_config: (),
2133 page_cache: CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)),
2134 write_buffer: NZUsize!(1024),
2135 };
2136
2137 let journal = Journal::<_, u64>::init(context.child("first"), cfg.clone())
2139 .await
2140 .unwrap();
2141
2142 for i in 0..10u64 {
2144 journal.append(&(i * 100)).await.unwrap();
2145 }
2146 let bounds = journal.bounds().await;
2147 assert_eq!(bounds.end, 10);
2148 assert_eq!(bounds.start, 0);
2149
2150 journal.prune(10).await.unwrap();
2152 let bounds = journal.bounds().await;
2153 assert_eq!(bounds.end, 10);
2154 assert!(bounds.is_empty()); for i in 10..20u64 {
2160 journal.test_append_data(1, i * 100).await.unwrap();
2161 }
2162 journal.test_sync_data().await.unwrap();
2164 drop(journal);
2168
2169 let journal = Journal::<_, u64>::init(context.child("second"), cfg.clone())
2171 .await
2172 .expect("Should recover from crash after data sync but before offsets sync");
2173
2174 let bounds = journal.bounds().await;
2176 assert_eq!(bounds.end, 20);
2177 assert_eq!(bounds.start, 10);
2178
2179 for i in 10..20u64 {
2181 assert_eq!(journal.read(i).await.unwrap(), i * 100);
2182 }
2183
2184 for i in 0..10 {
2186 assert!(matches!(journal.read(i).await, Err(Error::ItemPruned(_))));
2187 }
2188
2189 journal.destroy().await.unwrap();
2190 });
2191 }
2192
2193 #[test_traced]
2195 fn test_variable_concurrent_sync_recovery() {
2196 let executor = deterministic::Runner::default();
2197 executor.start(|context| async move {
2198 let cfg = Config {
2199 partition: "concurrent-sync-recovery".into(),
2200 items_per_section: NZU64!(10),
2201 compression: None,
2202 codec_config: (),
2203 page_cache: CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)),
2204 write_buffer: NZUsize!(1024),
2205 };
2206
2207 let journal = Journal::<_, u64>::init(context.child("first"), cfg.clone())
2208 .await
2209 .unwrap();
2210
2211 for i in 0..15u64 {
2213 journal.append(&(i * 100)).await.unwrap();
2214 }
2215
2216 journal.commit().await.unwrap();
2218
2219 drop(journal);
2221
2222 let journal = Journal::<_, u64>::init(context.child("second"), cfg.clone())
2223 .await
2224 .unwrap();
2225
2226 assert_eq!(journal.size().await, 15);
2228 for i in 0..15u64 {
2229 assert_eq!(journal.read(i).await.unwrap(), i * 100);
2230 }
2231
2232 journal.destroy().await.unwrap();
2233 });
2234 }
2235
2236 #[test_traced]
2237 fn test_init_at_size_zero() {
2238 let executor = deterministic::Runner::default();
2239 executor.start(|context| async move {
2240 let cfg = Config {
2241 partition: "init-at-size-zero".into(),
2242 items_per_section: NZU64!(5),
2243 compression: None,
2244 codec_config: (),
2245 page_cache: CacheRef::from_pooler(&context, SMALL_PAGE_SIZE, NZUsize!(2)),
2246 write_buffer: NZUsize!(1024),
2247 };
2248
2249 let journal = Journal::<_, u64>::init_at_size(context.child("storage"), cfg.clone(), 0)
2250 .await
2251 .unwrap();
2252
2253 assert_eq!(journal.size().await, 0);
2255
2256 assert!(journal.bounds().await.is_empty());
2258
2259 let pos = journal.append(&100).await.unwrap();
2261 assert_eq!(pos, 0);
2262 assert_eq!(journal.size().await, 1);
2263 assert_eq!(journal.read(0).await.unwrap(), 100);
2264
2265 journal.destroy().await.unwrap();
2266 });
2267 }
2268
2269 #[test_traced]
2270 fn test_init_at_size_section_boundary() {
2271 let executor = deterministic::Runner::default();
2272 executor.start(|context| async move {
2273 let cfg = Config {
2274 partition: "init-at-size-boundary".into(),
2275 items_per_section: NZU64!(5),
2276 compression: None,
2277 codec_config: (),
2278 page_cache: CacheRef::from_pooler(&context, SMALL_PAGE_SIZE, NZUsize!(2)),
2279 write_buffer: NZUsize!(1024),
2280 };
2281
2282 let journal =
2284 Journal::<_, u64>::init_at_size(context.child("storage"), cfg.clone(), 10)
2285 .await
2286 .unwrap();
2287
2288 let bounds = journal.bounds().await;
2290 assert_eq!(bounds.end, 10);
2291
2292 assert!(bounds.is_empty());
2294
2295 let pos = journal.append(&1000).await.unwrap();
2297 assert_eq!(pos, 10);
2298 assert_eq!(journal.size().await, 11);
2299 assert_eq!(journal.read(10).await.unwrap(), 1000);
2300
2301 let pos = journal.append(&1001).await.unwrap();
2303 assert_eq!(pos, 11);
2304 assert_eq!(journal.read(11).await.unwrap(), 1001);
2305
2306 journal.destroy().await.unwrap();
2307 });
2308 }
2309
2310 #[test_traced]
2311 fn test_init_at_size_mid_section() {
2312 let executor = deterministic::Runner::default();
2313 executor.start(|context| async move {
2314 let cfg = Config {
2315 partition: "init-at-size-mid".into(),
2316 items_per_section: NZU64!(5),
2317 compression: None,
2318 codec_config: (),
2319 page_cache: CacheRef::from_pooler(&context, SMALL_PAGE_SIZE, NZUsize!(2)),
2320 write_buffer: NZUsize!(1024),
2321 };
2322
2323 let journal = Journal::<_, u64>::init_at_size(context.child("storage"), cfg.clone(), 7)
2325 .await
2326 .unwrap();
2327
2328 let bounds = journal.bounds().await;
2330 assert_eq!(bounds.end, 7);
2331
2332 assert!(bounds.is_empty());
2334
2335 let pos = journal.append(&700).await.unwrap();
2337 assert_eq!(pos, 7);
2338 assert_eq!(journal.size().await, 8);
2339 assert_eq!(journal.read(7).await.unwrap(), 700);
2340
2341 journal.destroy().await.unwrap();
2342 });
2343 }
2344
2345 #[test_traced]
2346 fn test_init_at_size_persistence() {
2347 let executor = deterministic::Runner::default();
2348 executor.start(|context| async move {
2349 let cfg = Config {
2350 partition: "init-at-size-persist".into(),
2351 items_per_section: NZU64!(5),
2352 compression: None,
2353 codec_config: (),
2354 page_cache: CacheRef::from_pooler(&context, SMALL_PAGE_SIZE, NZUsize!(2)),
2355 write_buffer: NZUsize!(1024),
2356 };
2357
2358 let journal = Journal::<_, u64>::init_at_size(context.child("first"), cfg.clone(), 15)
2360 .await
2361 .unwrap();
2362
2363 for i in 0..5u64 {
2365 let pos = journal.append(&(1500 + i)).await.unwrap();
2366 assert_eq!(pos, 15 + i);
2367 }
2368
2369 assert_eq!(journal.size().await, 20);
2370
2371 journal.sync().await.unwrap();
2373 drop(journal);
2374
2375 let journal = Journal::<_, u64>::init(context.child("second"), cfg.clone())
2376 .await
2377 .unwrap();
2378
2379 let bounds = journal.bounds().await;
2381 assert_eq!(bounds.end, 20);
2382 assert_eq!(bounds.start, 15);
2383
2384 for i in 0..5u64 {
2386 assert_eq!(journal.read(15 + i).await.unwrap(), 1500 + i);
2387 }
2388
2389 let pos = journal.append(&9999).await.unwrap();
2391 assert_eq!(pos, 20);
2392 assert_eq!(journal.read(20).await.unwrap(), 9999);
2393
2394 journal.destroy().await.unwrap();
2395 });
2396 }
2397
2398 #[test_traced]
2399 fn test_init_at_size_persistence_without_data() {
2400 let executor = deterministic::Runner::default();
2401 executor.start(|context| async move {
2402 let cfg = Config {
2403 partition: "init-at-size-persist-empty".into(),
2404 items_per_section: NZU64!(5),
2405 compression: None,
2406 codec_config: (),
2407 page_cache: CacheRef::from_pooler(&context, SMALL_PAGE_SIZE, NZUsize!(2)),
2408 write_buffer: NZUsize!(1024),
2409 };
2410
2411 let journal = Journal::<_, u64>::init_at_size(context.child("first"), cfg.clone(), 15)
2413 .await
2414 .unwrap();
2415
2416 let bounds = journal.bounds().await;
2417 assert_eq!(bounds.end, 15);
2418 assert!(bounds.is_empty());
2419
2420 drop(journal);
2422
2423 let journal = Journal::<_, u64>::init(context.child("second"), cfg.clone())
2425 .await
2426 .unwrap();
2427
2428 let bounds = journal.bounds().await;
2429 assert_eq!(bounds.end, 15);
2430 assert!(bounds.is_empty());
2431
2432 let pos = journal.append(&1500).await.unwrap();
2434 assert_eq!(pos, 15);
2435 assert_eq!(journal.read(15).await.unwrap(), 1500);
2436
2437 journal.destroy().await.unwrap();
2438 });
2439 }
2440
2441 #[test_traced]
2443 fn test_init_at_size_mid_section_persistence() {
2444 let executor = deterministic::Runner::default();
2445 executor.start(|context| async move {
2446 let cfg = Config {
2447 partition: "init-at-size-mid-section".into(),
2448 items_per_section: NZU64!(5),
2449 compression: None,
2450 codec_config: (),
2451 page_cache: CacheRef::from_pooler(&context, SMALL_PAGE_SIZE, NZUsize!(2)),
2452 write_buffer: NZUsize!(1024),
2453 };
2454
2455 let journal = Journal::<_, u64>::init_at_size(context.child("first"), cfg.clone(), 7)
2457 .await
2458 .unwrap();
2459
2460 for i in 0..3u64 {
2462 let pos = journal.append(&(700 + i)).await.unwrap();
2463 assert_eq!(pos, 7 + i);
2464 }
2465
2466 let bounds = journal.bounds().await;
2467 assert_eq!(bounds.end, 10);
2468 assert_eq!(bounds.start, 7);
2469
2470 journal.sync().await.unwrap();
2472 drop(journal);
2473
2474 let journal = Journal::<_, u64>::init(context.child("second"), cfg.clone())
2476 .await
2477 .unwrap();
2478
2479 let bounds = journal.bounds().await;
2481 assert_eq!(bounds.end, 10);
2482 assert_eq!(bounds.start, 7);
2483
2484 for i in 0..3u64 {
2486 assert_eq!(journal.read(7 + i).await.unwrap(), 700 + i);
2487 }
2488
2489 assert!(matches!(journal.read(6).await, Err(Error::ItemPruned(6))));
2491
2492 journal.destroy().await.unwrap();
2493 });
2494 }
2495
2496 #[test_traced]
2498 fn test_init_at_size_mid_section_multi_section_persistence() {
2499 let executor = deterministic::Runner::default();
2500 executor.start(|context| async move {
2501 let cfg = Config {
2502 partition: "init-at-size-multi-section".into(),
2503 items_per_section: NZU64!(5),
2504 compression: None,
2505 codec_config: (),
2506 page_cache: CacheRef::from_pooler(&context, SMALL_PAGE_SIZE, NZUsize!(2)),
2507 write_buffer: NZUsize!(1024),
2508 };
2509
2510 let journal = Journal::<_, u64>::init_at_size(context.child("first"), cfg.clone(), 7)
2512 .await
2513 .unwrap();
2514
2515 for i in 0..8u64 {
2517 let pos = journal.append(&(700 + i)).await.unwrap();
2518 assert_eq!(pos, 7 + i);
2519 }
2520
2521 let bounds = journal.bounds().await;
2522 assert_eq!(bounds.end, 15);
2523 assert_eq!(bounds.start, 7);
2524
2525 journal.sync().await.unwrap();
2527 drop(journal);
2528
2529 let journal = Journal::<_, u64>::init(context.child("second"), cfg.clone())
2531 .await
2532 .unwrap();
2533
2534 let bounds = journal.bounds().await;
2536 assert_eq!(bounds.end, 15);
2537 assert_eq!(bounds.start, 7);
2538
2539 for i in 0..8u64 {
2541 assert_eq!(journal.read(7 + i).await.unwrap(), 700 + i);
2542 }
2543
2544 journal.destroy().await.unwrap();
2545 });
2546 }
2547
2548 #[test_traced]
2550 fn test_align_journals_data_empty_mid_section_pruning_boundary() {
2551 let executor = deterministic::Runner::default();
2552 executor.start(|context| async move {
2553 let cfg = Config {
2554 partition: "align-journals-mid-section-pruning-boundary".into(),
2555 items_per_section: NZU64!(5),
2556 compression: None,
2557 codec_config: (),
2558 page_cache: CacheRef::from_pooler(&context, SMALL_PAGE_SIZE, NZUsize!(2)),
2559 write_buffer: NZUsize!(1024),
2560 };
2561
2562 let journal = Journal::<_, u64>::init(context.child("first"), cfg.clone())
2564 .await
2565 .unwrap();
2566 for i in 0..7u64 {
2567 journal.append(&(100 + i)).await.unwrap();
2568 }
2569 journal.sync().await.unwrap();
2570
2571 journal.inner.write().await.data.clear().await.unwrap();
2573 drop(journal);
2574
2575 let journal = Journal::<_, u64>::init(context.child("second"), cfg.clone())
2577 .await
2578 .unwrap();
2579 let bounds = journal.bounds().await;
2580 assert_eq!(bounds.end, 7);
2581 assert!(bounds.is_empty());
2582
2583 let pos = journal.append(&777).await.unwrap();
2585 assert_eq!(pos, 7);
2586 assert_eq!(journal.size().await, 8);
2587 assert_eq!(journal.read(7).await.unwrap(), 777);
2588
2589 let section = 7 / cfg.items_per_section.get();
2591 journal
2592 .inner
2593 .write()
2594 .await
2595 .data
2596 .sync(section)
2597 .await
2598 .unwrap();
2599 drop(journal);
2600
2601 let journal = Journal::<_, u64>::init(context.child("third"), cfg.clone())
2603 .await
2604 .unwrap();
2605 let bounds = journal.bounds().await;
2606 assert_eq!(bounds.end, 8);
2607 assert_eq!(bounds.start, 7);
2608 assert_eq!(journal.read(7).await.unwrap(), 777);
2609
2610 journal.destroy().await.unwrap();
2611 });
2612 }
2613
2614 #[test_traced]
2616 fn test_init_at_size_crash_data_synced_offsets_not() {
2617 let executor = deterministic::Runner::default();
2618 executor.start(|context| async move {
2619 let cfg = Config {
2620 partition: "init-at-size-crash-recovery".into(),
2621 items_per_section: NZU64!(5),
2622 compression: None,
2623 codec_config: (),
2624 page_cache: CacheRef::from_pooler(&context, SMALL_PAGE_SIZE, NZUsize!(2)),
2625 write_buffer: NZUsize!(1024),
2626 };
2627
2628 let journal = Journal::<_, u64>::init_at_size(context.child("first"), cfg.clone(), 7)
2630 .await
2631 .unwrap();
2632
2633 for i in 0..3u64 {
2635 journal.append(&(700 + i)).await.unwrap();
2636 }
2637
2638 journal.inner.write().await.data.sync(1).await.unwrap();
2640 drop(journal);
2642
2643 let journal = Journal::<_, u64>::init(context.child("second"), cfg.clone())
2645 .await
2646 .unwrap();
2647
2648 let bounds = journal.bounds().await;
2650 assert_eq!(bounds.end, 10);
2651 assert_eq!(bounds.start, 7);
2652
2653 for i in 0..3u64 {
2655 assert_eq!(journal.read(7 + i).await.unwrap(), 700 + i);
2656 }
2657
2658 journal.destroy().await.unwrap();
2659 });
2660 }
2661
2662 #[test_traced]
2663 fn test_prune_does_not_move_oldest_retained_backwards() {
2664 let executor = deterministic::Runner::default();
2665 executor.start(|context| async move {
2666 let cfg = Config {
2667 partition: "prune-no-backwards".into(),
2668 items_per_section: NZU64!(5),
2669 compression: None,
2670 codec_config: (),
2671 page_cache: CacheRef::from_pooler(&context, SMALL_PAGE_SIZE, NZUsize!(2)),
2672 write_buffer: NZUsize!(1024),
2673 };
2674
2675 let journal = Journal::<_, u64>::init_at_size(context.child("first"), cfg.clone(), 7)
2676 .await
2677 .unwrap();
2678
2679 for i in 0..3u64 {
2681 let pos = journal.append(&(700 + i)).await.unwrap();
2682 assert_eq!(pos, 7 + i);
2683 }
2684 assert_eq!(journal.bounds().await.start, 7);
2685
2686 journal.prune(8).await.unwrap();
2688 assert_eq!(journal.bounds().await.start, 7);
2689 assert!(matches!(journal.read(6).await, Err(Error::ItemPruned(6))));
2690 assert_eq!(journal.read(7).await.unwrap(), 700);
2691
2692 journal.destroy().await.unwrap();
2693 });
2694 }
2695
2696 #[test_traced]
2697 fn test_init_at_size_large_offset() {
2698 let executor = deterministic::Runner::default();
2699 executor.start(|context| async move {
2700 let cfg = Config {
2701 partition: "init-at-size-large".into(),
2702 items_per_section: NZU64!(5),
2703 compression: None,
2704 codec_config: (),
2705 page_cache: CacheRef::from_pooler(&context, SMALL_PAGE_SIZE, NZUsize!(2)),
2706 write_buffer: NZUsize!(1024),
2707 };
2708
2709 let journal =
2711 Journal::<_, u64>::init_at_size(context.child("storage"), cfg.clone(), 1000)
2712 .await
2713 .unwrap();
2714
2715 let bounds = journal.bounds().await;
2716 assert_eq!(bounds.end, 1000);
2717 assert!(bounds.is_empty());
2719
2720 let pos = journal.append(&100000).await.unwrap();
2722 assert_eq!(pos, 1000);
2723 assert_eq!(journal.read(1000).await.unwrap(), 100000);
2724
2725 journal.destroy().await.unwrap();
2726 });
2727 }
2728
2729 #[test_traced]
2730 fn test_init_at_size_prune_and_append() {
2731 let executor = deterministic::Runner::default();
2732 executor.start(|context| async move {
2733 let cfg = Config {
2734 partition: "init-at-size-prune".into(),
2735 items_per_section: NZU64!(5),
2736 compression: None,
2737 codec_config: (),
2738 page_cache: CacheRef::from_pooler(&context, SMALL_PAGE_SIZE, NZUsize!(2)),
2739 write_buffer: NZUsize!(1024),
2740 };
2741
2742 let journal =
2744 Journal::<_, u64>::init_at_size(context.child("storage"), cfg.clone(), 20)
2745 .await
2746 .unwrap();
2747
2748 for i in 0..10u64 {
2750 journal.append(&(2000 + i)).await.unwrap();
2751 }
2752
2753 assert_eq!(journal.size().await, 30);
2754
2755 journal.prune(25).await.unwrap();
2757
2758 let bounds = journal.bounds().await;
2759 assert_eq!(bounds.end, 30);
2760 assert_eq!(bounds.start, 25);
2761
2762 for i in 25..30u64 {
2764 assert_eq!(journal.read(i).await.unwrap(), 2000 + (i - 20));
2765 }
2766
2767 let pos = journal.append(&3000).await.unwrap();
2769 assert_eq!(pos, 30);
2770
2771 journal.destroy().await.unwrap();
2772 });
2773 }
2774
2775 #[test_traced]
2777 fn test_init_sync_no_existing_data() {
2778 let executor = deterministic::Runner::default();
2779 executor.start(|context| async move {
2780 let cfg = Config {
2781 partition: "test-fresh-start".into(),
2782 items_per_section: NZU64!(5),
2783 compression: None,
2784 codec_config: (),
2785 write_buffer: NZUsize!(1024),
2786 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(PAGE_CACHE_SIZE)),
2787 };
2788
2789 let lower_bound = 10;
2791 let upper_bound = 26;
2792 let journal = Journal::init_sync(
2793 context.child("storage"),
2794 cfg.clone(),
2795 lower_bound..upper_bound,
2796 )
2797 .await
2798 .expect("Failed to initialize journal with sync boundaries");
2799
2800 let bounds = journal.bounds().await;
2801 assert_eq!(bounds.end, lower_bound);
2802 assert!(bounds.is_empty());
2803
2804 let pos1 = journal.append(&42u64).await.unwrap();
2806 assert_eq!(pos1, lower_bound);
2807 assert_eq!(journal.read(pos1).await.unwrap(), 42u64);
2808
2809 let pos2 = journal.append(&43u64).await.unwrap();
2810 assert_eq!(pos2, lower_bound + 1);
2811 assert_eq!(journal.read(pos2).await.unwrap(), 43u64);
2812
2813 journal.destroy().await.unwrap();
2814 });
2815 }
2816
2817 #[test_traced]
2819 fn test_init_sync_existing_data_overlap() {
2820 let executor = deterministic::Runner::default();
2821 executor.start(|context| async move {
2822 let cfg = Config {
2823 partition: "test-overlap".into(),
2824 items_per_section: NZU64!(5),
2825 compression: None,
2826 codec_config: (),
2827 write_buffer: NZUsize!(1024),
2828 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(PAGE_CACHE_SIZE)),
2829 };
2830
2831 let journal =
2833 Journal::<deterministic::Context, u64>::init(context.child("storage"), cfg.clone())
2834 .await
2835 .expect("Failed to create initial journal");
2836
2837 for i in 0..20u64 {
2839 journal.append(&(i * 100)).await.unwrap();
2840 }
2841 journal.sync().await.unwrap();
2842 drop(journal);
2843
2844 let lower_bound = 8;
2847 let upper_bound = 31;
2848 let journal = Journal::<deterministic::Context, u64>::init_sync(
2849 context.child("storage"),
2850 cfg.clone(),
2851 lower_bound..upper_bound,
2852 )
2853 .await
2854 .expect("Failed to initialize journal with overlap");
2855
2856 assert_eq!(journal.size().await, 20);
2857
2858 assert_eq!(journal.bounds().await.start, 5); assert!(matches!(journal.read(0).await, Err(Error::ItemPruned(_))));
2863 assert!(matches!(journal.read(4).await, Err(Error::ItemPruned(_))));
2864
2865 assert_eq!(journal.read(5).await.unwrap(), 500);
2867 assert_eq!(journal.read(8).await.unwrap(), 800);
2868 assert_eq!(journal.read(19).await.unwrap(), 1900);
2869
2870 assert!(matches!(
2872 journal.read(20).await,
2873 Err(Error::ItemOutOfRange(_))
2874 ));
2875
2876 let pos = journal.append(&999).await.unwrap();
2878 assert_eq!(pos, 20);
2879 assert_eq!(journal.read(20).await.unwrap(), 999);
2880
2881 journal.destroy().await.unwrap();
2882 });
2883 }
2884
2885 #[should_panic]
2887 #[test_traced]
2888 fn test_init_sync_invalid_parameters() {
2889 let executor = deterministic::Runner::default();
2890 executor.start(|context| async move {
2891 let cfg = Config {
2892 partition: "test-invalid".into(),
2893 items_per_section: NZU64!(5),
2894 compression: None,
2895 codec_config: (),
2896 write_buffer: NZUsize!(1024),
2897 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(PAGE_CACHE_SIZE)),
2898 };
2899
2900 #[allow(clippy::reversed_empty_ranges)]
2901 let _result = Journal::<deterministic::Context, u64>::init_sync(
2902 context.child("storage"),
2903 cfg,
2904 10..5, )
2906 .await;
2907 });
2908 }
2909
2910 #[test_traced]
2912 fn test_init_sync_existing_data_exact_match() {
2913 let executor = deterministic::Runner::default();
2914 executor.start(|context| async move {
2915 let items_per_section = NZU64!(5);
2916 let cfg = Config {
2917 partition: "test-exact-match".into(),
2918 items_per_section,
2919 compression: None,
2920 codec_config: (),
2921 write_buffer: NZUsize!(1024),
2922 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(PAGE_CACHE_SIZE)),
2923 };
2924
2925 let journal =
2927 Journal::<deterministic::Context, u64>::init(context.child("storage"), cfg.clone())
2928 .await
2929 .expect("Failed to create initial journal");
2930
2931 for i in 0..20u64 {
2933 journal.append(&(i * 100)).await.unwrap();
2934 }
2935 journal.sync().await.unwrap();
2936 drop(journal);
2937
2938 let lower_bound = 5; let upper_bound = 20; let journal = Journal::<deterministic::Context, u64>::init_sync(
2942 context.child("storage"),
2943 cfg.clone(),
2944 lower_bound..upper_bound,
2945 )
2946 .await
2947 .expect("Failed to initialize journal with exact match");
2948
2949 assert_eq!(journal.size().await, 20);
2950
2951 assert_eq!(journal.bounds().await.start, 5); assert!(matches!(journal.read(0).await, Err(Error::ItemPruned(_))));
2956 assert!(matches!(journal.read(4).await, Err(Error::ItemPruned(_))));
2957
2958 assert_eq!(journal.read(5).await.unwrap(), 500);
2960 assert_eq!(journal.read(10).await.unwrap(), 1000);
2961 assert_eq!(journal.read(19).await.unwrap(), 1900);
2962
2963 assert!(matches!(
2965 journal.read(20).await,
2966 Err(Error::ItemOutOfRange(_))
2967 ));
2968
2969 let pos = journal.append(&999).await.unwrap();
2971 assert_eq!(pos, 20);
2972 assert_eq!(journal.read(20).await.unwrap(), 999);
2973
2974 journal.destroy().await.unwrap();
2975 });
2976 }
2977
2978 #[test_traced]
2981 fn test_init_sync_existing_data_exceeds_upper_bound() {
2982 let executor = deterministic::Runner::default();
2983 executor.start(|context| async move {
2984 let items_per_section = NZU64!(5);
2985 let cfg = Config {
2986 partition: "test-unexpected-data".into(),
2987 items_per_section,
2988 compression: None,
2989 codec_config: (),
2990 write_buffer: NZUsize!(1024),
2991 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(PAGE_CACHE_SIZE)),
2992 };
2993
2994 let journal =
2996 Journal::<deterministic::Context, u64>::init(context.child("initial"), cfg.clone())
2997 .await
2998 .expect("Failed to create initial journal");
2999
3000 for i in 0..30u64 {
3002 journal.append(&(i * 1000)).await.unwrap();
3003 }
3004 journal.sync().await.unwrap();
3005 drop(journal);
3006
3007 let lower_bound = 8; for (i, upper_bound) in (9..29).enumerate() {
3010 let result = Journal::<deterministic::Context, u64>::init_sync(
3011 context.child("sync").with_attribute("index", i),
3012 cfg.clone(),
3013 lower_bound..upper_bound,
3014 )
3015 .await;
3016
3017 assert!(matches!(result, Err(Error::ItemOutOfRange(_))));
3019 }
3020 });
3021 }
3022
3023 #[test_traced]
3025 fn test_init_sync_empty_stale_position_beyond_upper_bound() {
3026 let executor = deterministic::Runner::default();
3027 executor.start(|context| async move {
3028 let cfg = Config {
3029 partition: "test-empty-stale-position".into(),
3030 items_per_section: NZU64!(5),
3031 compression: None,
3032 codec_config: (),
3033 write_buffer: NZUsize!(1024),
3034 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(PAGE_CACHE_SIZE)),
3035 };
3036
3037 let stale_size = 30;
3038 let journal = Journal::<deterministic::Context, u64>::init_at_size(
3039 context.child("first"),
3040 cfg.clone(),
3041 stale_size,
3042 )
3043 .await
3044 .expect("Failed to create stale empty journal");
3045 assert_eq!(journal.size().await, stale_size);
3046 assert!(journal.bounds().await.is_empty());
3047 drop(journal);
3048
3049 let lower_bound = 10;
3050 let upper_bound = 26;
3051 let journal = Journal::<deterministic::Context, u64>::init_sync(
3052 context.child("second"),
3053 cfg.clone(),
3054 lower_bound..upper_bound,
3055 )
3056 .await
3057 .expect("Failed to repair stale empty journal");
3058
3059 assert_eq!(journal.size().await, lower_bound);
3060 assert!(journal.bounds().await.is_empty());
3061
3062 let pos = journal.append(&999).await.unwrap();
3063 assert_eq!(pos, lower_bound);
3064 assert_eq!(journal.read(pos).await.unwrap(), 999);
3065
3066 journal.destroy().await.unwrap();
3067 });
3068 }
3069
3070 #[test_traced]
3072 fn test_init_sync_recovers_from_stale_clear_to_size() {
3073 let executor = deterministic::Runner::default();
3074 executor.start(|context| async move {
3075 let cfg = Config {
3076 partition: "test-stale-clear-to-size".into(),
3077 items_per_section: NZU64!(5),
3078 compression: None,
3079 codec_config: (),
3080 write_buffer: NZUsize!(1024),
3081 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(PAGE_CACHE_SIZE)),
3082 };
3083
3084 let journal = Journal::<deterministic::Context, u64>::init_at_size(
3085 context.child("first"),
3086 cfg.clone(),
3087 9,
3088 )
3089 .await
3090 .expect("Failed to create stale empty journal");
3091 journal.sync().await.unwrap();
3092 drop(journal);
3093
3094 match context.remove(&cfg.data_partition(), None).await {
3097 Ok(()) | Err(commonware_runtime::Error::PartitionMissing(_)) => {}
3098 Err(error) => panic!("failed to clear data partition: {error}"),
3099 }
3100
3101 let lower_bound = 7;
3102 let upper_bound = 20;
3103 let journal = Journal::<deterministic::Context, u64>::init_sync(
3104 context.child("second"),
3105 cfg.clone(),
3106 lower_bound..upper_bound,
3107 )
3108 .await
3109 .expect("Failed to repair stale empty journal");
3110
3111 assert_eq!(journal.size().await, lower_bound);
3112 let bounds = journal.bounds().await;
3113 assert!(bounds.is_empty());
3114 assert_eq!(bounds.start, lower_bound);
3115
3116 journal.destroy().await.unwrap();
3117 });
3118 }
3119
3120 #[test_traced]
3122 fn test_init_sync_existing_data_stale() {
3123 let executor = deterministic::Runner::default();
3124 executor.start(|context| async move {
3125 let items_per_section = NZU64!(5);
3126 let cfg = Config {
3127 partition: "test-stale".into(),
3128 items_per_section,
3129 compression: None,
3130 codec_config: (),
3131 write_buffer: NZUsize!(1024),
3132 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(PAGE_CACHE_SIZE)),
3133 };
3134
3135 let journal =
3137 Journal::<deterministic::Context, u64>::init(context.child("first"), cfg.clone())
3138 .await
3139 .expect("Failed to create initial journal");
3140
3141 for i in 0..10u64 {
3143 journal.append(&(i * 100)).await.unwrap();
3144 }
3145 journal.sync().await.unwrap();
3146 drop(journal);
3147
3148 let lower_bound = 15; let upper_bound = 26; let journal = Journal::<deterministic::Context, u64>::init_sync(
3152 context.child("second"),
3153 cfg.clone(),
3154 lower_bound..upper_bound,
3155 )
3156 .await
3157 .expect("Failed to initialize journal with stale data");
3158
3159 assert_eq!(journal.size().await, 15);
3160
3161 assert!(journal.bounds().await.is_empty());
3163
3164 assert!(matches!(journal.read(0).await, Err(Error::ItemPruned(_))));
3166 assert!(matches!(journal.read(9).await, Err(Error::ItemPruned(_))));
3167 assert!(matches!(journal.read(14).await, Err(Error::ItemPruned(_))));
3168
3169 journal.destroy().await.unwrap();
3170 });
3171 }
3172
3173 #[test_traced]
3175 fn test_init_sync_section_boundaries() {
3176 let executor = deterministic::Runner::default();
3177 executor.start(|context| async move {
3178 let items_per_section = NZU64!(5);
3179 let cfg = Config {
3180 partition: "test-boundaries".into(),
3181 items_per_section,
3182 compression: None,
3183 codec_config: (),
3184 write_buffer: NZUsize!(1024),
3185 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(PAGE_CACHE_SIZE)),
3186 };
3187
3188 let journal =
3190 Journal::<deterministic::Context, u64>::init(context.child("storage"), cfg.clone())
3191 .await
3192 .expect("Failed to create initial journal");
3193
3194 for i in 0..25u64 {
3196 journal.append(&(i * 100)).await.unwrap();
3197 }
3198 journal.sync().await.unwrap();
3199 drop(journal);
3200
3201 let lower_bound = 15; let upper_bound = 25; let journal = Journal::<deterministic::Context, u64>::init_sync(
3205 context.child("storage"),
3206 cfg.clone(),
3207 lower_bound..upper_bound,
3208 )
3209 .await
3210 .expect("Failed to initialize journal at boundaries");
3211
3212 assert_eq!(journal.size().await, 25);
3213
3214 assert_eq!(journal.bounds().await.start, 15);
3216
3217 assert!(matches!(journal.read(0).await, Err(Error::ItemPruned(_))));
3219 assert!(matches!(journal.read(14).await, Err(Error::ItemPruned(_))));
3220
3221 assert_eq!(journal.read(15).await.unwrap(), 1500);
3223 assert_eq!(journal.read(20).await.unwrap(), 2000);
3224 assert_eq!(journal.read(24).await.unwrap(), 2400);
3225
3226 assert!(matches!(
3228 journal.read(25).await,
3229 Err(Error::ItemOutOfRange(_))
3230 ));
3231
3232 let pos = journal.append(&999).await.unwrap();
3234 assert_eq!(pos, 25);
3235 assert_eq!(journal.read(25).await.unwrap(), 999);
3236
3237 journal.destroy().await.unwrap();
3238 });
3239 }
3240
3241 #[test_traced]
3243 fn test_init_sync_same_section_bounds() {
3244 let executor = deterministic::Runner::default();
3245 executor.start(|context| async move {
3246 let items_per_section = NZU64!(5);
3247 let cfg = Config {
3248 partition: "test-same-section".into(),
3249 items_per_section,
3250 compression: None,
3251 codec_config: (),
3252 write_buffer: NZUsize!(1024),
3253 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(PAGE_CACHE_SIZE)),
3254 };
3255
3256 let journal =
3258 Journal::<deterministic::Context, u64>::init(context.child("storage"), cfg.clone())
3259 .await
3260 .expect("Failed to create initial journal");
3261
3262 for i in 0..15u64 {
3264 journal.append(&(i * 100)).await.unwrap();
3265 }
3266 journal.sync().await.unwrap();
3267 drop(journal);
3268
3269 let lower_bound = 10; let upper_bound = 15; let journal = Journal::<deterministic::Context, u64>::init_sync(
3273 context.child("storage"),
3274 cfg.clone(),
3275 lower_bound..upper_bound,
3276 )
3277 .await
3278 .expect("Failed to initialize journal with same-section bounds");
3279
3280 assert_eq!(journal.size().await, 15);
3281
3282 assert_eq!(journal.bounds().await.start, 10);
3285
3286 assert!(matches!(journal.read(0).await, Err(Error::ItemPruned(_))));
3288 assert!(matches!(journal.read(9).await, Err(Error::ItemPruned(_))));
3289
3290 assert_eq!(journal.read(10).await.unwrap(), 1000);
3292 assert_eq!(journal.read(11).await.unwrap(), 1100);
3293 assert_eq!(journal.read(14).await.unwrap(), 1400);
3294
3295 assert!(matches!(
3297 journal.read(15).await,
3298 Err(Error::ItemOutOfRange(_))
3299 ));
3300
3301 let pos = journal.append(&999).await.unwrap();
3303 assert_eq!(pos, 15);
3304 assert_eq!(journal.read(15).await.unwrap(), 999);
3305
3306 journal.destroy().await.unwrap();
3307 });
3308 }
3309
3310 #[test_traced]
3315 fn test_single_item_per_section() {
3316 let executor = deterministic::Runner::default();
3317 executor.start(|context| async move {
3318 let cfg = Config {
3319 partition: "single-item-per-section".into(),
3320 items_per_section: NZU64!(1),
3321 compression: None,
3322 codec_config: (),
3323 page_cache: CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)),
3324 write_buffer: NZUsize!(1024),
3325 };
3326
3327 let journal = Journal::<_, u64>::init(context.child("first"), cfg.clone())
3329 .await
3330 .unwrap();
3331
3332 let bounds = journal.bounds().await;
3334 assert_eq!(bounds.end, 0);
3335 assert!(bounds.is_empty());
3336
3337 let pos = journal.append(&0).await.unwrap();
3339 assert_eq!(pos, 0);
3340 assert_eq!(journal.size().await, 1);
3341
3342 journal.sync().await.unwrap();
3344
3345 let value = journal.read(journal.size().await - 1).await.unwrap();
3347 assert_eq!(value, 0);
3348
3349 for i in 1..10u64 {
3351 let pos = journal.append(&(i * 100)).await.unwrap();
3352 assert_eq!(pos, i);
3353 assert_eq!(journal.size().await, i + 1);
3354
3355 let value = journal.read(journal.size().await - 1).await.unwrap();
3357 assert_eq!(value, i * 100);
3358 }
3359
3360 for i in 0..10u64 {
3362 assert_eq!(journal.read(i).await.unwrap(), i * 100);
3363 }
3364
3365 journal.sync().await.unwrap();
3366
3367 let pruned = journal.prune(5).await.unwrap();
3370 assert!(pruned);
3371
3372 assert_eq!(journal.size().await, 10);
3374
3375 assert_eq!(journal.bounds().await.start, 5);
3377
3378 let value = journal.read(journal.size().await - 1).await.unwrap();
3380 assert_eq!(value, 900);
3381
3382 for i in 0..5 {
3384 assert!(matches!(
3385 journal.read(i).await,
3386 Err(crate::journal::Error::ItemPruned(_))
3387 ));
3388 }
3389
3390 for i in 5..10u64 {
3392 assert_eq!(journal.read(i).await.unwrap(), i * 100);
3393 }
3394
3395 for i in 10..15u64 {
3397 let pos = journal.append(&(i * 100)).await.unwrap();
3398 assert_eq!(pos, i);
3399
3400 let value = journal.read(journal.size().await - 1).await.unwrap();
3402 assert_eq!(value, i * 100);
3403 }
3404
3405 journal.sync().await.unwrap();
3406 drop(journal);
3407
3408 let journal = Journal::<_, u64>::init(context.child("second"), cfg.clone())
3410 .await
3411 .unwrap();
3412
3413 assert_eq!(journal.size().await, 15);
3415
3416 assert_eq!(journal.bounds().await.start, 5);
3418
3419 let value = journal.read(journal.size().await - 1).await.unwrap();
3421 assert_eq!(value, 1400);
3422
3423 for i in 5..15u64 {
3425 assert_eq!(journal.read(i).await.unwrap(), i * 100);
3426 }
3427
3428 journal.destroy().await.unwrap();
3429
3430 let journal = Journal::<_, u64>::init(context.child("third"), cfg.clone())
3433 .await
3434 .unwrap();
3435
3436 for i in 0..10u64 {
3438 journal.append(&(i * 1000)).await.unwrap();
3439 }
3440
3441 journal.prune(5).await.unwrap();
3443 let bounds = journal.bounds().await;
3444 assert_eq!(bounds.end, 10);
3445 assert_eq!(bounds.start, 5);
3446
3447 journal.sync().await.unwrap();
3449 drop(journal);
3450
3451 let journal = Journal::<_, u64>::init(context.child("fourth"), cfg.clone())
3453 .await
3454 .unwrap();
3455
3456 let bounds = journal.bounds().await;
3458 assert_eq!(bounds.end, 10);
3459 assert_eq!(bounds.start, 5);
3460
3461 let value = journal.read(journal.size().await - 1).await.unwrap();
3463 assert_eq!(value, 9000);
3464
3465 for i in 5..10u64 {
3467 assert_eq!(journal.read(i).await.unwrap(), i * 1000);
3468 }
3469
3470 journal.destroy().await.unwrap();
3471
3472 let journal = Journal::<_, u64>::init(context.child("fifth"), cfg.clone())
3476 .await
3477 .unwrap();
3478
3479 for i in 0..5u64 {
3480 journal.append(&(i * 100)).await.unwrap();
3481 }
3482 journal.sync().await.unwrap();
3483
3484 journal.prune(5).await.unwrap();
3486 let bounds = journal.bounds().await;
3487 assert_eq!(bounds.end, 5); assert!(bounds.is_empty()); let result = journal.read(journal.size().await - 1).await;
3492 assert!(matches!(result, Err(crate::journal::Error::ItemPruned(4))));
3493
3494 journal.append(&500).await.unwrap();
3496 let bounds = journal.bounds().await;
3497 assert_eq!(bounds.start, 5);
3498 assert_eq!(journal.read(bounds.end - 1).await.unwrap(), 500);
3499
3500 journal.destroy().await.unwrap();
3501 });
3502 }
3503
3504 #[test_traced]
3505 fn test_variable_journal_clear_to_size() {
3506 let executor = deterministic::Runner::default();
3507 executor.start(|context| async move {
3508 let cfg = Config {
3509 partition: "clear-test".into(),
3510 items_per_section: NZU64!(10),
3511 compression: None,
3512 codec_config: (),
3513 page_cache: CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)),
3514 write_buffer: NZUsize!(1024),
3515 };
3516
3517 let journal = Journal::<_, u64>::init(context.child("journal"), cfg.clone())
3518 .await
3519 .unwrap();
3520
3521 for i in 0..25u64 {
3523 journal.append(&(i * 100)).await.unwrap();
3524 }
3525 let bounds = journal.bounds().await;
3526 assert_eq!(bounds.end, 25);
3527 assert_eq!(bounds.start, 0);
3528 journal.sync().await.unwrap();
3529
3530 journal.clear_to_size(100).await.unwrap();
3532 let bounds = journal.bounds().await;
3533 assert_eq!(bounds.end, 100);
3534 assert!(bounds.is_empty());
3535
3536 for i in 0..25 {
3538 assert!(matches!(
3539 journal.read(i).await,
3540 Err(crate::journal::Error::ItemPruned(_))
3541 ));
3542 }
3543
3544 drop(journal);
3546 let journal =
3547 Journal::<_, u64>::init(context.child("journal_after_clear"), cfg.clone())
3548 .await
3549 .unwrap();
3550 let bounds = journal.bounds().await;
3551 assert_eq!(bounds.end, 100);
3552 assert!(bounds.is_empty());
3553
3554 for i in 100..105u64 {
3556 let pos = journal.append(&(i * 100)).await.unwrap();
3557 assert_eq!(pos, i);
3558 }
3559 let bounds = journal.bounds().await;
3560 assert_eq!(bounds.end, 105);
3561 assert_eq!(bounds.start, 100);
3562
3563 for i in 100..105u64 {
3565 assert_eq!(journal.read(i).await.unwrap(), i * 100);
3566 }
3567
3568 journal.sync().await.unwrap();
3570 drop(journal);
3571
3572 let journal = Journal::<_, u64>::init(context.child("journal_reopened"), cfg)
3573 .await
3574 .unwrap();
3575
3576 let bounds = journal.bounds().await;
3577 assert_eq!(bounds.end, 105);
3578 assert_eq!(bounds.start, 100);
3579 for i in 100..105u64 {
3580 assert_eq!(journal.read(i).await.unwrap(), i * 100);
3581 }
3582
3583 journal.destroy().await.unwrap();
3584 });
3585 }
3586
3587 #[test_traced]
3588 fn test_variable_journal_metrics() {
3589 let executor = deterministic::Runner::default();
3590 executor.start(|context| async move {
3591 let cfg = Config {
3592 partition: "metrics".into(),
3593 items_per_section: NZU64!(2),
3594 compression: None,
3595 codec_config: (),
3596 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(10)),
3597 write_buffer: NZUsize!(1024),
3598 };
3599 let journal = Journal::<_, u64>::init(context.child("variable_metrics"), cfg)
3600 .await
3601 .unwrap();
3602
3603 let items = [0, 1, 2, 3, 4];
3604 journal.append_many(Many::Flat(&items)).await.unwrap();
3605 journal.append(&5).await.unwrap();
3606 let reader = journal.reader().await;
3607 reader.read(0).await.unwrap();
3608 reader.read_many(&[1, 2]).await.unwrap();
3609 reader.try_read_sync(3).unwrap();
3610 drop(reader);
3611 journal.commit().await.unwrap();
3612 journal.sync().await.unwrap();
3613 journal.prune(2).await.unwrap();
3614 journal.rewind(4).await.unwrap();
3615
3616 let buffer = context.encode();
3617 for expected in [
3618 "variable_metrics_size 4",
3619 "variable_metrics_pruning_boundary 2",
3620 "variable_metrics_retained 2",
3621 "variable_metrics_tail_items 2",
3622 "variable_metrics_append_calls_total 1",
3623 "variable_metrics_append_many_calls_total 1",
3624 "variable_metrics_read_calls_total 1",
3625 "variable_metrics_read_many_calls_total 1",
3626 "variable_metrics_try_read_sync_hits_total 1",
3627 "variable_metrics_items_read_total 4",
3628 "variable_metrics_commit_calls_total 1",
3629 "variable_metrics_sync_calls_total 1",
3630 "variable_metrics_append_duration_count 1",
3631 "variable_metrics_append_many_duration_count 1",
3632 "variable_metrics_read_duration_count 1",
3633 "variable_metrics_read_many_duration_count 1",
3634 "variable_metrics_commit_duration_count 1",
3635 "variable_metrics_sync_duration_count 1",
3636 "variable_metrics_data_tracked",
3637 "variable_metrics_offsets_size 4",
3638 "variable_metrics_offsets_blobs_tracked",
3639 ] {
3640 assert!(buffer.contains(expected), "{expected}\n{buffer}");
3641 }
3642 for unexpected in [
3643 "variable_metrics_cache_hits_total",
3644 "variable_metrics_cache_misses_total",
3645 ] {
3646 assert!(!buffer.contains(unexpected), "{unexpected}\n{buffer}");
3647 }
3648
3649 journal.destroy().await.unwrap();
3650 });
3651 }
3652}