use crate::{
    journal::{
        contiguous::MutableContiguous,
        segmented::fixed::{Config as SegmentedConfig, Journal as SegmentedJournal},
        Error,
    },
    metadata::{Config as MetadataConfig, Metadata},
    Persistable,
};
use commonware_codec::CodecFixedShared;
use commonware_runtime::{buffer::paged::CacheRef, Clock, Metrics, Storage};
use futures::{stream::Stream, StreamExt};
use std::num::{NonZeroU64, NonZeroUsize};
use tracing::warn;

/// Metadata key under which the pruning boundary is persisted when it does not
/// fall on a section boundary.
const PRUNING_BOUNDARY_KEY: u64 = 1;

/// Configuration for a fixed-item [Journal].
#[derive(Clone)]
pub struct Config {
    /// Base partition name. Blob data lives in `<partition>-blobs` (or the
    /// legacy `<partition>` itself) and metadata in `<partition>-metadata`.
    pub partition: String,

    /// Maximum number of items stored in each blob (section).
    pub items_per_blob: NonZeroU64,

    /// Page cache used for reads of blob data.
    pub page_cache: CacheRef,

    /// Size of the write buffer used for appends.
    pub write_buffer: NonZeroUsize,
}

/// A journal of fixed-size items addressed by a contiguous position space,
/// built on a segmented journal of blobs plus a small metadata store for the
/// pruning boundary.
pub struct Journal<E: Clock + Storage + Metrics, A: CodecFixedShared> {
    /// The underlying segmented journal storing the blobs.
    inner: SegmentedJournal<E, A>,

    /// Maximum number of items per blob.
    items_per_blob: u64,

    /// Total number of items ever appended, including pruned items.
    size: u64,

    /// Stores the pruning boundary when it is not section-aligned.
    metadata: Metadata<E, u64, Vec<u8>>,

    /// Position of the oldest retained item; reads below this fail with
    /// [Error::ItemPruned].
    pruning_boundary: u64,
}

impl<E: Clock + Storage + Metrics, A: CodecFixedShared> Journal<E, A> {
    /// Size in bytes of a single stored item in the underlying segmented
    /// journal.
    pub const CHUNK_SIZE: usize = SegmentedJournal::<E, A>::CHUNK_SIZE;

    /// [Self::CHUNK_SIZE] as a `u64`, for byte-offset arithmetic.
    pub const CHUNK_SIZE_U64: u64 = Self::CHUNK_SIZE as u64;

    /// Scans `partition` for blobs, treating a missing partition as empty.
    async fn scan_partition(context: &E, partition: &str) -> Result<Vec<Vec<u8>>, Error> {
        match context.scan(partition).await {
            Ok(blobs) => Ok(blobs),
            Err(commonware_runtime::Error::PartitionMissing(_)) => Ok(Vec::new()),
            Err(err) => Err(Error::Runtime(err)),
        }
    }

    /// Selects the partition holding blob data: the legacy partition (equal to
    /// `cfg.partition`) if it already contains blobs, otherwise
    /// `<partition>-blobs`. Data in both partitions is reported as corruption.
    async fn select_blob_partition(context: &E, cfg: &Config) -> Result<String, Error> {
        let legacy_partition = cfg.partition.as_str();
        let new_partition = format!("{}-blobs", cfg.partition);

        let legacy_blobs = Self::scan_partition(context, legacy_partition).await?;
        let new_blobs = Self::scan_partition(context, &new_partition).await?;

        if !legacy_blobs.is_empty() && !new_blobs.is_empty() {
            return Err(Error::Corruption(format!(
                "both legacy and blobs partitions contain data: legacy={} blobs={}",
                legacy_partition, new_partition
            )));
        }

        if !legacy_blobs.is_empty() {
            Ok(legacy_partition.to_string())
        } else {
            Ok(new_partition)
        }
    }
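
    // Outcome matrix for the selection above (informal restatement, not an
    // exhaustive spec):
    //   legacy populated, blobs empty  -> legacy partition (backwards compat)
    //   legacy empty, blobs populated  -> "<partition>-blobs"
    //   both empty (fresh journal)     -> "<partition>-blobs"
    //   both populated                 -> Error::Corruption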

    /// Initializes the journal, recovering its size and pruning boundary from
    /// the blobs and the (possibly stale) metadata, repairing the metadata if
    /// a crash left it inconsistent, and ensuring the tail section exists.
    pub async fn init(context: E, cfg: Config) -> Result<Self, Error> {
        let items_per_blob = cfg.items_per_blob.get();

        let blob_partition = Self::select_blob_partition(&context, &cfg).await?;
        let segmented_cfg = SegmentedConfig {
            partition: blob_partition,
            page_cache: cfg.page_cache,
            write_buffer: cfg.write_buffer,
        };

        let mut inner = SegmentedJournal::init(context.with_label("blobs"), segmented_cfg).await?;

        let meta_cfg = MetadataConfig {
            partition: format!("{}-metadata", cfg.partition),
            codec_config: ((0..).into(), ()),
        };
        let mut metadata =
            Metadata::<_, u64, Vec<u8>>::init(context.with_label("meta"), meta_cfg).await?;

        let meta_pruning_boundary = match metadata.get(&PRUNING_BOUNDARY_KEY) {
            Some(bytes) => Some(u64::from_be_bytes(bytes.as_slice().try_into().map_err(
                |_| Error::Corruption("invalid pruning_boundary metadata".into()),
            )?)),
            None => None,
        };

        let (pruning_boundary, size, needs_metadata_update) =
            Self::recover_bounds(&inner, items_per_blob, meta_pruning_boundary).await?;

        if needs_metadata_update {
            if pruning_boundary.is_multiple_of(items_per_blob) {
                metadata.remove(&PRUNING_BOUNDARY_KEY);
            } else {
                metadata.put(
                    PRUNING_BOUNDARY_KEY,
                    pruning_boundary.to_be_bytes().to_vec(),
                );
            }
            metadata.sync().await?;
        }

        let tail_section = size / items_per_blob;
        inner.ensure_section_exists(tail_section).await?;

        Ok(Self {
            inner,
            items_per_blob,
            size,
            pruning_boundary,
            metadata,
        })
    }
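
    // A minimal usage sketch (hedged: assumes a runtime `context` and `Digest`
    // items, mirroring the tests at the bottom of this file):
    //
    //   let cfg = Config {
    //       partition: "my_journal".into(),
    //       items_per_blob: NZU64!(1024),
    //       page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
    //       write_buffer: NZUsize!(4096),
    //   };
    //   let mut journal = Journal::<_, Digest>::init(context, cfg).await?;
    //   let pos = journal.append(digest).await?;
    //   journal.sync().await?;
    //   assert_eq!(journal.read(pos).await?, digest);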

    /// Recovers the pruning boundary and size from the blobs and the metadata
    /// hint, returning `(pruning_boundary, size, needs_metadata_update)`. The
    /// metadata is trusted only when it is mid-section and agrees with the
    /// oldest existing blob; otherwise the boundary is recomputed from the
    /// blobs and the metadata is flagged for repair.
    async fn recover_bounds(
        inner: &SegmentedJournal<E, A>,
        items_per_blob: u64,
        meta_pruning_boundary: Option<u64>,
    ) -> Result<(u64, u64, bool), Error> {
        let blob_boundary = inner.oldest_section().map_or(0, |o| o * items_per_blob);

        let (pruning_boundary, needs_update) = match meta_pruning_boundary {
            Some(meta_pruning_boundary)
                if !meta_pruning_boundary.is_multiple_of(items_per_blob) =>
            {
                let meta_oldest_section = meta_pruning_boundary / items_per_blob;
                match inner.oldest_section() {
                    None => {
                        warn!(
                            meta_oldest_section,
                            "crash repair: no blobs exist, ignoring stale metadata"
                        );
                        (blob_boundary, true)
                    }
                    Some(oldest_section) if meta_oldest_section < oldest_section => {
                        warn!(
                            meta_oldest_section,
                            oldest_section, "crash repair: metadata stale, computing from blobs"
                        );
                        (blob_boundary, true)
                    }
                    Some(oldest_section) if meta_oldest_section > oldest_section => {
                        warn!(
                            meta_oldest_section,
                            oldest_section,
                            "crash repair: metadata ahead of blobs, computing from blobs"
                        );
                        (blob_boundary, true)
                    }
                    Some(_) => (meta_pruning_boundary, false),
                }
            }
            Some(_) => (blob_boundary, true),
            None => (blob_boundary, false),
        };

        Self::validate_oldest_section(inner, items_per_blob, pruning_boundary).await?;

        let size = Self::compute_size(inner, items_per_blob, pruning_boundary).await?;
        Ok((pruning_boundary, size, needs_update))
    }

    /// Verifies that the oldest section (when it is not also the tail) holds
    /// exactly the number of items implied by the pruning boundary.
    async fn validate_oldest_section(
        inner: &SegmentedJournal<E, A>,
        items_per_blob: u64,
        pruning_boundary: u64,
    ) -> Result<(), Error> {
        let (Some(oldest), Some(newest)) = (inner.oldest_section(), inner.newest_section()) else {
            return Ok(());
        };

        if oldest == newest {
            return Ok(());
        }

        let oldest_len = inner.section_len(oldest).await?;
        let oldest_start = oldest * items_per_blob;

        let expected = if pruning_boundary > oldest_start {
            items_per_blob - (pruning_boundary - oldest_start)
        } else {
            items_per_blob
        };

        if oldest_len != expected {
            return Err(Error::Corruption(format!(
                "oldest section {oldest} has wrong size: expected {expected} items, got {oldest_len}"
            )));
        }

        Ok(())
    }
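
    // Example of the expected-length arithmetic above (sketch): with
    // items_per_blob = 5 and pruning_boundary = 7, the oldest section starts
    // at position 5, so it must hold exactly 5 - (7 - 5) = 3 items (7, 8, 9).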

    /// Computes the journal size as the pruning boundary plus the item counts
    /// of the oldest, middle, and tail sections.
    async fn compute_size(
        inner: &SegmentedJournal<E, A>,
        items_per_blob: u64,
        pruning_boundary: u64,
    ) -> Result<u64, Error> {
        let oldest = inner.oldest_section();
        let newest = inner.newest_section();

        let (Some(oldest), Some(newest)) = (oldest, newest) else {
            return Ok(pruning_boundary);
        };

        if oldest == newest {
            let tail_len = inner.section_len(newest).await?;
            return Ok(pruning_boundary + tail_len);
        }

        let oldest_len = inner.section_len(oldest).await?;
        let tail_len = inner.section_len(newest).await?;

        let middle_sections = newest - oldest - 1;
        let middle_items = middle_sections * items_per_blob;

        Ok(pruning_boundary + oldest_len + middle_items + tail_len)
    }
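
    // Worked example (sketch): items_per_blob = 5, pruning_boundary = 7,
    // sections 1 (len 3), 2 (len 5), 3 (len 2) => size = 7 + 3 + 5 + 2 = 17,
    // i.e. items 7..17 are live and 0..7 are accounted as pruned.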

    /// Initializes a fresh journal whose size and pruning boundary both start
    /// at `size`, clearing any existing blob data. All positions below `size`
    /// are treated as pruned.
    #[commonware_macros::stability(ALPHA)]
    pub async fn init_at_size(context: E, cfg: Config, size: u64) -> Result<Self, Error> {
        let items_per_blob = cfg.items_per_blob.get();
        let tail_section = size / items_per_blob;

        let blob_partition = Self::select_blob_partition(&context, &cfg).await?;
        let segmented_cfg = SegmentedConfig {
            partition: blob_partition,
            page_cache: cfg.page_cache,
            write_buffer: cfg.write_buffer,
        };

        let meta_cfg = MetadataConfig {
            partition: format!("{}-metadata", cfg.partition),
            codec_config: ((0..).into(), ()),
        };
        let mut metadata =
            Metadata::<_, u64, Vec<u8>>::init(context.with_label("meta"), meta_cfg).await?;
        let mut inner = SegmentedJournal::init(context.with_label("blobs"), segmented_cfg).await?;

        inner.clear().await?;
        inner.ensure_section_exists(tail_section).await?;

        if !size.is_multiple_of(items_per_blob) {
            metadata.put(PRUNING_BOUNDARY_KEY, size.to_be_bytes().to_vec());
            metadata.sync().await?;
        } else if metadata.get(&PRUNING_BOUNDARY_KEY).is_some() {
            metadata.remove(&PRUNING_BOUNDARY_KEY);
            metadata.sync().await?;
        }

        Ok(Self {
            inner,
            items_per_blob,
            size,
            pruning_boundary: size,
            metadata,
        })
    }

    /// Maps a global item position to its section and its offset within a
    /// full section (callers account for a mid-section pruning boundary).
    #[inline]
    const fn position_to_section(&self, position: u64) -> (u64, u64) {
        let section = position / self.items_per_blob;
        let pos_in_section = position % self.items_per_blob;
        (section, pos_in_section)
    }
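
    // Illustration (assuming items_per_blob = 5): positions 0..5 map to
    // section 0, 5..10 to section 1, and position 12 to (section 2, offset 2).
    // The offset is physical only when the section is full from its start;
    // callers adjust for a mid-section pruning boundary where it matters.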

    /// Syncs the tail section, then persists (or removes) the pruning-boundary
    /// metadata so durable state matches memory.
    pub async fn sync(&mut self) -> Result<(), Error> {
        let tail_section = self.size / self.items_per_blob;
        self.inner.sync(tail_section).await?;

        if !self.pruning_boundary.is_multiple_of(self.items_per_blob) {
            let needs_update = self
                .metadata
                .get(&PRUNING_BOUNDARY_KEY)
                .is_none_or(|bytes| bytes.as_slice() != self.pruning_boundary.to_be_bytes());
            if needs_update {
                self.metadata.put(
                    PRUNING_BOUNDARY_KEY,
                    self.pruning_boundary.to_be_bytes().to_vec(),
                );
                self.metadata.sync().await?;
            }
        } else if self.metadata.get(&PRUNING_BOUNDARY_KEY).is_some() {
            self.metadata.remove(&PRUNING_BOUNDARY_KEY);
            self.metadata.sync().await?;
        }

        Ok(())
    }
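
    // Note on ordering (observation, not a formal guarantee): blob data is
    // synced before the boundary metadata, so a crash between the two steps
    // can leave stale metadata behind; `recover_bounds` detects and repairs
    // that case on the next `init`.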

    /// Returns the total number of items ever appended, including pruned ones.
    pub const fn size(&self) -> u64 {
        self.size
    }

    /// Returns the half-open range of readable positions:
    /// `pruning_boundary..size`.
    pub const fn bounds(&self) -> std::ops::Range<u64> {
        self.pruning_boundary..self.size
    }
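
    // e.g. with items_per_blob = 2, size = 7, after prune(4) the boundary is
    // 4 and bounds() is 4..7: positions 4..6 are readable, 0..4 are pruned.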

    /// Appends `item` to the journal, returning its position. When the tail
    /// section fills, it is synced and the next section is created eagerly.
    pub async fn append(&mut self, item: A) -> Result<u64, Error> {
        let position = self.size;
        let (section, _pos_in_section) = self.position_to_section(position);

        self.inner.append(section, item).await?;
        self.size += 1;

        if self.size.is_multiple_of(self.items_per_blob) {
            self.inner.sync(section).await?;
            self.inner.ensure_section_exists(section + 1).await?;
        }

        Ok(position)
    }
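
    // Rollover example (sketch): with items_per_blob = 2, the append returning
    // position 1 makes size == 2, so section 0 is synced and section 1 is
    // created; the next append (position 2) lands in section 1.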

    /// Rewinds the journal so its size becomes `size`, discarding all items at
    /// positions `size` and above. Fails with [Error::InvalidRewind] if `size`
    /// exceeds the current size or falls below the pruning boundary.
    pub async fn rewind(&mut self, size: u64) -> Result<(), Error> {
        match size.cmp(&self.size) {
            std::cmp::Ordering::Greater => return Err(Error::InvalidRewind(size)),
            std::cmp::Ordering::Equal => return Ok(()),
            std::cmp::Ordering::Less => {}
        }

        if size < self.pruning_boundary {
            return Err(Error::InvalidRewind(size));
        }

        let section = size / self.items_per_blob;
        let section_start = section * self.items_per_blob;

        let first_in_section = self.pruning_boundary.max(section_start);
        let pos_in_section = size - first_in_section;
        let byte_offset = pos_in_section * Self::CHUNK_SIZE_U64;

        self.inner.rewind(section, byte_offset).await?;
        self.size = size;

        Ok(())
    }
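
    // Offset example (sketch): items_per_blob = 5, pruning_boundary = 7,
    // rewind(9): section = 1, section_start = 5, first_in_section = 7,
    // pos_in_section = 2, so the blob is truncated to 2 * CHUNK_SIZE bytes
    // and items 7 and 8 survive.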

    /// Reads the item at position `pos`, returning [Error::ItemPruned] below
    /// the pruning boundary and [Error::ItemOutOfRange] at or beyond the size.
    pub async fn read(&self, pos: u64) -> Result<A, Error> {
        let bounds = self.bounds();
        if pos >= bounds.end {
            return Err(Error::ItemOutOfRange(pos));
        }
        if pos < bounds.start {
            return Err(Error::ItemPruned(pos));
        }

        let section = pos / self.items_per_blob;
        let section_start = section * self.items_per_blob;

        let first_in_section = bounds.start.max(section_start);
        let pos_in_section = pos - first_in_section;

        self.inner.get(section, pos_in_section).await.map_err(|e| {
            match e {
                Error::SectionOutOfRange(e)
                | Error::AlreadyPrunedToSection(e)
                | Error::ItemOutOfRange(e) => {
                    Error::Corruption(format!("section/item should be found, but got: {e}"))
                }
                other => other,
            }
        })
    }

    /// Returns a stream of `(position, item)` pairs for all items from
    /// `start_pos` onward, after verifying that every fully-interior section
    /// to be replayed contains the expected number of items.
    pub async fn replay(
        &self,
        buffer: NonZeroUsize,
        start_pos: u64,
    ) -> Result<impl Stream<Item = Result<(u64, A), Error>> + '_, Error> {
        let bounds = self.bounds();
        if start_pos > bounds.end {
            return Err(Error::ItemOutOfRange(start_pos));
        }
        if start_pos < bounds.start {
            return Err(Error::ItemPruned(start_pos));
        }

        let start_section = start_pos / self.items_per_blob;
        let section_start = start_section * self.items_per_blob;

        let first_in_section = bounds.start.max(section_start);
        let start_pos_in_section = start_pos - first_in_section;

        let items_per_blob = self.items_per_blob;
        let pruning_boundary = bounds.start;

        if let (Some(oldest), Some(newest)) =
            (self.inner.oldest_section(), self.inner.newest_section())
        {
            let first_to_check = start_section.max(oldest + 1);
            for section in first_to_check..newest {
                let len = self.inner.section_len(section).await?;
                if len < items_per_blob {
                    return Err(Error::Corruption(format!(
                        "section {section} incomplete: expected {items_per_blob} items, got {len}"
                    )));
                }
            }
        }

        let inner_stream = self
            .inner
            .replay(start_section, start_pos_in_section, buffer)
            .await?;

        let stream = inner_stream.map(move |result| {
            result.map(|(section, pos_in_section, item)| {
                let section_start = section * items_per_blob;
                let first_in_section = pruning_boundary.max(section_start);
                let global_pos = first_in_section + pos_in_section;
                (global_pos, item)
            })
        });

        Ok(stream)
    }
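
    // Position-mapping example (sketch): with items_per_blob = 5 and
    // pruning_boundary = 7, a replayed item at (section 1, offset 0) maps to
    // global position 7, and (section 2, offset 3) maps to 10 + 3 = 13,
    // matching the offset logic used by `read`.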

    /// Prunes all sections wholly below the section containing `min_item_pos`
    /// (capped at the tail section) and advances the pruning boundary to the
    /// start of the new oldest section. Returns whether anything was pruned.
    pub async fn prune(&mut self, min_item_pos: u64) -> Result<bool, Error> {
        let target_section = min_item_pos / self.items_per_blob;

        let tail_section = self.size / self.items_per_blob;

        let min_section = std::cmp::min(target_section, tail_section);

        let pruned = self.inner.prune(min_section).await?;

        if pruned {
            let new_oldest = self
                .inner
                .oldest_section()
                .expect("all sections pruned - violates tail section invariant");
            assert!(self.pruning_boundary < new_oldest * self.items_per_blob);
            self.pruning_boundary = new_oldest * self.items_per_blob;
        }

        Ok(pruned)
    }
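
    // Granularity example (sketch): items_per_blob = 5, size = 17, prune(12):
    // target_section = 2, tail_section = 3, so sections below 2 are removed
    // and the boundary advances to 2 * 5 = 10 rather than 12: pruning is
    // section-granular and never splits a blob.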

    /// Destroys the journal, removing all blob and metadata storage.
    pub async fn destroy(self) -> Result<(), Error> {
        self.inner.destroy().await?;

        self.metadata.destroy().await?;

        Ok(())
    }

    /// Clears all blob data and resets the journal to an empty state at
    /// `new_size` (size and pruning boundary both equal `new_size`),
    /// persisting the boundary metadata as needed.
    pub(crate) async fn clear_to_size(&mut self, new_size: u64) -> Result<(), Error> {
        self.inner.clear().await?;
        let tail_section = new_size / self.items_per_blob;
        self.inner.ensure_section_exists(tail_section).await?;

        self.size = new_size;
        self.pruning_boundary = new_size;

        if !self.pruning_boundary.is_multiple_of(self.items_per_blob) {
            self.metadata.put(
                PRUNING_BOUNDARY_KEY,
                self.pruning_boundary.to_be_bytes().to_vec(),
            );
            self.metadata.sync().await?;
        } else if self.metadata.get(&PRUNING_BOUNDARY_KEY).is_some() {
            self.metadata.remove(&PRUNING_BOUNDARY_KEY);
            self.metadata.sync().await?;
        }

        Ok(())
    }
}

impl<E: Clock + Storage + Metrics, A: CodecFixedShared> super::Contiguous for Journal<E, A> {
    type Item = A;

    fn bounds(&self) -> std::ops::Range<u64> {
        Self::bounds(self)
    }

    async fn replay(
        &self,
        start_pos: u64,
        buffer: NonZeroUsize,
    ) -> Result<impl Stream<Item = Result<(u64, Self::Item), Error>> + '_, Error> {
        Self::replay(self, buffer, start_pos).await
    }

    async fn read(&self, position: u64) -> Result<Self::Item, Error> {
        Self::read(self, position).await
    }
}

impl<E: Clock + Storage + Metrics, A: CodecFixedShared> MutableContiguous for Journal<E, A> {
    async fn append(&mut self, item: Self::Item) -> Result<u64, Error> {
        Self::append(self, item).await
    }

    async fn prune(&mut self, min_position: u64) -> Result<bool, Error> {
        Self::prune(self, min_position).await
    }

    async fn rewind(&mut self, size: u64) -> Result<(), Error> {
        Self::rewind(self, size).await
    }
}

impl<E: Clock + Storage + Metrics, A: CodecFixedShared> Persistable for Journal<E, A> {
    type Error = Error;

    async fn commit(&mut self) -> Result<(), Error> {
        Self::sync(self).await
    }

    async fn sync(&mut self) -> Result<(), Error> {
        Self::sync(self).await
    }

    async fn destroy(self) -> Result<(), Error> {
        Self::destroy(self).await
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use commonware_cryptography::{sha256::Digest, Hasher as _, Sha256};
    use commonware_macros::test_traced;
    use commonware_runtime::{
        deterministic::{self, Context},
        Blob, Error as RuntimeError, Metrics, Runner, Storage,
    };
    use commonware_utils::{NZUsize, NZU16, NZU64};
    use futures::{pin_mut, StreamExt};
    use std::num::NonZeroU16;

    const PAGE_SIZE: NonZeroU16 = NZU16!(44);
    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(3);

    /// Returns a deterministic test digest for `value`.
    fn test_digest(value: u64) -> Digest {
        Sha256::hash(&value.to_be_bytes())
    }

    /// Returns a default test configuration with the given `items_per_blob`.
    fn test_cfg(items_per_blob: NonZeroU64) -> Config {
        Config {
            partition: "test_partition".into(),
            items_per_blob,
            page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
            write_buffer: NZUsize!(2048),
        }
    }

    /// Returns the blob partition name derived from `cfg`.
    fn blob_partition(cfg: &Config) -> String {
        format!("{}-blobs", cfg.partition)
    }

    /// Scans `partition`, treating a missing partition as empty.
    async fn scan_partition(context: &Context, partition: &str) -> Vec<Vec<u8>> {
        match context.scan(partition).await {
            Ok(blobs) => blobs,
            Err(RuntimeError::PartitionMissing(_)) => Vec::new(),
            Err(err) => panic!("Failed to scan partition {partition}: {err}"),
        }
    }

    #[test_traced]
    fn test_fixed_journal_init_conflicting_partitions() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(NZU64!(2));
            let legacy_partition = cfg.partition.clone();
            let blobs_partition = blob_partition(&cfg);

            let (legacy_blob, _) = context
                .open(&legacy_partition, &0u64.to_be_bytes())
                .await
                .expect("Failed to open legacy blob");
            legacy_blob
                .write_at(0, vec![0u8; 1])
                .await
                .expect("Failed to write legacy blob");
            legacy_blob
                .sync()
                .await
                .expect("Failed to sync legacy blob");

            let (new_blob, _) = context
                .open(&blobs_partition, &0u64.to_be_bytes())
                .await
                .expect("Failed to open new blob");
            new_blob
                .write_at(0, vec![0u8; 1])
                .await
                .expect("Failed to write new blob");
            new_blob.sync().await.expect("Failed to sync new blob");

            let result =
                Journal::<_, Digest>::init(context.with_label("second"), cfg.clone()).await;
            assert!(matches!(result, Err(Error::Corruption(_))));
        });
    }

    #[test_traced]
    fn test_fixed_journal_init_prefers_legacy_partition() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(NZU64!(2));
            let legacy_partition = cfg.partition.clone();
            let blobs_partition = blob_partition(&cfg);

            let (legacy_blob, _) = context
                .open(&legacy_partition, &0u64.to_be_bytes())
                .await
                .expect("Failed to open legacy blob");
            legacy_blob
                .write_at(0, vec![0u8; 1])
                .await
                .expect("Failed to write legacy blob");
            legacy_blob
                .sync()
                .await
                .expect("Failed to sync legacy blob");

            let mut journal = Journal::<_, Digest>::init(context.with_label("first"), cfg.clone())
                .await
                .expect("failed to initialize journal");
            journal.append(test_digest(1)).await.unwrap();
            journal.sync().await.unwrap();
            drop(journal);

            let legacy_blobs = scan_partition(&context, &legacy_partition).await;
            let new_blobs = scan_partition(&context, &blobs_partition).await;
            assert!(!legacy_blobs.is_empty());
            assert!(new_blobs.is_empty());
        });
    }

    #[test_traced]
    fn test_fixed_journal_init_defaults_to_blobs_partition() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(NZU64!(2));
            let legacy_partition = cfg.partition.clone();
            let blobs_partition = blob_partition(&cfg);

            let mut journal = Journal::<_, Digest>::init(context.with_label("first"), cfg.clone())
                .await
                .expect("failed to initialize journal");
            journal.append(test_digest(1)).await.unwrap();
            journal.sync().await.unwrap();
            drop(journal);

            let legacy_blobs = scan_partition(&context, &legacy_partition).await;
            let new_blobs = scan_partition(&context, &blobs_partition).await;
            assert!(legacy_blobs.is_empty());
            assert!(!new_blobs.is_empty());
        });
    }

    #[test_traced]
    fn test_fixed_journal_append_and_prune() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = test_cfg(NZU64!(2));
            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
                .await
                .expect("failed to initialize journal");

            let mut pos = journal
                .append(test_digest(0))
                .await
                .expect("failed to append data 0");
            assert_eq!(pos, 0);

            journal.sync().await.expect("Failed to sync journal");
            drop(journal);

            let cfg = test_cfg(NZU64!(2));
            let mut journal = Journal::init(context.with_label("second"), cfg.clone())
                .await
                .expect("failed to re-initialize journal");
            assert_eq!(journal.size(), 1);

            pos = journal
                .append(test_digest(1))
                .await
                .expect("failed to append data 1");
            assert_eq!(pos, 1);
            pos = journal
                .append(test_digest(2))
                .await
                .expect("failed to append data 2");
            assert_eq!(pos, 2);

            let item0 = journal.read(0).await.expect("failed to read data 0");
            assert_eq!(item0, test_digest(0));
            let item1 = journal.read(1).await.expect("failed to read data 1");
            assert_eq!(item1, test_digest(1));
            let item2 = journal.read(2).await.expect("failed to read data 2");
            assert_eq!(item2, test_digest(2));
            let err = journal.read(3).await.expect_err("expected read to fail");
            assert!(matches!(err, Error::ItemOutOfRange(3)));

            journal.sync().await.expect("failed to sync journal");

            journal.prune(1).await.expect("failed to prune journal 1");

            journal.prune(2).await.expect("failed to prune journal 2");
            assert_eq!(journal.bounds().start, 2);

            let result0 = journal.read(0).await;
            assert!(matches!(result0, Err(Error::ItemPruned(0))));
            let result1 = journal.read(1).await;
            assert!(matches!(result1, Err(Error::ItemPruned(1))));

            let result2 = journal.read(2).await.unwrap();
            assert_eq!(result2, test_digest(2));

            for i in 3..10 {
                let pos = journal
                    .append(test_digest(i))
                    .await
                    .expect("failed to append data");
                assert_eq!(pos, i);
            }

            journal.prune(0).await.expect("no-op pruning failed");
            assert_eq!(journal.inner.oldest_section(), Some(1));
            assert_eq!(journal.inner.newest_section(), Some(5));
            assert_eq!(journal.bounds().start, 2);

            journal
                .prune(3 * cfg.items_per_blob.get())
                .await
                .expect("failed to prune journal 2");
            assert_eq!(journal.inner.oldest_section(), Some(3));
            assert_eq!(journal.inner.newest_section(), Some(5));
            assert_eq!(journal.bounds().start, 6);

            journal
                .prune(10000)
                .await
                .expect("failed to max-prune journal");
            let size = journal.size();
            assert_eq!(size, 10);
            assert_eq!(journal.inner.oldest_section(), Some(5));
            assert_eq!(journal.inner.newest_section(), Some(5));
            assert!(journal.bounds().is_empty());
            assert_eq!(journal.bounds().start, size);

            {
                let result = journal.replay(NZUsize!(1024), 0).await;
                assert!(matches!(result, Err(Error::ItemPruned(0))));
            }

            {
                let stream = journal
                    .replay(NZUsize!(1024), journal.bounds().start)
                    .await
                    .expect("failed to replay journal from pruning boundary");
                pin_mut!(stream);
                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((pos, item)) => {
                            assert_eq!(test_digest(pos), item);
                            items.push(pos);
                        }
                        Err(err) => panic!("Failed to read item: {err}"),
                    }
                }
                assert_eq!(items, Vec::<u64>::new());
            }

            journal.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_fixed_journal_append_a_lot_of_data() {
        let executor = deterministic::Runner::default();
        const ITEMS_PER_BLOB: NonZeroU64 = NZU64!(10000);
        executor.start(|context| async move {
            let cfg = test_cfg(ITEMS_PER_BLOB);
            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
                .await
                .expect("failed to initialize journal");
            for i in 0u64..ITEMS_PER_BLOB.get() * 2 - 1 {
                journal
                    .append(test_digest(i))
                    .await
                    .expect("failed to append data");
            }
            journal.sync().await.expect("failed to sync journal");
            drop(journal);
            let journal = Journal::init(context.with_label("second"), cfg.clone())
                .await
                .expect("failed to re-initialize journal");
            for i in 0u64..10000 {
                let item: Digest = journal.read(i).await.expect("failed to read data");
                assert_eq!(item, test_digest(i));
            }
            journal.destroy().await.expect("failed to destroy journal");
        });
    }

    #[test_traced]
    fn test_fixed_journal_replay() {
        const ITEMS_PER_BLOB: NonZeroU64 = NZU64!(7);
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = test_cfg(ITEMS_PER_BLOB);
            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
                .await
                .expect("failed to initialize journal");

            for i in 0u64..(ITEMS_PER_BLOB.get() * 100 + ITEMS_PER_BLOB.get() / 2) {
                let pos = journal
                    .append(test_digest(i))
                    .await
                    .expect("failed to append data");
                assert_eq!(pos, i);
            }

            for i in 0u64..(ITEMS_PER_BLOB.get() * 100 + ITEMS_PER_BLOB.get() / 2) {
                let item: Digest = journal.read(i).await.expect("failed to read data");
                assert_eq!(item, test_digest(i), "i={i}");
            }

            {
                let stream = journal
                    .replay(NZUsize!(1024), 0)
                    .await
                    .expect("failed to replay journal");
                let mut items = Vec::new();
                pin_mut!(stream);
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((pos, item)) => {
                            assert_eq!(test_digest(pos), item, "pos={pos}, item={item:?}");
                            items.push(pos);
                        }
                        Err(err) => panic!("Failed to read item: {err}"),
                    }
                }

                assert_eq!(
                    items.len(),
                    ITEMS_PER_BLOB.get() as usize * 100 + ITEMS_PER_BLOB.get() as usize / 2
                );
                items.sort();
                for (i, pos) in items.iter().enumerate() {
                    assert_eq!(i as u64, *pos);
                }
            }

            journal.sync().await.expect("Failed to sync journal");
            drop(journal);

            let (blob, _) = context
                .open(&blob_partition(&cfg), &40u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            let bad_bytes = 123456789u32;
            blob.write_at(1, bad_bytes.to_be_bytes().to_vec())
                .await
                .expect("Failed to write bad bytes");
            blob.sync().await.expect("Failed to sync blob");

            let journal = Journal::init(context.with_label("second"), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");

            let err = journal
                .read(40 * ITEMS_PER_BLOB.get() + 1)
                .await
                .unwrap_err();
            assert!(matches!(err, Error::Runtime(_)));

            {
                let mut error_found = false;
                let stream = journal
                    .replay(NZUsize!(1024), 0)
                    .await
                    .expect("failed to replay journal");
                let mut items = Vec::new();
                pin_mut!(stream);
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((pos, item)) => {
                            assert_eq!(test_digest(pos), item);
                            items.push(pos);
                        }
                        Err(err) => {
                            error_found = true;
                            assert!(matches!(err, Error::Runtime(_)));
                            break;
                        }
                    }
                }
                assert!(error_found);
            }
        });
    }

    #[test_traced]
    fn test_fixed_journal_init_with_corrupted_historical_blobs() {
        let executor = deterministic::Runner::default();
        const ITEMS_PER_BLOB: NonZeroU64 = NZU64!(7);
        executor.start(|context| async move {
            let cfg = test_cfg(ITEMS_PER_BLOB);
            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
                .await
                .expect("failed to initialize journal");

            for i in 0u64..(ITEMS_PER_BLOB.get() * 100 + ITEMS_PER_BLOB.get() / 2) {
                let pos = journal
                    .append(test_digest(i))
                    .await
                    .expect("failed to append data");
                assert_eq!(pos, i);
            }
            journal.sync().await.expect("Failed to sync journal");
            drop(journal);

            let (blob, size) = context
                .open(&blob_partition(&cfg), &40u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            blob.resize(size - 1).await.expect("Failed to corrupt blob");
            blob.sync().await.expect("Failed to sync blob");

            let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
                .await
                .expect("failed to initialize journal");

            let expected_size = ITEMS_PER_BLOB.get() * 100 + ITEMS_PER_BLOB.get() / 2;
            assert_eq!(journal.size(), expected_size);

            match journal.replay(NZUsize!(1024), 0).await {
                Err(Error::Corruption(msg)) => {
                    assert!(
                        msg.contains("section 40"),
                        "Error should mention section 40, got: {msg}"
                    );
                }
                Err(e) => panic!("Expected Corruption error for section 40, got: {:?}", e),
                Ok(_) => panic!("Expected replay to fail with corruption"),
            };
        });
    }

    #[test_traced]
    fn test_fixed_journal_replay_with_missing_historical_blob() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(NZU64!(2));
            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
                .await
                .expect("failed to initialize journal");
            for i in 0u64..5 {
                journal
                    .append(test_digest(i))
                    .await
                    .expect("failed to append data");
            }
            journal.sync().await.expect("failed to sync journal");
            drop(journal);

            context
                .remove(&blob_partition(&cfg), Some(&1u64.to_be_bytes()))
                .await
                .expect("failed to remove blob");

            let result = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
                .await
                .expect("init shouldn't fail");

            match result.replay(NZUsize!(1024), 0).await {
                Err(Error::Corruption(_)) => {}
                Err(err) => panic!("expected Corruption, got: {err}"),
                Ok(_) => panic!("expected Corruption, got ok"),
            };

            match result.read(2).await {
                Err(Error::Corruption(_)) => {}
                Err(err) => panic!("expected Corruption, got: {err}"),
                Ok(_) => panic!("expected Corruption, got ok"),
            };
        });
    }

    #[test_traced]
    fn test_fixed_journal_test_trim_blob() {
        let executor = deterministic::Runner::default();
        const ITEMS_PER_BLOB: NonZeroU64 = NZU64!(7);
        executor.start(|context| async move {
            let cfg = test_cfg(ITEMS_PER_BLOB);
            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
                .await
                .expect("failed to initialize journal");

            let item_count = ITEMS_PER_BLOB.get() + 3;
            for i in 0u64..item_count {
                journal
                    .append(test_digest(i))
                    .await
                    .expect("failed to append data");
            }
            assert_eq!(journal.size(), item_count);
            journal.sync().await.expect("Failed to sync journal");
            drop(journal);

            let (blob, size) = context
                .open(&blob_partition(&cfg), &1u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            blob.resize(size - 1).await.expect("Failed to corrupt blob");
            blob.sync().await.expect("Failed to sync blob");

            let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
                .await
                .unwrap();

            assert_eq!(journal.size(), item_count - 1);

            journal.destroy().await.expect("Failed to destroy journal");
        });
    }

    #[test_traced]
    fn test_fixed_journal_partial_replay() {
        const ITEMS_PER_BLOB: NonZeroU64 = NZU64!(7);
        const START_POS: u64 = 53;

        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(ITEMS_PER_BLOB);
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to initialize journal");

            for i in 0u64..(ITEMS_PER_BLOB.get() * 100 + ITEMS_PER_BLOB.get() / 2) {
                let pos = journal
                    .append(test_digest(i))
                    .await
                    .expect("failed to append data");
                assert_eq!(pos, i);
            }

            {
                let stream = journal
                    .replay(NZUsize!(1024), START_POS)
                    .await
                    .expect("failed to replay journal");
                let mut items = Vec::new();
                pin_mut!(stream);
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((pos, item)) => {
                            assert!(pos >= START_POS, "pos={pos}, expected >= {START_POS}");
                            assert_eq!(
                                test_digest(pos),
                                item,
                                "Item at position {pos} did not match expected digest"
                            );
                            items.push(pos);
                        }
                        Err(err) => panic!("Failed to read item: {err}"),
                    }
                }

                assert_eq!(
                    items.len(),
                    ITEMS_PER_BLOB.get() as usize * 100 + ITEMS_PER_BLOB.get() as usize / 2
                        - START_POS as usize
                );
                items.sort();
                for (i, pos) in items.iter().enumerate() {
                    assert_eq!(i as u64, *pos - START_POS);
                }
            }

            journal.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_fixed_journal_recover_from_partial_write() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = test_cfg(NZU64!(3));
            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
                .await
                .expect("failed to initialize journal");
            for i in 0..5 {
                journal
                    .append(test_digest(i))
                    .await
                    .expect("failed to append data");
            }
            assert_eq!(journal.size(), 5);
            journal.sync().await.expect("Failed to sync journal");
            drop(journal);

            let (blob, size) = context
                .open(&blob_partition(&cfg), &1u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            blob.resize(size - 1).await.expect("Failed to corrupt blob");
            blob.sync().await.expect("Failed to sync blob");

            let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");
            assert_eq!(journal.pruning_boundary, 0);
            assert_eq!(journal.size(), 4);
            drop(journal);

            context
                .remove(&blob_partition(&cfg), Some(&1u64.to_be_bytes()))
                .await
                .expect("Failed to remove blob");

            let journal = Journal::<_, Digest>::init(context.with_label("third"), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");
            assert_eq!(journal.size(), 3);

            journal.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_fixed_journal_recover_detects_oldest_section_too_short() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(NZU64!(5));
            let mut journal =
                Journal::<_, Digest>::init_at_size(context.with_label("first"), cfg.clone(), 7)
                    .await
                    .expect("failed to initialize journal at size");

            for i in 0..8u64 {
                journal
                    .append(test_digest(100 + i))
                    .await
                    .expect("failed to append data");
            }
            journal.sync().await.expect("failed to sync journal");
            assert_eq!(journal.pruning_boundary, 7);
            assert_eq!(journal.size(), 15);
            drop(journal);

            let (blob, size) = context
                .open(&blob_partition(&cfg), &1u64.to_be_bytes())
                .await
                .expect("failed to open oldest blob");
            blob.resize(size - 1).await.expect("failed to corrupt blob");
            blob.sync().await.expect("failed to sync blob");

            let result =
                Journal::<_, Digest>::init(context.with_label("second"), cfg.clone()).await;
            assert!(matches!(result, Err(Error::Corruption(_))));
        });
    }

    #[test_traced]
    fn test_fixed_journal_recover_to_empty_from_partial_write() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(NZU64!(10));
            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
                .await
                .expect("failed to initialize journal");
            journal
                .append(test_digest(0))
                .await
                .expect("failed to append data");
            assert_eq!(journal.size(), 1);
            journal.sync().await.expect("Failed to sync journal");
            drop(journal);

            let (blob, size) = context
                .open(&blob_partition(&cfg), &0u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            blob.resize(size - 1).await.expect("Failed to corrupt blob");
            blob.sync().await.expect("Failed to sync blob");

            let mut journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");

            assert_eq!(journal.bounds().end, 0);
            assert!(journal.bounds().is_empty());
            journal
                .append(test_digest(0))
                .await
                .expect("failed to append data");
            assert_eq!(journal.size(), 1);

            journal.destroy().await.unwrap();
        });
    }

    #[test_traced("DEBUG")]
    fn test_fixed_journal_recover_from_unwritten_data() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(NZU64!(10));
            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
                .await
                .expect("failed to initialize journal");

            journal
                .append(test_digest(0))
                .await
                .expect("failed to append data");
            assert_eq!(journal.size(), 1);
            journal.sync().await.expect("Failed to sync journal");
            drop(journal);

            let (blob, size) = context
                .open(&blob_partition(&cfg), &0u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            blob.write_at(size, vec![0u8; PAGE_SIZE.get() as usize * 3])
                .await
                .expect("Failed to extend blob");
            blob.sync().await.expect("Failed to sync blob");

            let mut journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");

            assert_eq!(journal.size(), 1);

            journal
                .append(test_digest(1))
                .await
                .expect("failed to append data");

            journal.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_fixed_journal_rewinding() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(NZU64!(2));
            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
                .await
                .expect("failed to initialize journal");
            assert!(matches!(journal.rewind(0).await, Ok(())));
            assert!(matches!(
                journal.rewind(1).await,
                Err(Error::InvalidRewind(1))
            ));

            journal
                .append(test_digest(0))
                .await
                .expect("failed to append data 0");
            assert_eq!(journal.size(), 1);
            assert!(matches!(journal.rewind(1).await, Ok(())));
            assert!(matches!(journal.rewind(0).await, Ok(())));
            assert_eq!(journal.size(), 0);

            for i in 0..7 {
                let pos = journal
                    .append(test_digest(i))
                    .await
                    .expect("failed to append data");
                assert_eq!(pos, i);
            }
            assert_eq!(journal.size(), 7);

            assert!(matches!(journal.rewind(4).await, Ok(())));
            assert_eq!(journal.size(), 4);

            assert!(matches!(journal.rewind(0).await, Ok(())));
            assert_eq!(journal.size(), 0);

            for _ in 0..10 {
                for i in 0..100 {
                    journal
                        .append(test_digest(i))
                        .await
                        .expect("failed to append data");
                }
                journal.rewind(journal.size() - 49).await.unwrap();
            }
            const ITEMS_REMAINING: u64 = 10 * (100 - 49);
            assert_eq!(journal.size(), ITEMS_REMAINING);

            journal.sync().await.expect("Failed to sync journal");
            drop(journal);

            let mut cfg = test_cfg(NZU64!(3));
            cfg.partition = "test_partition_2".into();
            let mut journal = Journal::init(context.with_label("second"), cfg.clone())
                .await
                .expect("failed to initialize journal");
            for _ in 0..10 {
                for i in 0..100 {
                    journal
                        .append(test_digest(i))
                        .await
                        .expect("failed to append data");
                }
                journal.rewind(journal.size() - 49).await.unwrap();
            }
            assert_eq!(journal.size(), ITEMS_REMAINING);

            journal.sync().await.expect("Failed to sync journal");
            drop(journal);

            let mut journal: Journal<_, Digest> =
                Journal::init(context.with_label("third"), cfg.clone())
                    .await
                    .expect("failed to re-initialize journal");
            assert_eq!(journal.size(), 10 * (100 - 49));

            journal.prune(300).await.expect("pruning failed");
            assert_eq!(journal.size(), ITEMS_REMAINING);
            assert!(matches!(
                journal.rewind(299).await,
                Err(Error::InvalidRewind(299))
            ));
            assert!(matches!(journal.rewind(300).await, Ok(())));
            assert_eq!(journal.bounds().end, 300);
            assert!(journal.bounds().is_empty());

            journal.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_fixed_journal_recover_from_page_boundary_truncation() {
        let executor = deterministic::Runner::default();
        executor.start(|context: Context| async move {
            let cfg = test_cfg(NZU64!(100));
            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
                .await
                .expect("failed to initialize journal");

            for i in 0u64..10 {
                journal
                    .append(test_digest(i))
                    .await
                    .expect("failed to append data");
            }
            assert_eq!(journal.size(), 10);
            journal.sync().await.expect("Failed to sync journal");
            drop(journal);

            // Each logical page is stored with 12 bytes of per-page overhead.
            let physical_page_size = PAGE_SIZE.get() as u64 + 12;
            let (blob, size) = context
                .open(&blob_partition(&cfg), &0u64.to_be_bytes())
                .await
                .expect("Failed to open blob");

            // Truncate the blob back to the previous physical page boundary.
            let full_pages = size / physical_page_size;
            assert!(full_pages >= 2, "need at least 2 pages for this test");
            let truncate_to = (full_pages - 1) * physical_page_size;

            blob.resize(truncate_to)
                .await
                .expect("Failed to truncate blob");
            blob.sync().await.expect("Failed to sync blob");

            let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
                .await
                .expect("Failed to re-initialize journal after page truncation");

            // Each Digest item occupies 32 logical bytes, so the journal should
            // recover exactly the items that fit in the remaining full pages.
            let remaining_logical_bytes = (full_pages - 1) * PAGE_SIZE.get() as u64;
            let expected_items = remaining_logical_bytes / 32;
            assert_eq!(
                journal.size(),
                expected_items,
                "Journal should recover to {} items after truncation",
                expected_items
            );

            for i in 0..expected_items {
                let item = journal
                    .read(i)
                    .await
                    .expect("failed to read recovered item");
                assert_eq!(item, test_digest(i), "item {} mismatch after recovery", i);
            }

            journal.destroy().await.expect("Failed to destroy journal");
        });
    }

    #[test_traced]
    fn test_single_item_per_blob() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "single_item_per_blob".into(),
                items_per_blob: NZU64!(1),
                page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(2048),
            };

            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
                .await
                .expect("failed to initialize journal");

            assert_eq!(journal.bounds().end, 0);
            assert!(journal.bounds().is_empty());

            let pos = journal
                .append(test_digest(0))
                .await
                .expect("failed to append");
            assert_eq!(pos, 0);
            assert_eq!(journal.size(), 1);

            journal.sync().await.expect("failed to sync");

            let value = journal
                .read(journal.size() - 1)
                .await
                .expect("failed to read");
            assert_eq!(value, test_digest(0));

            for i in 1..10u64 {
                let pos = journal
                    .append(test_digest(i))
                    .await
                    .expect("failed to append");
                assert_eq!(pos, i);
                assert_eq!(journal.size(), i + 1);

                let value = journal
                    .read(journal.size() - 1)
                    .await
                    .expect("failed to read");
                assert_eq!(value, test_digest(i));
            }

            for i in 0..10u64 {
                assert_eq!(journal.read(i).await.unwrap(), test_digest(i));
            }

            journal.sync().await.expect("failed to sync");

            journal.prune(5).await.expect("failed to prune");

            assert_eq!(journal.size(), 10);

            assert_eq!(journal.bounds().start, 5);

            let value = journal
                .read(journal.size() - 1)
                .await
                .expect("failed to read");
            assert_eq!(value, test_digest(9));

            for i in 0..5 {
                assert!(matches!(journal.read(i).await, Err(Error::ItemPruned(_))));
            }

            for i in 5..10u64 {
                assert_eq!(journal.read(i).await.unwrap(), test_digest(i));
            }

            for i in 10..15u64 {
                let pos = journal
                    .append(test_digest(i))
                    .await
                    .expect("failed to append");
                assert_eq!(pos, i);

                let value = journal
                    .read(journal.size() - 1)
                    .await
                    .expect("failed to read");
                assert_eq!(value, test_digest(i));
            }

            journal.sync().await.expect("failed to sync");
            drop(journal);

            let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
                .await
                .expect("failed to re-initialize journal");

            assert_eq!(journal.size(), 15);

            assert_eq!(journal.bounds().start, 5);

            let value = journal
                .read(journal.size() - 1)
                .await
                .expect("failed to read");
            assert_eq!(value, test_digest(14));

            for i in 5..15u64 {
                assert_eq!(journal.read(i).await.unwrap(), test_digest(i));
            }

            journal.destroy().await.expect("failed to destroy journal");

            let mut journal = Journal::init(context.with_label("third"), cfg.clone())
                .await
                .expect("failed to initialize journal");

            for i in 0..10u64 {
                journal.append(test_digest(i + 100)).await.unwrap();
            }

            journal.prune(5).await.unwrap();
            assert_eq!(journal.bounds().end, 10);
            assert_eq!(journal.bounds().start, 5);

            journal.sync().await.unwrap();
            drop(journal);

            let journal = Journal::<_, Digest>::init(context.with_label("fourth"), cfg.clone())
                .await
                .expect("failed to re-initialize journal");

            assert_eq!(journal.bounds().end, 10);
            assert_eq!(journal.bounds().start, 5);

            let value = journal.read(journal.size() - 1).await.unwrap();
            assert_eq!(value, test_digest(109));

            for i in 5..10u64 {
                assert_eq!(journal.read(i).await.unwrap(), test_digest(i + 100));
            }

            journal.destroy().await.expect("failed to destroy journal");

            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to initialize journal");

            for i in 0..5u64 {
                journal.append(test_digest(i + 200)).await.unwrap();
            }
            journal.sync().await.unwrap();

            journal.prune(5).await.unwrap();
            assert_eq!(journal.bounds().end, 5);
            assert!(journal.bounds().is_empty());

            let result = journal.read(journal.size() - 1).await;
            assert!(matches!(result, Err(Error::ItemPruned(4))));

            journal.append(test_digest(205)).await.unwrap();
            assert_eq!(journal.bounds().start, 5);
            assert_eq!(
                journal.read(journal.size() - 1).await.unwrap(),
                test_digest(205)
            );

            journal.destroy().await.expect("failed to destroy journal");
        });
    }

    #[test_traced]
    fn test_fixed_journal_init_at_size_zero() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(NZU64!(5));
            let mut journal = Journal::<_, Digest>::init_at_size(context.clone(), cfg.clone(), 0)
                .await
                .unwrap();

            assert_eq!(journal.bounds().end, 0);
            assert!(journal.bounds().is_empty());

            let pos = journal.append(test_digest(100)).await.unwrap();
            assert_eq!(pos, 0);
            assert_eq!(journal.size(), 1);
            assert_eq!(journal.read(0).await.unwrap(), test_digest(100));

            journal.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_fixed_journal_init_at_size_section_boundary() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(NZU64!(5));

            let mut journal = Journal::<_, Digest>::init_at_size(context.clone(), cfg.clone(), 10)
                .await
                .unwrap();

            assert_eq!(journal.bounds().end, 10);
            assert!(journal.bounds().is_empty());

            let pos = journal.append(test_digest(1000)).await.unwrap();
            assert_eq!(pos, 10);
            assert_eq!(journal.size(), 11);
            assert_eq!(journal.read(10).await.unwrap(), test_digest(1000));

            let pos = journal.append(test_digest(1001)).await.unwrap();
            assert_eq!(pos, 11);
            assert_eq!(journal.read(11).await.unwrap(), test_digest(1001));

            journal.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_fixed_journal_init_at_size_mid_section() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(NZU64!(5));

            let mut journal = Journal::<_, Digest>::init_at_size(context.clone(), cfg.clone(), 7)
                .await
                .unwrap();

            assert_eq!(journal.bounds().end, 7);
            assert!(journal.bounds().is_empty());

            assert!(matches!(journal.read(5).await, Err(Error::ItemPruned(5))));
            assert!(matches!(journal.read(6).await, Err(Error::ItemPruned(6))));

            let pos = journal.append(test_digest(700)).await.unwrap();
            assert_eq!(pos, 7);
            assert_eq!(journal.size(), 8);
            assert_eq!(journal.read(7).await.unwrap(), test_digest(700));
            assert_eq!(journal.bounds().start, 7);

            journal.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_fixed_journal_init_at_size_persistence() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(NZU64!(5));

            let mut journal =
                Journal::<_, Digest>::init_at_size(context.with_label("first"), cfg.clone(), 15)
                    .await
                    .unwrap();

            for i in 0..5u64 {
                let pos = journal.append(test_digest(1500 + i)).await.unwrap();
                assert_eq!(pos, 15 + i);
            }

            assert_eq!(journal.size(), 20);

            journal.sync().await.unwrap();
            drop(journal);

            let mut journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
                .await
                .unwrap();

            assert_eq!(journal.bounds().end, 20);
            assert_eq!(journal.bounds().start, 15);

            for i in 0..5u64 {
                assert_eq!(journal.read(15 + i).await.unwrap(), test_digest(1500 + i));
            }

            let pos = journal.append(test_digest(9999)).await.unwrap();
            assert_eq!(pos, 20);
            assert_eq!(journal.read(20).await.unwrap(), test_digest(9999));

            journal.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_fixed_journal_init_at_size_persistence_without_data() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(NZU64!(5));

            let journal =
                Journal::<_, Digest>::init_at_size(context.with_label("first"), cfg.clone(), 15)
                    .await
                    .unwrap();

            assert_eq!(journal.bounds().end, 15);
            assert!(journal.bounds().is_empty());

            drop(journal);

            let mut journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
                .await
                .unwrap();

            assert_eq!(journal.bounds().end, 15);
            assert!(journal.bounds().is_empty());

            let pos = journal.append(test_digest(1500)).await.unwrap();
            assert_eq!(pos, 15);
            assert_eq!(journal.read(15).await.unwrap(), test_digest(1500));

            journal.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_fixed_journal_init_at_size_large_offset() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(NZU64!(5));

            let mut journal =
                Journal::<_, Digest>::init_at_size(context.clone(), cfg.clone(), 1000)
                    .await
                    .unwrap();

            assert_eq!(journal.bounds().end, 1000);
            assert!(journal.bounds().is_empty());

            let pos = journal.append(test_digest(100000)).await.unwrap();
            assert_eq!(pos, 1000);
            assert_eq!(journal.read(1000).await.unwrap(), test_digest(100000));

            journal.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_fixed_journal_init_at_size_prune_and_append() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(NZU64!(5));

            let mut journal = Journal::<_, Digest>::init_at_size(context.clone(), cfg.clone(), 20)
                .await
                .unwrap();

            for i in 0..10u64 {
                journal.append(test_digest(2000 + i)).await.unwrap();
            }

            assert_eq!(journal.size(), 30);

            journal.prune(25).await.unwrap();

            assert_eq!(journal.bounds().end, 30);
            assert_eq!(journal.bounds().start, 25);

            for i in 25..30u64 {
                assert_eq!(journal.read(i).await.unwrap(), test_digest(2000 + (i - 20)));
            }

            let pos = journal.append(test_digest(3000)).await.unwrap();
            assert_eq!(pos, 30);

            journal.destroy().await.unwrap();
        });
    }
2227 #[test_traced]
2228 fn test_fixed_journal_clear_to_size() {
2229 let executor = deterministic::Runner::default();
2230 executor.start(|context| async move {
2231 let cfg = test_cfg(NZU64!(10));
2232 let mut journal = Journal::init(context.with_label("journal"), cfg.clone())
2233 .await
2234 .expect("failed to initialize journal");
2235
2236 for i in 0..25u64 {
2238 journal.append(test_digest(i)).await.unwrap();
2239 }
2240 assert_eq!(journal.size(), 25);
2241 journal.sync().await.unwrap();
2242
2243 journal.clear_to_size(100).await.unwrap();
2245 assert_eq!(journal.size(), 100);
2246
2247 for i in 0..25 {
2249 assert!(matches!(journal.read(i).await, Err(Error::ItemPruned(_))));
2250 }
2251
2252 drop(journal);
2254 let mut journal =
2255 Journal::<_, Digest>::init(context.with_label("journal_after_clear"), cfg.clone())
2256 .await
2257 .expect("failed to re-initialize journal after clear");
2258 assert_eq!(journal.size(), 100);
2259
2260 for i in 100..105u64 {
2262 let pos = journal.append(test_digest(i)).await.unwrap();
2263 assert_eq!(pos, i);
2264 }
2265 assert_eq!(journal.size(), 105);
2266
2267 for i in 100..105u64 {
2269 assert_eq!(journal.read(i).await.unwrap(), test_digest(i));
2270 }
2271
2272 journal.sync().await.unwrap();
2274 drop(journal);
2275
2276 let journal = Journal::<_, Digest>::init(context.with_label("journal_reopened"), cfg)
2277 .await
2278 .expect("failed to re-initialize journal");
2279
2280 assert_eq!(journal.size(), 105);
2281 for i in 100..105u64 {
2282 assert_eq!(journal.read(i).await.unwrap(), test_digest(i));
2283 }
2284
2285 journal.destroy().await.unwrap();
2286 });
2287 }
2288
2289 #[test_traced]
2290 fn test_fixed_journal_sync_crash_meta_none_boundary_aligned() {
2291 let executor = deterministic::Runner::default();
2293 executor.start(|context| async move {
2294 let cfg = test_cfg(NZU64!(5));
2295 let mut journal = Journal::<_, Digest>::init(context.with_label("first"), cfg.clone())
2296 .await
2297 .unwrap();
2298
2299 for i in 0..5u64 {
2300 journal.append(test_digest(i)).await.unwrap();
2301 }
2302 let tail_section = journal.size / journal.items_per_blob;
2303 journal.inner.sync(tail_section).await.unwrap();
2304 drop(journal);
2305
2306 let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
2307 .await
2308 .unwrap();
2309 assert_eq!(journal.bounds().start, 0);
2310 assert_eq!(journal.bounds().end, 5);
2311 journal.destroy().await.unwrap();
2312 });
2313 }

    #[test_traced]
    fn test_fixed_journal_oldest_section_invalid_len() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(NZU64!(5));
            let mut journal =
                Journal::<_, Digest>::init_at_size(context.with_label("first"), cfg.clone(), 7)
                    .await
                    .unwrap();
            for i in 0..3u64 {
                journal.append(test_digest(i)).await.unwrap();
            }
            assert_eq!(journal.inner.newest_section(), Some(2));
            journal.sync().await.unwrap();

            // Wipe the metadata that records the mid-blob pruning boundary.
            journal.metadata.clear();
            journal.metadata.sync().await.unwrap();
            drop(journal);

            // Without the boundary metadata, the partial oldest blob has an
            // unexplainable length and init should report corruption.
            let result =
                Journal::<_, Digest>::init(context.with_label("second"), cfg.clone()).await;
            assert!(matches!(result, Err(Error::Corruption(_))));
            context.remove(&blob_partition(&cfg), None).await.unwrap();
            context
                .remove(&format!("{}-metadata", cfg.partition), None)
                .await
                .unwrap();
        });
    }

    #[test_traced]
    fn test_fixed_journal_sync_crash_meta_mid_boundary_unchanged() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(NZU64!(5));
            let mut journal =
                Journal::<_, Digest>::init_at_size(context.with_label("first"), cfg.clone(), 7)
                    .await
                    .unwrap();
            for i in 0..3u64 {
                journal.append(test_digest(i)).await.unwrap();
            }
            // Simulate a crash after syncing only the inner journal; the
            // metadata on disk still holds the mid-blob boundary of 7.
            let tail_section = journal.size / journal.items_per_blob;
            journal.inner.sync(tail_section).await.unwrap();
            drop(journal);

            let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
                .await
                .unwrap();
            assert_eq!(journal.bounds().start, 7);
            assert_eq!(journal.bounds().end, 10);
            journal.destroy().await.unwrap();
        });
    }
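
    // A hedged sketch (not from the original suite): extends the mid-boundary
    // crash-recovery case above by checking that recovered items read back and
    // positions below the boundary report ItemPruned. The assertions are
    // illustrative, not normative.
    #[test_traced]
    fn test_fixed_journal_sync_crash_meta_mid_boundary_reads_sketch() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(NZU64!(5));
            let mut journal =
                Journal::<_, Digest>::init_at_size(context.with_label("first"), cfg.clone(), 7)
                    .await
                    .unwrap();
            for i in 0..3u64 {
                journal.append(test_digest(i)).await.unwrap();
            }
            let tail_section = journal.size / journal.items_per_blob;
            journal.inner.sync(tail_section).await.unwrap();
            drop(journal);

            let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
                .await
                .unwrap();
            // Recovered items read back; pruned positions error.
            assert_eq!(journal.read(7).await.unwrap(), test_digest(0));
            assert!(matches!(journal.read(6).await, Err(Error::ItemPruned(_))));
            journal.destroy().await.unwrap();
        });
    }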

    #[test_traced]
    fn test_fixed_journal_sync_crash_meta_mid_to_aligned_becomes_stale() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(NZU64!(5));
            let mut journal =
                Journal::<_, Digest>::init_at_size(context.with_label("first"), cfg.clone(), 7)
                    .await
                    .unwrap();
            for i in 0..10u64 {
                journal.append(test_digest(i)).await.unwrap();
            }
            assert_eq!(journal.size(), 17);
            // Prune to an aligned boundary, then crash before the metadata
            // update persists; the stale mid-blob boundary (7) must be
            // ignored on recovery.
            journal.prune(10).await.unwrap();

            let tail_section = journal.size / journal.items_per_blob;
            journal.inner.sync(tail_section).await.unwrap();
            drop(journal);

            let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
                .await
                .unwrap();
            assert_eq!(journal.bounds().start, 10);
            assert_eq!(journal.bounds().end, 17);
            journal.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_fixed_journal_prune_does_not_move_boundary_backwards() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(NZU64!(5));
            let mut journal =
                Journal::<_, Digest>::init_at_size(context.with_label("first"), cfg.clone(), 7)
                    .await
                    .unwrap();
            for i in 0..5u64 {
                journal.append(test_digest(i)).await.unwrap();
            }
            // Pruning below the current boundary (7) must be a no-op.
            journal.prune(5).await.unwrap();
            assert_eq!(journal.bounds().start, 7);
            journal.destroy().await.unwrap();
        });
    }
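
    // A hedged sketch (not from the original suite): pruning to exactly the
    // current boundary should likewise leave the bounds unchanged,
    // complementing the backwards-prune case above. Illustrative only.
    #[test_traced]
    fn test_fixed_journal_prune_to_current_boundary_sketch() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(NZU64!(5));
            let mut journal =
                Journal::<_, Digest>::init_at_size(context.with_label("first"), cfg.clone(), 7)
                    .await
                    .unwrap();
            for i in 0..5u64 {
                journal.append(test_digest(i)).await.unwrap();
            }
            // Prune to the existing boundary; bounds must be unchanged.
            journal.prune(7).await.unwrap();
            assert_eq!(journal.bounds().start, 7);
            assert_eq!(journal.bounds().end, 12);
            journal.destroy().await.unwrap();
        });
    }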

    #[test_traced]
    fn test_fixed_journal_replay_after_init_at_size_spanning_sections() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(NZU64!(5));

            let mut journal = Journal::<_, Digest>::init_at_size(context.clone(), cfg.clone(), 7)
                .await
                .unwrap();

            // Append items spanning multiple sections.
            for i in 0..13u64 {
                let pos = journal.append(test_digest(100 + i)).await.unwrap();
                assert_eq!(pos, 7 + i);
            }
            assert_eq!(journal.size(), 20);
            journal.sync().await.unwrap();

            // Replay everything starting from the pruning boundary.
            {
                let stream = journal
                    .replay(NZUsize!(1024), 7)
                    .await
                    .expect("failed to replay");
                pin_mut!(stream);
                let mut items: Vec<(u64, Digest)> = Vec::new();
                while let Some(result) = stream.next().await {
                    items.push(result.expect("replay item failed"));
                }

                assert_eq!(items.len(), 13);
                for (i, (pos, item)) in items.iter().enumerate() {
                    assert_eq!(*pos, 7 + i as u64);
                    assert_eq!(*item, test_digest(100 + i as u64));
                }
            }

            // Replay again from a position in the middle of a section.
            {
                let stream = journal
                    .replay(NZUsize!(1024), 12)
                    .await
                    .expect("failed to replay from mid-stream");
                pin_mut!(stream);
                let mut items: Vec<(u64, Digest)> = Vec::new();
                while let Some(result) = stream.next().await {
                    items.push(result.expect("replay item failed"));
                }

                assert_eq!(items.len(), 8);
                for (i, (pos, item)) in items.iter().enumerate() {
                    assert_eq!(*pos, 12 + i as u64);
                    assert_eq!(*item, test_digest(100 + 5 + i as u64));
                }
            }

            journal.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_fixed_journal_rewind_error_before_bounds_start() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(NZU64!(5));

            let mut journal = Journal::<_, Digest>::init_at_size(context.clone(), cfg.clone(), 10)
                .await
                .unwrap();

            for i in 0..3u64 {
                journal.append(test_digest(i)).await.unwrap();
            }
            assert_eq!(journal.size(), 13);

            // Rewinding within the retained range succeeds.
            journal.rewind(11).await.unwrap();
            assert_eq!(journal.size(), 11);

            // Rewinding to exactly the pruning boundary succeeds.
            journal.rewind(10).await.unwrap();
            assert_eq!(journal.size(), 10);

            // Rewinding past the pruning boundary fails.
            let result = journal.rewind(9).await;
            assert!(matches!(result, Err(Error::InvalidRewind(9))));

            journal.destroy().await.unwrap();
        });
    }
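
    // A hedged sketch (not from the original suite): after a rewind, appends
    // should resume at the rewound size. The assertions mirror the rewind
    // semantics exercised above and are illustrative, not normative.
    #[test_traced]
    fn test_fixed_journal_append_after_rewind_sketch() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(NZU64!(5));
            let mut journal = Journal::<_, Digest>::init_at_size(context.clone(), cfg.clone(), 10)
                .await
                .unwrap();

            for i in 0..3u64 {
                journal.append(test_digest(i)).await.unwrap();
            }
            journal.rewind(11).await.unwrap();

            // The next append lands at position 11, replacing the old item.
            let pos = journal.append(test_digest(42)).await.unwrap();
            assert_eq!(pos, 11);
            assert_eq!(journal.read(11).await.unwrap(), test_digest(42));

            journal.destroy().await.unwrap();
        });
    }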

    #[test_traced]
    fn test_fixed_journal_init_at_size_crash_scenarios() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(NZU64!(5));

            let mut journal =
                Journal::<_, Digest>::init_at_size(context.with_label("first"), cfg.clone(), 7)
                    .await
                    .unwrap();
            for i in 0..5u64 {
                journal.append(test_digest(i)).await.unwrap();
            }
            journal.sync().await.unwrap();
            drop(journal);

            // Crash scenario 1: the blobs partition was wiped but the metadata
            // partition survived.
            let blob_part = blob_partition(&cfg);
            context.remove(&blob_part, None).await.unwrap();

            let journal = Journal::<_, Digest>::init(context.with_label("crash1"), cfg.clone())
                .await
                .expect("init failed after clear crash");
            assert_eq!(journal.bounds().end, 0);
            assert_eq!(journal.bounds().start, 0);
            drop(journal);

            // Crash scenario 2: metadata records a boundary but only an empty
            // section-0 blob was created before the crash.
            let meta_cfg = MetadataConfig {
                partition: format!("{}-metadata", cfg.partition),
                codec_config: ((0..).into(), ()),
            };
            let mut metadata = Metadata::<_, u64, Vec<u8>>::init(
                context.with_label("restore_meta"),
                meta_cfg.clone(),
            )
            .await
            .unwrap();
            metadata.put(PRUNING_BOUNDARY_KEY, 7u64.to_be_bytes().to_vec());
            metadata.sync().await.unwrap();

            let (blob, _) = context.open(&blob_part, &0u64.to_be_bytes()).await.unwrap();
            blob.sync().await.unwrap();

            let journal = Journal::<_, Digest>::init(context.with_label("crash2"), cfg.clone())
                .await
                .expect("init failed after create crash");

            assert_eq!(journal.bounds().start, 0);
            assert_eq!(journal.bounds().end, 0);
            journal.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_fixed_journal_clear_to_size_crash_scenarios() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(NZU64!(5));

            let mut journal =
                Journal::<_, Digest>::init_at_size(context.with_label("first"), cfg.clone(), 12)
                    .await
                    .unwrap();
            journal.sync().await.unwrap();
            drop(journal);

            // Simulate a crash mid-clear: the blobs partition was removed and
            // only an empty section-0 blob was recreated.
            let blob_part = blob_partition(&cfg);
            context.remove(&blob_part, None).await.unwrap();

            let (blob, _) = context.open(&blob_part, &0u64.to_be_bytes()).await.unwrap();
            blob.sync().await.unwrap();

            let journal =
                Journal::<_, Digest>::init(context.with_label("crash_clear"), cfg.clone())
                    .await
                    .expect("init failed after clear_to_size crash");

            assert_eq!(journal.bounds().start, 0);
            assert_eq!(journal.bounds().end, 0);
            journal.destroy().await.unwrap();
        });
    }
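
    // A hedged sketch (not from the original suite): `destroy` should remove
    // all underlying partitions, so a subsequent `init` starts from an empty
    // journal. This assumes `destroy` removes both the blobs and metadata
    // partitions; the assertions are illustrative.
    #[test_traced]
    fn test_fixed_journal_init_after_destroy_sketch() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(NZU64!(5));
            let mut journal = Journal::<_, Digest>::init(context.with_label("first"), cfg.clone())
                .await
                .unwrap();
            for i in 0..8u64 {
                journal.append(test_digest(i)).await.unwrap();
            }
            journal.sync().await.unwrap();
            journal.destroy().await.unwrap();

            // A fresh init sees no data.
            let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg)
                .await
                .unwrap();
            assert_eq!(journal.bounds().start, 0);
            assert_eq!(journal.bounds().end, 0);
            journal.destroy().await.unwrap();
        });
    }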
}