1#[cfg(test)]
57use super::Reader as _;
58use crate::{
59 journal::{
60 contiguous::{Many, Mutable},
61 segmented::fixed::{Config as SegmentedConfig, Journal as SegmentedJournal},
62 Error,
63 },
64 metadata::{Config as MetadataConfig, Metadata},
65 Context, Persistable,
66};
67use commonware_codec::CodecFixedShared;
68use commonware_runtime::buffer::paged::CacheRef;
69use commonware_utils::sync::{AsyncRwLockReadGuard, UpgradableAsyncRwLock};
70use futures::{stream::Stream, StreamExt};
71use std::num::{NonZeroU64, NonZeroUsize};
72use tracing::warn;
73
/// Metadata key under which the pruning boundary is persisted when it is not
/// aligned to a section start (a section-aligned boundary is recoverable from
/// the blobs alone and is not persisted).
const PRUNING_BOUNDARY_KEY: u64 = 1;
76
/// Configuration for a contiguous journal of fixed-size items.
#[derive(Clone)]
pub struct Config {
    /// Base partition name. Blobs live in `"{partition}-blobs"` (or in the
    /// legacy `partition` itself if it already holds data) and metadata in
    /// `"{partition}-metadata"`.
    pub partition: String,

    /// Number of fixed-size items stored per blob (section).
    pub items_per_blob: NonZeroU64,

    /// Page cache handed to the underlying segmented journal.
    pub page_cache: CacheRef,

    /// Write-buffer size handed to the underlying segmented journal.
    pub write_buffer: NonZeroUsize,
}
98
/// Mutable journal state guarded by the journal's read-write lock.
struct Inner<E: Context, A: CodecFixedShared> {
    /// Underlying segmented journal storing items in per-section blobs.
    journal: SegmentedJournal<E, A>,

    /// Total number of items ever appended. Positions below
    /// `pruning_boundary` are no longer readable but still count toward
    /// `size`.
    size: u64,

    /// Metadata store persisting the pruning boundary when it is not
    /// section-aligned (see `PRUNING_BOUNDARY_KEY`).
    metadata: Metadata<E, u64, Vec<u8>>,

    /// Position of the first item that has not been pruned.
    pruning_boundary: u64,
}
119
120impl<E: Context, A: CodecFixedShared> Inner<E, A> {
121 async fn read(&self, pos: u64, items_per_blob: u64) -> Result<A, Error> {
128 if pos >= self.size {
129 return Err(Error::ItemOutOfRange(pos));
130 }
131 if pos < self.pruning_boundary {
132 return Err(Error::ItemPruned(pos));
133 }
134
135 let section = pos / items_per_blob;
136 let section_start = section * items_per_blob;
137
138 let first_in_section = self.pruning_boundary.max(section_start);
141 let pos_in_section = pos - first_in_section;
142
143 self.journal
144 .get(section, pos_in_section)
145 .await
146 .map_err(|e| {
147 match e {
149 Error::SectionOutOfRange(e)
150 | Error::AlreadyPrunedToSection(e)
151 | Error::ItemOutOfRange(e) => {
152 Error::Corruption(format!("section/item should be found, but got: {e}"))
153 }
154 other => other,
155 }
156 })
157 }
158
159 fn try_read_sync(&self, pos: u64, items_per_blob: u64) -> Option<A> {
161 if pos >= self.size || pos < self.pruning_boundary {
162 return None;
163 }
164 let section = pos / items_per_blob;
165 let section_start = section * items_per_blob;
166 let first_in_section = self.pruning_boundary.max(section_start);
167 let pos_in_section = pos - first_in_section;
168 self.journal.try_get_sync(section, pos_in_section)
169 }
170}
171
/// A contiguous journal of fixed-size items layered over a segmented
/// journal, addressed by global item position.
pub struct Journal<E: Context, A: CodecFixedShared> {
    /// Shared state; the upgradable lock lets `sync` hold a read guard and
    /// upgrade to write access only when metadata must change.
    inner: UpgradableAsyncRwLock<Inner<E, A>>,

    /// Items stored per section (copied out of `Config::items_per_blob`).
    items_per_blob: u64,
}
196
/// Read handle that holds a read lock on the journal, providing a consistent
/// view for reads and replay.
pub struct Reader<'a, E: Context, A: CodecFixedShared> {
    // Read guard keeps the journal state stable for the handle's lifetime.
    guard: AsyncRwLockReadGuard<'a, Inner<E, A>>,
    items_per_blob: u64,
}
202
impl<E: Context, A: CodecFixedShared> super::Reader for Reader<'_, E, A> {
    type Item = A;

    /// Range of readable positions: `[pruning_boundary, size)`.
    fn bounds(&self) -> std::ops::Range<u64> {
        self.guard.pruning_boundary..self.guard.size
    }

    /// Read the item at global position `pos`.
    async fn read(&self, pos: u64) -> Result<A, Error> {
        self.guard.read(pos, self.items_per_blob).await
    }

    /// Non-blocking read; `None` when out of bounds or not synchronously
    /// available.
    fn try_read_sync(&self, pos: u64) -> Option<A> {
        self.guard.try_read_sync(pos, self.items_per_blob)
    }

    /// Stream `(global_position, item)` pairs from `start_pos` to the end of
    /// the journal. `start_pos == size` is allowed and yields an empty stream.
    ///
    /// # Errors
    ///
    /// - [`Error::ItemOutOfRange`] if `start_pos` exceeds the journal size.
    /// - [`Error::ItemPruned`] if `start_pos` is below the pruning boundary.
    /// - [`Error::Corruption`] if an interior section is shorter than
    ///   `items_per_blob`.
    async fn replay(
        &self,
        buffer: NonZeroUsize,
        start_pos: u64,
    ) -> Result<impl Stream<Item = Result<(u64, A), Error>> + Send, Error> {
        let items_per_blob = self.items_per_blob;
        let pruning_boundary = self.guard.pruning_boundary;

        if start_pos > self.guard.size {
            return Err(Error::ItemOutOfRange(start_pos));
        }
        if start_pos < pruning_boundary {
            return Err(Error::ItemPruned(start_pos));
        }

        let start_section = start_pos / items_per_blob;
        let section_start = start_section * items_per_blob;

        // The oldest section may be partially pruned: its first stored item
        // corresponds to the pruning boundary, not the section start.
        let first_in_section = pruning_boundary.max(section_start);
        let start_pos_in_section = start_pos - first_in_section;

        let journal = &self.guard.journal;
        // Sanity-check that sections strictly between the oldest and newest
        // are complete (the oldest may be short due to pruning, the newest
        // because it is still being filled).
        if let (Some(oldest), Some(newest)) = (journal.oldest_section(), journal.newest_section()) {
            let first_to_check = start_section.max(oldest + 1);
            for section in first_to_check..newest {
                let len = journal.section_len(section).await?;
                if len < items_per_blob {
                    return Err(Error::Corruption(format!(
                        "section {section} incomplete: expected {items_per_blob} items, got {len}"
                    )));
                }
            }
        }

        let inner_stream = journal
            .replay(start_section, start_pos_in_section, buffer)
            .await?;

        // Translate per-section positions back to global positions, again
        // accounting for a partially pruned oldest section.
        let stream = inner_stream.map(move |result| {
            result.map(|(section, pos_in_section, item)| {
                let section_start = section * items_per_blob;
                let first_in_section = pruning_boundary.max(section_start);
                let global_pos = first_in_section + pos_in_section;
                (global_pos, item)
            })
        });

        Ok(stream)
    }
}
272
273impl<E: Context, A: CodecFixedShared> Journal<E, A> {
    /// Per-item byte stride within a section blob, delegated to the
    /// underlying segmented journal (used e.g. to compute rewind offsets).
    pub const CHUNK_SIZE: usize = SegmentedJournal::<E, A>::CHUNK_SIZE;

    /// [`Self::CHUNK_SIZE`] as a `u64`, for offset arithmetic.
    pub const CHUNK_SIZE_U64: u64 = Self::CHUNK_SIZE as u64;
279
280 async fn scan_partition(context: &E, partition: &str) -> Result<Vec<Vec<u8>>, Error> {
282 match context.scan(partition).await {
283 Ok(blobs) => Ok(blobs),
284 Err(commonware_runtime::Error::PartitionMissing(_)) => Ok(Vec::new()),
285 Err(err) => Err(Error::Runtime(err)),
286 }
287 }
288
289 async fn select_blob_partition(context: &E, cfg: &Config) -> Result<String, Error> {
295 let legacy_partition = cfg.partition.as_str();
296 let new_partition = format!("{}-blobs", cfg.partition);
297
298 let legacy_blobs = Self::scan_partition(context, legacy_partition).await?;
299 let new_blobs = Self::scan_partition(context, &new_partition).await?;
300
301 if !legacy_blobs.is_empty() && !new_blobs.is_empty() {
302 return Err(Error::Corruption(format!(
303 "both legacy and blobs partitions contain data: legacy={} blobs={}",
304 legacy_partition, new_partition
305 )));
306 }
307
308 if !legacy_blobs.is_empty() {
309 Ok(legacy_partition.into())
310 } else {
311 Ok(new_partition)
312 }
313 }
314
    /// Open (or create) a journal, recovering `size` and the pruning
    /// boundary from the surviving blobs and the metadata store. Stale
    /// metadata left by a crash between pruning and syncing is repaired
    /// (see [`Self::recover_bounds`]).
    pub async fn init(context: E, cfg: Config) -> Result<Self, Error> {
        let items_per_blob = cfg.items_per_blob.get();

        // Choose the legacy or "-blobs" partition, whichever holds data.
        let blob_partition = Self::select_blob_partition(&context, &cfg).await?;
        let segmented_cfg = SegmentedConfig {
            partition: blob_partition,
            page_cache: cfg.page_cache,
            write_buffer: cfg.write_buffer,
        };

        let mut journal =
            SegmentedJournal::init(context.with_label("blobs"), segmented_cfg).await?;
        let meta_cfg = MetadataConfig {
            partition: format!("{}-metadata", cfg.partition),
            codec_config: ((0..).into(), ()),
        };

        let mut metadata =
            Metadata::<_, u64, Vec<u8>>::init(context.with_label("meta"), meta_cfg).await?;

        // A persisted boundary exists only when pruning stopped mid-section;
        // anything but exactly 8 bytes is corrupt.
        let meta_pruning_boundary = match metadata.get(&PRUNING_BOUNDARY_KEY) {
            Some(bytes) => Some(u64::from_be_bytes(bytes.as_slice().try_into().map_err(
                |_| Error::Corruption("invalid pruning_boundary metadata".into()),
            )?)),
            None => None,
        };

        let (pruning_boundary, size, needs_metadata_update) =
            Self::recover_bounds(&journal, items_per_blob, meta_pruning_boundary).await?;

        // Rewrite (or drop) metadata found stale during recovery.
        if needs_metadata_update {
            if pruning_boundary.is_multiple_of(items_per_blob) {
                metadata.remove(&PRUNING_BOUNDARY_KEY);
            } else {
                metadata.put(
                    PRUNING_BOUNDARY_KEY,
                    pruning_boundary.to_be_bytes().to_vec(),
                );
            }
            metadata.sync().await?;
        }

        // Invariant: the tail section always exists, even when empty.
        let tail_section = size / items_per_blob;
        journal.ensure_section_exists(tail_section).await?;

        Ok(Self {
            inner: UpgradableAsyncRwLock::new(Inner {
                journal,
                size,
                metadata,
                pruning_boundary,
            }),
            items_per_blob,
        })
    }
381
    /// Reconcile the pruning boundary recorded in metadata with the boundary
    /// implied by the surviving blobs, returning
    /// `(pruning_boundary, size, needs_metadata_update)`.
    ///
    /// A mid-section metadata boundary is authoritative only when it falls
    /// within the oldest surviving section; otherwise the blob-derived
    /// boundary wins and the caller must rewrite the metadata (crash repair).
    async fn recover_bounds(
        inner: &SegmentedJournal<E, A>,
        items_per_blob: u64,
        meta_pruning_boundary: Option<u64>,
    ) -> Result<(u64, u64, bool), Error> {
        // Boundary implied by blobs alone: the start of the oldest section.
        let blob_boundary = inner.oldest_section().map_or(0, |o| o * items_per_blob);

        let (pruning_boundary, needs_update) = match meta_pruning_boundary {
            // Mid-section boundary recorded: check it against the blobs.
            Some(meta_pruning_boundary)
                if !meta_pruning_boundary.is_multiple_of(items_per_blob) =>
            {
                let meta_oldest_section = meta_pruning_boundary / items_per_blob;
                match inner.oldest_section() {
                    None => {
                        warn!(
                            meta_oldest_section,
                            "crash repair: no blobs exist, ignoring stale metadata"
                        );
                        (blob_boundary, true)
                    }
                    Some(oldest_section) if meta_oldest_section < oldest_section => {
                        warn!(
                            meta_oldest_section,
                            oldest_section, "crash repair: metadata stale, computing from blobs"
                        );
                        (blob_boundary, true)
                    }
                    Some(oldest_section) if meta_oldest_section > oldest_section => {
                        warn!(
                            meta_oldest_section,
                            oldest_section,
                            "crash repair: metadata ahead of blobs, computing from blobs"
                        );
                        (blob_boundary, true)
                    }
                    // Metadata agrees with the oldest section: authoritative.
                    Some(_) => (meta_pruning_boundary, false),
                }
            }
            // A section-aligned boundary should never have been persisted;
            // recompute from blobs and schedule removal.
            Some(_) => (blob_boundary, true),
            None => (blob_boundary, false),
        };

        Self::validate_oldest_section(inner, items_per_blob, pruning_boundary).await?;

        let size = Self::compute_size(inner, items_per_blob, pruning_boundary).await?;
        Ok((pruning_boundary, size, needs_update))
    }
450
    /// Verify that the oldest section's length matches the pruning boundary:
    /// a full `items_per_blob` items, minus any pruned from its front.
    /// Skipped when at most one section exists (the tail may legitimately be
    /// short because it is still being filled).
    async fn validate_oldest_section(
        inner: &SegmentedJournal<E, A>,
        items_per_blob: u64,
        pruning_boundary: u64,
    ) -> Result<(), Error> {
        let (Some(oldest), Some(newest)) = (inner.oldest_section(), inner.newest_section()) else {
            return Ok(());
        };

        if oldest == newest {
            return Ok(());
        }

        let oldest_len = inner.section_len(oldest).await?;
        let oldest_start = oldest * items_per_blob;

        let expected = if pruning_boundary > oldest_start {
            // Partially pruned: only items at or past the boundary remain.
            items_per_blob - (pruning_boundary - oldest_start)
        } else {
            items_per_blob
        };

        if oldest_len != expected {
            return Err(Error::Corruption(format!(
                "oldest section {oldest} has wrong size: expected {expected} items, got {oldest_len}"
            )));
        }

        Ok(())
    }
487
488 async fn compute_size(
490 inner: &SegmentedJournal<E, A>,
491 items_per_blob: u64,
492 pruning_boundary: u64,
493 ) -> Result<u64, Error> {
494 let oldest = inner.oldest_section();
495 let newest = inner.newest_section();
496
497 let (Some(oldest), Some(newest)) = (oldest, newest) else {
498 return Ok(pruning_boundary);
499 };
500
501 if oldest == newest {
502 let tail_len = inner.section_len(newest).await?;
504 return Ok(pruning_boundary + tail_len);
505 }
506
507 let oldest_len = inner.section_len(oldest).await?;
509 let tail_len = inner.section_len(newest).await?;
510
511 let middle_sections = newest - oldest - 1;
513 let middle_items = middle_sections * items_per_blob;
514
515 Ok(pruning_boundary + oldest_len + middle_items + tail_len)
516 }
517
    /// (ALPHA) Initialize an empty journal whose logical size is `size`:
    /// all previous contents are discarded and every position below `size`
    /// is treated as pruned.
    #[commonware_macros::stability(ALPHA)]
    pub async fn init_at_size(context: E, cfg: Config, size: u64) -> Result<Self, Error> {
        let items_per_blob = cfg.items_per_blob.get();
        let tail_section = size / items_per_blob;

        let blob_partition = Self::select_blob_partition(&context, &cfg).await?;
        let segmented_cfg = SegmentedConfig {
            partition: blob_partition,
            page_cache: cfg.page_cache,
            write_buffer: cfg.write_buffer,
        };

        let meta_cfg = MetadataConfig {
            partition: format!("{}-metadata", cfg.partition),
            codec_config: ((0..).into(), ()),
        };
        let mut metadata =
            Metadata::<_, u64, Vec<u8>>::init(context.with_label("meta"), meta_cfg).await?;
        let mut journal =
            SegmentedJournal::init(context.with_label("blobs"), segmented_cfg).await?;

        // Drop any pre-existing blobs, then create the (empty) tail section.
        journal.clear().await?;
        journal.ensure_section_exists(tail_section).await?;

        // Persist the boundary only when it falls mid-section; aligned
        // boundaries are recoverable from the blobs alone.
        if !size.is_multiple_of(items_per_blob) {
            metadata.put(PRUNING_BOUNDARY_KEY, size.to_be_bytes().to_vec());
            metadata.sync().await?;
        } else if metadata.get(&PRUNING_BOUNDARY_KEY).is_some() {
            metadata.remove(&PRUNING_BOUNDARY_KEY);
            metadata.sync().await?;
        }

        Ok(Self {
            inner: UpgradableAsyncRwLock::new(Inner {
                journal,
                size,
                metadata,
                pruning_boundary: size,
            }),
            items_per_blob,
        })
    }
591
592 #[inline]
594 const fn position_to_section(&self, position: u64) -> (u64, u64) {
595 let section = position / self.items_per_blob;
596 let pos_in_section = position % self.items_per_blob;
597 (section, pos_in_section)
598 }
599
    /// Durably persist appended data, and the pruning-boundary metadata if it
    /// is out of date. Holds only an upgradable read lock, upgrading to a
    /// write lock just for the metadata mutation, so readers are mostly
    /// unaffected.
    pub async fn sync(&self) -> Result<(), Error> {
        let inner = self.inner.upgradable_read().await;

        let tail_section = inner.size / self.items_per_blob;

        // Flush blob data up to and including the tail section.
        inner.journal.sync(tail_section).await?;

        let pruning_boundary = inner.pruning_boundary;
        let pruning_boundary_from_metadata = inner.metadata.get(&PRUNING_BOUNDARY_KEY).cloned();
        // Decide whether metadata must be written (`true`), removed
        // (`false`), or is already in sync (early return).
        let put = if !pruning_boundary.is_multiple_of(self.items_per_blob) {
            let needs_update = pruning_boundary_from_metadata
                .is_none_or(|bytes| bytes.as_slice() != pruning_boundary.to_be_bytes());

            if needs_update {
                true
            } else {
                return Ok(());
            }
        } else if pruning_boundary_from_metadata.is_some() {
            // Boundary is section-aligned: stale metadata must be removed.
            false
        } else {
            return Ok(());
        };

        // Upgrade to exclusive access only for the metadata mutation.
        let mut inner = inner.upgrade().await;
        if put {
            inner.metadata.put(
                PRUNING_BOUNDARY_KEY,
                pruning_boundary.to_be_bytes().to_vec(),
            );
        } else {
            inner.metadata.remove(&PRUNING_BOUNDARY_KEY);
        }
        inner.metadata.sync().await?;

        Ok(())
    }
649
650 pub async fn reader(&self) -> Reader<'_, E, A> {
652 Reader {
653 guard: self.inner.read().await,
654 items_per_blob: self.items_per_blob,
655 }
656 }
657
658 pub async fn size(&self) -> u64 {
661 self.inner.read().await.size
662 }
663
664 pub async fn append(&self, item: &A) -> Result<u64, Error> {
667 self.append_many(Many::Flat(std::slice::from_ref(item)))
668 .await
669 }
670
    /// Append a batch of items, returning the position of the last one.
    ///
    /// Items are pre-encoded into one buffer outside the lock, then written
    /// section by section; whenever a section fills up it is synced under a
    /// downgraded (upgradable) guard so readers can progress, and the next
    /// section is created under the re-acquired write guard.
    ///
    /// # Errors
    ///
    /// Returns [`Error::EmptyAppend`] if `items` contains no items.
    pub async fn append_many<'a>(&'a self, items: Many<'a, A>) -> Result<u64, Error> {
        if items.is_empty() {
            return Err(Error::EmptyAppend);
        }

        // Encode everything up front so no work happens under the lock.
        let items_count = match &items {
            Many::Flat(items) => items.len(),
            Many::Nested(nested_items) => nested_items.iter().map(|s| s.len()).sum(),
        };
        let mut items_buf = Vec::with_capacity(items_count * A::SIZE);
        match &items {
            Many::Flat(items) => {
                for item in *items {
                    item.write(&mut items_buf);
                }
            }
            Many::Nested(nested_items) => {
                for items in *nested_items {
                    for item in *items {
                        item.write(&mut items_buf);
                    }
                }
            }
        }

        let mut inner = self.inner.write().await;
        let mut written = 0;
        while written < items_count {
            // Write as much of the batch as fits into the tail section.
            let (section, pos_in_section) = self.position_to_section(inner.size);
            let remaining_space = (self.items_per_blob - pos_in_section) as usize;
            let batch_count = remaining_space.min(items_count - written);
            let start = written * A::SIZE;
            let end = start + batch_count * A::SIZE;

            inner
                .journal
                .append_raw(section, &items_buf[start..end])
                .await?;
            inner.size += batch_count as u64;
            written += batch_count;

            if inner.size.is_multiple_of(self.items_per_blob) {
                // Section filled: sync it while readers may still proceed,
                // then create the next (empty) tail section.
                let inner_ref = inner.downgrade_to_upgradable();
                inner_ref.journal.sync(section).await?;
                inner = inner_ref.upgrade().await;
                inner.journal.ensure_section_exists(section + 1).await?;
            }
        }

        Ok(inner.size - 1)
    }
732
    /// Shrink the journal to `size` items, discarding everything appended at
    /// or after position `size`. Rewinding to the current size is a no-op.
    ///
    /// # Errors
    ///
    /// Returns [`Error::InvalidRewind`] if `size` exceeds the current size
    /// or falls below the pruning boundary.
    pub async fn rewind(&self, size: u64) -> Result<(), Error> {
        let mut inner = self.inner.write().await;

        match size.cmp(&inner.size) {
            std::cmp::Ordering::Greater => return Err(Error::InvalidRewind(size)),
            std::cmp::Ordering::Equal => return Ok(()),
            std::cmp::Ordering::Less => {}
        }

        if size < inner.pruning_boundary {
            return Err(Error::InvalidRewind(size));
        }

        let section = size / self.items_per_blob;
        let section_start = section * self.items_per_blob;

        // The oldest section may be partially pruned: its first stored item
        // corresponds to the pruning boundary, not the section start.
        let first_in_section = inner.pruning_boundary.max(section_start);
        let pos_in_section = size - first_in_section;
        let byte_offset = pos_in_section * Self::CHUNK_SIZE_U64;

        inner.journal.rewind(section, byte_offset).await?;
        inner.size = size;

        Ok(())
    }
767
768 pub async fn pruning_boundary(&self) -> u64 {
770 let inner = self.inner.read().await;
771 inner.pruning_boundary
772 }
773
    /// Prune sections below the one containing `min_item_pos`, never pruning
    /// the tail section. Returns whether anything was pruned.
    ///
    /// The resulting boundary is always section-aligned, so it may land
    /// below `min_item_pos`.
    pub async fn prune(&self, min_item_pos: u64) -> Result<bool, Error> {
        let mut inner = self.inner.write().await;

        let target_section = min_item_pos / self.items_per_blob;

        let tail_section = inner.size / self.items_per_blob;

        // Never prune past the tail, even for very large `min_item_pos`.
        let min_section = std::cmp::min(target_section, tail_section);

        let pruned = inner.journal.prune(min_section).await?;

        if pruned {
            // The tail section must survive every prune, so an oldest
            // section always exists afterwards.
            let new_oldest = inner
                .journal
                .oldest_section()
                .expect("all sections pruned - violates tail section invariant");
            // If anything was pruned, the boundary must strictly advance.
            assert!(inner.pruning_boundary < new_oldest * self.items_per_blob);
            inner.pruning_boundary = new_oldest * self.items_per_blob;
        }

        Ok(pruned)
    }
807
808 pub async fn destroy(self) -> Result<(), Error> {
810 let inner = self.inner.into_inner();
812 inner.journal.destroy().await?;
813
814 inner.metadata.destroy().await?;
816
817 Ok(())
818 }
819
820 pub(crate) async fn clear_to_size(&self, new_size: u64) -> Result<(), Error> {
829 let mut inner = self.inner.write().await;
835 inner.journal.clear().await?;
836 let tail_section = new_size / self.items_per_blob;
837 inner.journal.ensure_section_exists(tail_section).await?;
838
839 inner.size = new_size;
840 inner.pruning_boundary = new_size; if !inner.pruning_boundary.is_multiple_of(self.items_per_blob) {
844 let value = inner.pruning_boundary.to_be_bytes().to_vec();
845 inner.metadata.put(PRUNING_BOUNDARY_KEY, value);
846 inner.metadata.sync().await?;
847 } else if inner.metadata.get(&PRUNING_BOUNDARY_KEY).is_some() {
848 inner.metadata.remove(&PRUNING_BOUNDARY_KEY);
849 inner.metadata.sync().await?;
850 }
851
852 Ok(())
853 }
854
855 #[cfg(test)]
857 pub(crate) async fn read(&self, pos: u64) -> Result<A, Error> {
858 self.reader().await.read(pos).await
859 }
860
861 #[cfg(test)]
863 pub(crate) async fn bounds(&self) -> std::ops::Range<u64> {
864 self.reader().await.bounds()
865 }
866
867 #[cfg(test)]
869 pub(crate) async fn test_oldest_section(&self) -> Option<u64> {
870 let inner = self.inner.read().await;
871 inner.journal.oldest_section()
872 }
873
874 #[cfg(test)]
876 pub(crate) async fn test_newest_section(&self) -> Option<u64> {
877 let inner = self.inner.read().await;
878 inner.journal.newest_section()
879 }
880}
881
/// Trait forwarding: every method delegates to the identically-named
/// inherent method on [`Journal`].
impl<E: Context, A: CodecFixedShared> super::Contiguous for Journal<E, A> {
    type Item = A;

    async fn reader(&self) -> impl super::Reader<Item = A> + '_ {
        Self::reader(self).await
    }

    async fn size(&self) -> u64 {
        Self::size(self).await
    }
}
894
/// Trait forwarding: the `&mut` receivers are required by the trait even
/// though the inherent methods take `&self` (they synchronize internally via
/// the journal's lock).
impl<E: Context, A: CodecFixedShared> Mutable for Journal<E, A> {
    async fn append(&mut self, item: &Self::Item) -> Result<u64, Error> {
        Self::append(self, item).await
    }

    async fn append_many<'a>(&'a mut self, items: Many<'a, Self::Item>) -> Result<u64, Error> {
        Self::append_many(self, items).await
    }

    async fn prune(&mut self, min_position: u64) -> Result<bool, Error> {
        Self::prune(self, min_position).await
    }

    async fn rewind(&mut self, size: u64) -> Result<(), Error> {
        Self::rewind(self, size).await
    }
}
912
impl<E: Context, A: CodecFixedShared> Persistable for Journal<E, A> {
    type Error = Error;

    /// `commit` and `sync` are equivalent here: both flush data and metadata
    /// via the inherent [`Journal::sync`].
    async fn commit(&self) -> Result<(), Error> {
        self.sync().await
    }

    // Resolves to the inherent `Journal::sync` (inherent methods take
    // precedence over trait methods), so this does not recurse.
    async fn sync(&self) -> Result<(), Error> {
        self.sync().await
    }

    async fn destroy(self) -> Result<(), Error> {
        self.destroy().await
    }
}
928
/// (ALPHA) Lets this journal serve as the backing store of an authenticated
/// journal; `init` simply delegates to `authenticated::Journal::new`.
#[commonware_macros::stability(ALPHA)]
impl<E: Context, A: CodecFixedShared> crate::journal::authenticated::Inner<E> for Journal<E, A> {
    type Config = Config;

    async fn init<F: crate::merkle::Family, H: commonware_cryptography::Hasher>(
        context: E,
        merkle_cfg: crate::merkle::journaled::Config,
        journal_cfg: Self::Config,
        rewind_predicate: fn(&A) -> bool,
    ) -> Result<
        crate::journal::authenticated::Journal<F, E, Self, H>,
        crate::journal::authenticated::Error<F>,
    > {
        crate::journal::authenticated::Journal::<F, E, Self, H>::new(
            context,
            merkle_cfg,
            journal_cfg,
            rewind_predicate,
        )
        .await
    }
}
951
952#[cfg(test)]
953mod tests {
954 use super::*;
955 use commonware_cryptography::{sha256::Digest, Hasher as _, Sha256};
956 use commonware_macros::test_traced;
957 use commonware_runtime::{
958 deterministic::{self, Context},
959 Blob, BufferPooler, Error as RuntimeError, Metrics, Runner, Storage,
960 };
961 use commonware_utils::{NZUsize, NZU16, NZU64};
962 use futures::{pin_mut, StreamExt};
963 use std::num::NonZeroU16;
964
965 const PAGE_SIZE: NonZeroU16 = NZU16!(44);
966 const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(3);
967
968 fn test_digest(value: u64) -> Digest {
970 Sha256::hash(&value.to_be_bytes())
971 }
972
    /// Build a journal [`Config`] over the shared test partition with the
    /// given section size.
    fn test_cfg(pooler: &impl BufferPooler, items_per_blob: NonZeroU64) -> Config {
        Config {
            partition: "test-partition".into(),
            items_per_blob,
            page_cache: CacheRef::from_pooler(pooler, PAGE_SIZE, PAGE_CACHE_SIZE),
            write_buffer: NZUsize!(2048),
        }
    }
981
982 fn blob_partition(cfg: &Config) -> String {
983 format!("{}-blobs", cfg.partition)
984 }
985
    /// Test-side mirror of `Journal::scan_partition`: list a partition's
    /// blobs, treating a missing partition as empty and panicking on any
    /// other error.
    async fn scan_partition(context: &Context, partition: &str) -> Vec<Vec<u8>> {
        match context.scan(partition).await {
            Ok(blobs) => blobs,
            Err(RuntimeError::PartitionMissing(_)) => Vec::new(),
            Err(err) => panic!("Failed to scan partition {partition}: {err}"),
        }
    }
993
    /// Init must fail with `Corruption` when both the legacy and the new
    /// "-blobs" partitions contain data.
    #[test_traced]
    fn test_fixed_journal_init_conflicting_partitions() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(&context, NZU64!(2));
            let legacy_partition = cfg.partition.clone();
            let blobs_partition = blob_partition(&cfg);

            // Seed the legacy partition with one blob.
            let (legacy_blob, _) = context
                .open(&legacy_partition, &0u64.to_be_bytes())
                .await
                .expect("Failed to open legacy blob");
            legacy_blob
                .write_at(0, vec![0u8; 1])
                .await
                .expect("Failed to write legacy blob");
            legacy_blob
                .sync()
                .await
                .expect("Failed to sync legacy blob");

            // Seed the new partition as well, creating the conflict.
            let (new_blob, _) = context
                .open(&blobs_partition, &0u64.to_be_bytes())
                .await
                .expect("Failed to open new blob");
            new_blob
                .write_at(0, vec![0u8; 1])
                .await
                .expect("Failed to write new blob");
            new_blob.sync().await.expect("Failed to sync new blob");

            let result =
                Journal::<_, Digest>::init(context.with_label("second"), cfg.clone()).await;
            assert!(matches!(result, Err(Error::Corruption(_))));
        });
    }
1030
    /// When only the legacy partition has data, init keeps writing to it
    /// instead of migrating to the "-blobs" partition.
    #[test_traced]
    fn test_fixed_journal_init_prefers_legacy_partition() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(&context, NZU64!(2));
            let legacy_partition = cfg.partition.clone();
            let blobs_partition = blob_partition(&cfg);

            // Seed the legacy partition with one blob before init.
            let (legacy_blob, _) = context
                .open(&legacy_partition, &0u64.to_be_bytes())
                .await
                .expect("Failed to open legacy blob");
            legacy_blob
                .write_at(0, vec![0u8; 1])
                .await
                .expect("Failed to write legacy blob");
            legacy_blob
                .sync()
                .await
                .expect("Failed to sync legacy blob");

            let journal = Journal::<_, Digest>::init(context.with_label("first"), cfg.clone())
                .await
                .expect("failed to initialize journal");
            journal.append(&test_digest(1)).await.unwrap();
            journal.sync().await.unwrap();
            drop(journal);

            // All writes must have landed in the legacy partition.
            let legacy_blobs = scan_partition(&context, &legacy_partition).await;
            let new_blobs = scan_partition(&context, &blobs_partition).await;
            assert!(!legacy_blobs.is_empty());
            assert!(new_blobs.is_empty());
        });
    }
1066
    /// A fresh journal (no legacy data) must write to the "-blobs" partition.
    #[test_traced]
    fn test_fixed_journal_init_defaults_to_blobs_partition() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(&context, NZU64!(2));
            let legacy_partition = cfg.partition.clone();
            let blobs_partition = blob_partition(&cfg);

            let journal = Journal::<_, Digest>::init(context.with_label("first"), cfg.clone())
                .await
                .expect("failed to initialize journal");
            journal.append(&test_digest(1)).await.unwrap();
            journal.sync().await.unwrap();
            drop(journal);

            // All writes must have landed in the new partition.
            let legacy_blobs = scan_partition(&context, &legacy_partition).await;
            let new_blobs = scan_partition(&context, &blobs_partition).await;
            assert!(legacy_blobs.is_empty());
            assert!(!new_blobs.is_empty());
        });
    }
1088
    /// End-to-end exercise of append, restart recovery, read, pruning
    /// (including no-op and max pruning), bounds, and replay after pruning.
    #[test_traced]
    fn test_fixed_journal_append_and_prune() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = test_cfg(&context, NZU64!(2));
            let journal = Journal::init(context.with_label("first"), cfg.clone())
                .await
                .expect("failed to initialize journal");

            let mut pos = journal
                .append(&test_digest(0))
                .await
                .expect("failed to append data 0");
            assert_eq!(pos, 0);

            // Restart and confirm the single item is recovered.
            journal.sync().await.expect("Failed to sync journal");
            drop(journal);

            let cfg = test_cfg(&context, NZU64!(2));
            let journal = Journal::init(context.with_label("second"), cfg.clone())
                .await
                .expect("failed to re-initialize journal");
            assert_eq!(journal.size().await, 1);

            pos = journal
                .append(&test_digest(1))
                .await
                .expect("failed to append data 1");
            assert_eq!(pos, 1);
            pos = journal
                .append(&test_digest(2))
                .await
                .expect("failed to append data 2");
            assert_eq!(pos, 2);

            let item0 = journal.read(0).await.expect("failed to read data 0");
            assert_eq!(item0, test_digest(0));
            let item1 = journal.read(1).await.expect("failed to read data 1");
            assert_eq!(item1, test_digest(1));
            let item2 = journal.read(2).await.expect("failed to read data 2");
            assert_eq!(item2, test_digest(2));
            // Reading one past the end reports ItemOutOfRange.
            let err = journal.read(3).await.expect_err("expected read to fail");
            assert!(matches!(err, Error::ItemOutOfRange(3)));

            journal.sync().await.expect("failed to sync journal");

            journal.prune(1).await.expect("failed to prune journal 1");

            journal.prune(2).await.expect("failed to prune journal 2");
            assert_eq!(journal.bounds().await.start, 2);

            // Pruned positions must report ItemPruned, not corruption.
            let result0 = journal.read(0).await;
            assert!(matches!(result0, Err(Error::ItemPruned(0))));
            let result1 = journal.read(1).await;
            assert!(matches!(result1, Err(Error::ItemPruned(1))));

            let result2 = journal.read(2).await.unwrap();
            assert_eq!(result2, test_digest(2));

            for i in 3..10 {
                let pos = journal
                    .append(&test_digest(i))
                    .await
                    .expect("failed to append data");
                assert_eq!(pos, i);
            }

            // Pruning below the current boundary is a no-op.
            journal.prune(0).await.expect("no-op pruning failed");
            assert_eq!(journal.inner.read().await.journal.oldest_section(), Some(1));
            assert_eq!(journal.inner.read().await.journal.newest_section(), Some(5));
            assert_eq!(journal.bounds().await.start, 2);

            journal
                .prune(3 * cfg.items_per_blob.get())
                .await
                .expect("failed to prune journal 2");
            assert_eq!(journal.inner.read().await.journal.oldest_section(), Some(3));
            assert_eq!(journal.inner.read().await.journal.newest_section(), Some(5));
            assert_eq!(journal.bounds().await.start, 6);

            // Max-prune: everything but the tail section goes away; bounds
            // collapse to an empty range at `size`.
            journal
                .prune(10000)
                .await
                .expect("failed to max-prune journal");
            let size = journal.size().await;
            assert_eq!(size, 10);
            assert_eq!(journal.test_oldest_section().await, Some(5));
            assert_eq!(journal.test_newest_section().await, Some(5));
            let bounds = journal.bounds().await;
            assert!(bounds.is_empty());
            assert_eq!(bounds.start, size);

            {
                // Replaying from a pruned position fails up front.
                let reader = journal.reader().await;
                let result = reader.replay(NZUsize!(1024), 0).await;
                assert!(matches!(result, Err(Error::ItemPruned(0))));
            }

            {
                let reader = journal.reader().await;
                let res = reader.replay(NZUsize!(1024), 0).await;
                assert!(matches!(res, Err(Error::ItemPruned(_))));

                // Replaying from the boundary of a fully pruned journal
                // yields an empty stream.
                let reader = journal.reader().await;
                let stream = reader
                    .replay(NZUsize!(1024), journal.bounds().await.start)
                    .await
                    .expect("failed to replay journal from pruning boundary");
                pin_mut!(stream);
                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((pos, item)) => {
                            assert_eq!(test_digest(pos), item);
                            items.push(pos);
                        }
                        Err(err) => panic!("Failed to read item: {err}"),
                    }
                }
                assert_eq!(items, Vec::<u64>::new());
            }

            journal.destroy().await.unwrap();
        });
    }
1236
    /// Fill nearly two large sections, restart, and verify the first section
    /// is fully readable after recovery.
    #[test_traced]
    fn test_fixed_journal_append_a_lot_of_data() {
        let executor = deterministic::Runner::default();
        const ITEMS_PER_BLOB: NonZeroU64 = NZU64!(10000);
        executor.start(|context| async move {
            let cfg = test_cfg(&context, ITEMS_PER_BLOB);
            let journal = Journal::init(context.with_label("first"), cfg.clone())
                .await
                .expect("failed to initialize journal");
            // One item short of two full sections.
            for i in 0u64..ITEMS_PER_BLOB.get() * 2 - 1 {
                journal
                    .append(&test_digest(i))
                    .await
                    .expect("failed to append data");
            }
            journal.sync().await.expect("failed to sync journal");
            drop(journal);
            let journal = Journal::init(context.with_label("second"), cfg.clone())
                .await
                .expect("failed to re-initialize journal");
            for i in 0u64..10000 {
                let item: Digest = journal.read(i).await.expect("failed to read data");
                assert_eq!(item, test_digest(i));
            }
            journal.destroy().await.expect("failed to destroy journal");
        });
    }
1268
    /// Replay the full journal, then corrupt one historical blob and confirm
    /// both direct reads and replay surface a runtime error.
    #[test_traced]
    fn test_fixed_journal_replay() {
        const ITEMS_PER_BLOB: NonZeroU64 = NZU64!(7);
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = test_cfg(&context, ITEMS_PER_BLOB);
            let journal = Journal::init(context.with_label("first"), cfg.clone())
                .await
                .expect("failed to initialize journal");

            // 100.5 sections worth of items.
            for i in 0u64..(ITEMS_PER_BLOB.get() * 100 + ITEMS_PER_BLOB.get() / 2) {
                let pos = journal
                    .append(&test_digest(i))
                    .await
                    .expect("failed to append data");
                assert_eq!(pos, i);
            }

            for i in 0u64..(ITEMS_PER_BLOB.get() * 100 + ITEMS_PER_BLOB.get() / 2) {
                let item: Digest = journal.read(i).await.expect("failed to read data");
                assert_eq!(item, test_digest(i), "i={i}");
            }

            {
                // Full replay must yield every position exactly once.
                let reader = journal.reader().await;
                let stream = reader
                    .replay(NZUsize!(1024), 0)
                    .await
                    .expect("failed to replay journal");
                let mut items = Vec::new();
                pin_mut!(stream);
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((pos, item)) => {
                            assert_eq!(test_digest(pos), item, "pos={pos}, item={item:?}");
                            items.push(pos);
                        }
                        Err(err) => panic!("Failed to read item: {err}"),
                    }
                }

                assert_eq!(
                    items.len(),
                    ITEMS_PER_BLOB.get() as usize * 100 + ITEMS_PER_BLOB.get() as usize / 2
                );
                items.sort();
                for (i, pos) in items.iter().enumerate() {
                    assert_eq!(i as u64, *pos);
                }
            }

            journal.sync().await.expect("Failed to sync journal");
            drop(journal);

            // Corrupt bytes inside historical blob 40, then reopen.
            let (blob, _) = context
                .open(&blob_partition(&cfg), &40u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            let bad_bytes = 123456789u32;
            blob.write_at(1, bad_bytes.to_be_bytes().to_vec())
                .await
                .expect("Failed to write bad bytes");
            blob.sync().await.expect("Failed to sync blob");

            let journal = Journal::init(context.with_label("second"), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");

            // Direct read of an item in the corrupted blob errors.
            let err = journal
                .read(40 * ITEMS_PER_BLOB.get() + 1)
                .await
                .unwrap_err();
            assert!(matches!(err, Error::Runtime(_)));

            {
                // Replay should stream items until it hits the corruption.
                let mut error_found = false;
                let reader = journal.reader().await;
                let stream = reader
                    .replay(NZUsize!(1024), 0)
                    .await
                    .expect("failed to replay journal");
                let mut items = Vec::new();
                pin_mut!(stream);
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((pos, item)) => {
                            assert_eq!(test_digest(pos), item);
                            items.push(pos);
                        }
                        Err(err) => {
                            error_found = true;
                            assert!(matches!(err, Error::Runtime(_)));
                            break;
                        }
                    }
                }
                assert!(error_found);
            }
        });
    }
1382
    #[test_traced]
    fn test_fixed_journal_init_with_corrupted_historical_blobs() {
        // Verifies that corruption in a *historical* (non-tail) blob does not
        // fail init: it is surfaced lazily, when the damaged section is
        // replayed.
        let executor = deterministic::Runner::default();
        const ITEMS_PER_BLOB: NonZeroU64 = NZU64!(7);
        executor.start(|context| async move {
            let cfg = test_cfg(&context, ITEMS_PER_BLOB);
            let journal = Journal::init(context.with_label("first"), cfg.clone())
                .await
                .expect("failed to initialize journal");

            // Fill 100 full blobs plus a partial tail blob.
            for i in 0u64..(ITEMS_PER_BLOB.get() * 100 + ITEMS_PER_BLOB.get() / 2) {
                let pos = journal
                    .append(&test_digest(i))
                    .await
                    .expect("failed to append data");
                assert_eq!(pos, i);
            }
            journal.sync().await.expect("Failed to sync journal");
            drop(journal);

            // Corrupt historical blob #40 by truncating its final byte.
            let (blob, size) = context
                .open(&blob_partition(&cfg), &40u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            blob.resize(size - 1).await.expect("Failed to corrupt blob");
            blob.sync().await.expect("Failed to sync blob");

            // Init still succeeds and reports the full (pre-corruption) size.
            let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
                .await
                .expect("failed to initialize journal");

            let expected_size = ITEMS_PER_BLOB.get() * 100 + ITEMS_PER_BLOB.get() / 2;
            assert_eq!(journal.size().await, expected_size);

            // Replay must report corruption and name the damaged section.
            let reader = journal.reader().await;
            match reader.replay(NZUsize!(1024), 0).await {
                Err(Error::Corruption(msg)) => {
                    assert!(
                        msg.contains("section 40"),
                        "Error should mention section 40, got: {msg}"
                    );
                }
                Err(e) => panic!("Expected Corruption error for section 40, got: {:?}", e),
                Ok(_) => panic!("Expected replay to fail with corruption"),
            };
        });
    }
1443
1444 #[test_traced]
1445 fn test_fixed_journal_replay_with_missing_historical_blob() {
1446 let executor = deterministic::Runner::default();
1447 executor.start(|context| async move {
1448 let cfg = test_cfg(&context, NZU64!(2));
1449 let journal = Journal::init(context.with_label("first"), cfg.clone())
1450 .await
1451 .expect("failed to initialize journal");
1452 for i in 0u64..5 {
1453 journal
1454 .append(&test_digest(i))
1455 .await
1456 .expect("failed to append data");
1457 }
1458 journal.sync().await.expect("failed to sync journal");
1459 drop(journal);
1460
1461 context
1462 .remove(&blob_partition(&cfg), Some(&1u64.to_be_bytes()))
1463 .await
1464 .expect("failed to remove blob");
1465
1466 let result = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
1468 .await
1469 .expect("init shouldn't fail");
1470
1471 let reader = result.reader().await;
1473 match reader.replay(NZUsize!(1024), 0).await {
1474 Err(Error::Corruption(_)) => {}
1475 Err(err) => panic!("expected Corruption, got: {err}"),
1476 Ok(_) => panic!("expected Corruption, got ok"),
1477 };
1478
1479 match result.read(2).await {
1481 Err(Error::Corruption(_)) => {}
1482 Err(err) => panic!("expected Corruption, got: {err}"),
1483 Ok(_) => panic!("expected Corruption, got ok"),
1484 };
1485 });
1486 }
1487
1488 #[test_traced]
1489 fn test_fixed_journal_test_trim_blob() {
1490 let executor = deterministic::Runner::default();
1492 const ITEMS_PER_BLOB: NonZeroU64 = NZU64!(7);
1494 executor.start(|context| async move {
1495 let cfg = test_cfg(&context, ITEMS_PER_BLOB);
1497 let journal = Journal::init(context.with_label("first"), cfg.clone())
1498 .await
1499 .expect("failed to initialize journal");
1500
1501 let item_count = ITEMS_PER_BLOB.get() + 3;
1503 for i in 0u64..item_count {
1504 journal
1505 .append(&test_digest(i))
1506 .await
1507 .expect("failed to append data");
1508 }
1509 assert_eq!(journal.size().await, item_count);
1510 journal.sync().await.expect("Failed to sync journal");
1511 drop(journal);
1512
1513 let (blob, size) = context
1516 .open(&blob_partition(&cfg), &1u64.to_be_bytes())
1517 .await
1518 .expect("Failed to open blob");
1519 blob.resize(size - 1).await.expect("Failed to corrupt blob");
1520 blob.sync().await.expect("Failed to sync blob");
1521
1522 let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
1523 .await
1524 .unwrap();
1525
1526 assert_eq!(journal.size().await, item_count - 1);
1529
1530 journal.destroy().await.expect("Failed to destroy journal");
1532 });
1533 }
1534
    #[test_traced]
    fn test_fixed_journal_partial_replay() {
        // Replaying from a mid-journal position must yield exactly the items at
        // positions >= START_POS, each matching what was appended.
        const ITEMS_PER_BLOB: NonZeroU64 = NZU64!(7);
        // Start partway through a section to exercise intra-section offsets.
        const START_POS: u64 = 53;

        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(&context, ITEMS_PER_BLOB);
            let journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to initialize journal");

            // Fill 100 full blobs plus a partial tail blob.
            for i in 0u64..(ITEMS_PER_BLOB.get() * 100 + ITEMS_PER_BLOB.get() / 2) {
                let pos = journal
                    .append(&test_digest(i))
                    .await
                    .expect("failed to append data");
                assert_eq!(pos, i);
            }

            {
                let reader = journal.reader().await;
                let stream = reader
                    .replay(NZUsize!(1024), START_POS)
                    .await
                    .expect("failed to replay journal");
                let mut items = Vec::new();
                pin_mut!(stream);
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((pos, item)) => {
                            // Nothing before START_POS may be emitted.
                            assert!(pos >= START_POS, "pos={pos}, expected >= {START_POS}");
                            assert_eq!(
                                test_digest(pos),
                                item,
                                "Item at position {pos} did not match expected digest"
                            );
                            items.push(pos);
                        }
                        Err(err) => panic!("Failed to read item: {err}"),
                    }
                }

                // Exactly the suffix [START_POS, size) was replayed...
                assert_eq!(
                    items.len(),
                    ITEMS_PER_BLOB.get() as usize * 100 + ITEMS_PER_BLOB.get() as usize / 2
                        - START_POS as usize
                );
                // ...with every position present exactly once.
                items.sort();
                for (i, pos) in items.iter().enumerate() {
                    assert_eq!(i as u64, *pos - START_POS);
                }
            }

            journal.destroy().await.unwrap();
        });
    }
1600
    #[test_traced]
    fn test_fixed_journal_recover_from_partial_write() {
        // Recovery from a damaged tail: a truncated tail blob loses exactly one
        // item; a deleted tail blob loses the whole tail section.
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            // 3 items per blob: items 0..5 span blob 0 (full) and blob 1 (2 items).
            let cfg = test_cfg(&context, NZU64!(3));
            let journal = Journal::init(context.with_label("first"), cfg.clone())
                .await
                .expect("failed to initialize journal");
            for i in 0..5 {
                journal
                    .append(&test_digest(i))
                    .await
                    .expect("failed to append data");
            }
            assert_eq!(journal.size().await, 5);
            journal.sync().await.expect("Failed to sync journal");
            drop(journal);

            // Simulate a partial write: drop the last byte of the tail blob.
            let (blob, size) = context
                .open(&blob_partition(&cfg), &1u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            blob.resize(size - 1).await.expect("Failed to corrupt blob");
            blob.sync().await.expect("Failed to sync blob");

            // The torn item (position 4) is trimmed; the rest survives and the
            // pruning boundary is untouched.
            let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");
            assert_eq!(journal.pruning_boundary().await, 0);
            assert_eq!(journal.size().await, 4);
            drop(journal);

            // Now delete the tail blob entirely.
            context
                .remove(&blob_partition(&cfg), Some(&1u64.to_be_bytes()))
                .await
                .expect("Failed to remove blob");

            // Recovery falls back to the end of the last intact section.
            let journal = Journal::<_, Digest>::init(context.with_label("third"), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");
            assert_eq!(journal.size().await, 3);

            journal.destroy().await.unwrap();
        });
    }
1656
1657 #[test_traced]
1658 fn test_fixed_journal_recover_detects_oldest_section_too_short() {
1659 let executor = deterministic::Runner::default();
1660 executor.start(|context| async move {
1661 let cfg = test_cfg(&context, NZU64!(5));
1662 let journal =
1663 Journal::<_, Digest>::init_at_size(context.with_label("first"), cfg.clone(), 7)
1664 .await
1665 .expect("failed to initialize journal at size");
1666
1667 for i in 0..8u64 {
1669 journal
1670 .append(&test_digest(100 + i))
1671 .await
1672 .expect("failed to append data");
1673 }
1674 journal.sync().await.expect("failed to sync journal");
1675 assert_eq!(journal.pruning_boundary().await, 7);
1676 assert_eq!(journal.size().await, 15);
1677 drop(journal);
1678
1679 let (blob, size) = context
1681 .open(&blob_partition(&cfg), &1u64.to_be_bytes())
1682 .await
1683 .expect("failed to open oldest blob");
1684 blob.resize(size - 1).await.expect("failed to corrupt blob");
1685 blob.sync().await.expect("failed to sync blob");
1686
1687 let result =
1688 Journal::<_, Digest>::init(context.with_label("second"), cfg.clone()).await;
1689 assert!(matches!(result, Err(Error::Corruption(_))));
1690 });
1691 }
1692
1693 #[test_traced]
1694 fn test_fixed_journal_recover_to_empty_from_partial_write() {
1695 let executor = deterministic::Runner::default();
1696 executor.start(|context| async move {
1697 let cfg = test_cfg(&context, NZU64!(10));
1699 let journal = Journal::init(context.with_label("first"), cfg.clone())
1700 .await
1701 .expect("failed to initialize journal");
1702 journal
1704 .append(&test_digest(0))
1705 .await
1706 .expect("failed to append data");
1707 assert_eq!(journal.size().await, 1);
1708 journal.sync().await.expect("Failed to sync journal");
1709 drop(journal);
1710
1711 let (blob, size) = context
1713 .open(&blob_partition(&cfg), &0u64.to_be_bytes())
1714 .await
1715 .expect("Failed to open blob");
1716 blob.resize(size - 1).await.expect("Failed to corrupt blob");
1718 blob.sync().await.expect("Failed to sync blob");
1719
1720 let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
1722 .await
1723 .expect("Failed to re-initialize journal");
1724
1725 let bounds = journal.bounds().await;
1728 assert_eq!(bounds.end, 0);
1729 assert!(bounds.is_empty());
1730 journal
1732 .append(&test_digest(0))
1733 .await
1734 .expect("failed to append data");
1735 assert_eq!(journal.size().await, 1);
1736
1737 journal.destroy().await.unwrap();
1738 });
1739 }
1740
    #[test_traced("DEBUG")]
    fn test_fixed_journal_recover_from_unwritten_data() {
        // A blob extended with zero bytes past the last committed item (e.g.
        // space that was allocated but never written) should be ignored on
        // recovery.
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(&context, NZU64!(10));
            let journal = Journal::init(context.with_label("first"), cfg.clone())
                .await
                .expect("failed to initialize journal");

            // Commit a single item.
            journal
                .append(&test_digest(0))
                .await
                .expect("failed to append data");
            assert_eq!(journal.size().await, 1);
            journal.sync().await.expect("Failed to sync journal");
            drop(journal);

            // Append three pages of zeros to the blob, past the committed data.
            let (blob, size) = context
                .open(&blob_partition(&cfg), &0u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            blob.write_at(size, vec![0u8; PAGE_SIZE.get() as usize * 3])
                .await
                .expect("Failed to extend blob");
            blob.sync().await.expect("Failed to sync blob");

            // Recovery ignores the zeroed region: still exactly one item.
            let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");

            assert_eq!(journal.size().await, 1);

            // And the journal is still appendable afterwards.
            journal
                .append(&test_digest(1))
                .await
                .expect("failed to append data");

            journal.destroy().await.unwrap();
        });
    }
1789
1790 #[test_traced]
1791 fn test_fixed_journal_rewinding() {
1792 let executor = deterministic::Runner::default();
1793 executor.start(|context| async move {
1794 let cfg = test_cfg(&context, NZU64!(2));
1796 let journal = Journal::init(context.with_label("first"), cfg.clone())
1797 .await
1798 .expect("failed to initialize journal");
1799 assert!(matches!(journal.rewind(0).await, Ok(())));
1800 assert!(matches!(
1801 journal.rewind(1).await,
1802 Err(Error::InvalidRewind(1))
1803 ));
1804
1805 journal
1807 .append(&test_digest(0))
1808 .await
1809 .expect("failed to append data 0");
1810 assert_eq!(journal.size().await, 1);
1811 assert!(matches!(journal.rewind(1).await, Ok(()))); assert!(matches!(journal.rewind(0).await, Ok(())));
1813 assert_eq!(journal.size().await, 0);
1814
1815 for i in 0..7 {
1817 let pos = journal
1818 .append(&test_digest(i))
1819 .await
1820 .expect("failed to append data");
1821 assert_eq!(pos, i);
1822 }
1823 assert_eq!(journal.size().await, 7);
1824
1825 assert!(matches!(journal.rewind(4).await, Ok(())));
1827 assert_eq!(journal.size().await, 4);
1828
1829 assert!(matches!(journal.rewind(0).await, Ok(())));
1831 assert_eq!(journal.size().await, 0);
1832
1833 for _ in 0..10 {
1835 for i in 0..100 {
1836 journal
1837 .append(&test_digest(i))
1838 .await
1839 .expect("failed to append data");
1840 }
1841 journal.rewind(journal.size().await - 49).await.unwrap();
1842 }
1843 const ITEMS_REMAINING: u64 = 10 * (100 - 49);
1844 assert_eq!(journal.size().await, ITEMS_REMAINING);
1845
1846 journal.sync().await.expect("Failed to sync journal");
1847 drop(journal);
1848
1849 let mut cfg = test_cfg(&context, NZU64!(3));
1851 cfg.partition = "test-partition-2".into();
1852 let journal = Journal::init(context.with_label("second"), cfg.clone())
1853 .await
1854 .expect("failed to initialize journal");
1855 for _ in 0..10 {
1856 for i in 0..100 {
1857 journal
1858 .append(&test_digest(i))
1859 .await
1860 .expect("failed to append data");
1861 }
1862 journal.rewind(journal.size().await - 49).await.unwrap();
1863 }
1864 assert_eq!(journal.size().await, ITEMS_REMAINING);
1865
1866 journal.sync().await.expect("Failed to sync journal");
1867 drop(journal);
1868
1869 let journal: Journal<_, Digest> =
1871 Journal::init(context.with_label("third"), cfg.clone())
1872 .await
1873 .expect("failed to re-initialize journal");
1874 assert_eq!(journal.size().await, 10 * (100 - 49));
1875
1876 journal.prune(300).await.expect("pruning failed");
1878 assert_eq!(journal.size().await, ITEMS_REMAINING);
1879 assert!(matches!(
1881 journal.rewind(299).await,
1882 Err(Error::InvalidRewind(299))
1883 ));
1884 assert!(matches!(journal.rewind(300).await, Ok(())));
1887 let bounds = journal.bounds().await;
1888 assert_eq!(bounds.end, 300);
1889 assert!(bounds.is_empty());
1890
1891 journal.destroy().await.unwrap();
1892 });
1893 }
1894
    #[test_traced]
    fn test_fixed_journal_recover_from_page_boundary_truncation() {
        // Truncating the blob at an exact physical-page boundary must recover
        // cleanly to the items wholly contained in the remaining pages.
        let executor = deterministic::Runner::default();
        executor.start(|context: Context| async move {
            let cfg = test_cfg(&context, NZU64!(100));
            let journal = Journal::init(context.with_label("first"), cfg.clone())
                .await
                .expect("failed to initialize journal");

            // Write several items into a single (100-item) section.
            for i in 0u64..10 {
                journal
                    .append(&test_digest(i))
                    .await
                    .expect("failed to append data");
            }
            assert_eq!(journal.size().await, 10);
            journal.sync().await.expect("Failed to sync journal");
            drop(journal);

            // Physical page = logical PAGE_SIZE + 12 bytes of per-page overhead
            // (presumably a checksum/header added by the paged buffer — TODO
            // confirm against the paged buffer implementation).
            let physical_page_size = PAGE_SIZE.get() as u64 + 12;
            let (blob, size) = context
                .open(&blob_partition(&cfg), &0u64.to_be_bytes())
                .await
                .expect("Failed to open blob");

            // Cut the last full physical page off the blob.
            let full_pages = size / physical_page_size;
            assert!(full_pages >= 2, "need at least 2 pages for this test");
            let truncate_to = (full_pages - 1) * physical_page_size;

            blob.resize(truncate_to)
                .await
                .expect("Failed to truncate blob");
            blob.sync().await.expect("Failed to sync blob");

            let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
                .await
                .expect("Failed to re-initialize journal after page truncation");

            // Each remaining logical page holds PAGE_SIZE bytes of item data;
            // 32 is assumed to be the serialized size of a test Digest — TODO
            // confirm (consider deriving from the codec instead of hard-coding).
            let remaining_logical_bytes = (full_pages - 1) * PAGE_SIZE.get() as u64;
            let expected_items = remaining_logical_bytes / 32; assert_eq!(
                journal.size().await,
                expected_items,
                "Journal should recover to {} items after truncation",
                expected_items
            );

            // All surviving items read back intact.
            for i in 0..expected_items {
                let item = journal
                    .read(i)
                    .await
                    .expect("failed to read recovered item");
                assert_eq!(item, test_digest(i), "item {} mismatch after recovery", i);
            }

            journal.destroy().await.expect("Failed to destroy journal");
        });
    }
1977
    #[test_traced]
    fn test_single_item_per_blob() {
        // Edge case: items_per_blob = 1, so every item lives in its own blob
        // and every position is a section boundary. Exercises append/read,
        // pruning, persistence, destroy/recreate, and pruning away everything.
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "single-item-per-blob".into(),
                items_per_blob: NZU64!(1),
                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(2048),
            };

            let journal = Journal::init(context.with_label("first"), cfg.clone())
                .await
                .expect("failed to initialize journal");

            // Fresh journal is empty.
            let bounds = journal.bounds().await;
            assert_eq!(bounds.end, 0);
            assert!(bounds.is_empty());

            // First append lands at position 0.
            let pos = journal
                .append(&test_digest(0))
                .await
                .expect("failed to append");
            assert_eq!(pos, 0);
            assert_eq!(journal.size().await, 1);

            journal.sync().await.expect("failed to sync");

            // Read back the most recent item.
            let value = journal
                .read(journal.size().await - 1)
                .await
                .expect("failed to read");
            assert_eq!(value, test_digest(0));

            // Each further append goes to a new blob; the newest item is always
            // readable immediately.
            for i in 1..10u64 {
                let pos = journal
                    .append(&test_digest(i))
                    .await
                    .expect("failed to append");
                assert_eq!(pos, i);
                assert_eq!(journal.size().await, i + 1);

                let value = journal
                    .read(journal.size().await - 1)
                    .await
                    .expect("failed to read");
                assert_eq!(value, test_digest(i));
            }

            // Random access over the full range.
            for i in 0..10u64 {
                assert_eq!(journal.read(i).await.unwrap(), test_digest(i));
            }

            journal.sync().await.expect("failed to sync");

            // Prune the first five items (whole blobs, since 1 item per blob).
            journal.prune(5).await.expect("failed to prune");

            // Logical size is unchanged by pruning...
            assert_eq!(journal.size().await, 10);

            // ...but the start of the retained range advances.
            assert_eq!(journal.bounds().await.start, 5);

            let value = journal
                .read(journal.size().await - 1)
                .await
                .expect("failed to read");
            assert_eq!(value, test_digest(9));

            // Pruned positions report ItemPruned...
            for i in 0..5 {
                assert!(matches!(journal.read(i).await, Err(Error::ItemPruned(_))));
            }

            // ...retained ones still read correctly.
            for i in 5..10u64 {
                assert_eq!(journal.read(i).await.unwrap(), test_digest(i));
            }

            // Appending after pruning continues at the old size.
            for i in 10..15u64 {
                let pos = journal
                    .append(&test_digest(i))
                    .await
                    .expect("failed to append");
                assert_eq!(pos, i);

                let value = journal
                    .read(journal.size().await - 1)
                    .await
                    .expect("failed to read");
                assert_eq!(value, test_digest(i));
            }

            journal.sync().await.expect("failed to sync");
            drop(journal);

            // Size and pruning boundary survive a restart.
            let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
                .await
                .expect("failed to re-initialize journal");

            assert_eq!(journal.size().await, 15);

            assert_eq!(journal.bounds().await.start, 5);

            let value = journal
                .read(journal.size().await - 1)
                .await
                .expect("failed to read");
            assert_eq!(value, test_digest(14));

            for i in 5..15u64 {
                assert_eq!(journal.read(i).await.unwrap(), test_digest(i));
            }

            journal.destroy().await.expect("failed to destroy journal");

            // After destroy, the same partition can be reused from scratch.
            let journal = Journal::init(context.with_label("third"), cfg.clone())
                .await
                .expect("failed to initialize journal");

            for i in 0..10u64 {
                journal.append(&test_digest(i + 100)).await.unwrap();
            }

            // Prune half, then verify bounds persist across another restart.
            journal.prune(5).await.unwrap();
            let bounds = journal.bounds().await;
            assert_eq!(bounds.end, 10);
            assert_eq!(bounds.start, 5);

            journal.sync().await.unwrap();
            drop(journal);

            let journal = Journal::<_, Digest>::init(context.with_label("fourth"), cfg.clone())
                .await
                .expect("failed to re-initialize journal");

            let bounds = journal.bounds().await;
            assert_eq!(bounds.end, 10);
            assert_eq!(bounds.start, 5);

            let value = journal.read(journal.size().await - 1).await.unwrap();
            assert_eq!(value, test_digest(109));

            for i in 5..10u64 {
                assert_eq!(journal.read(i).await.unwrap(), test_digest(i + 100));
            }

            journal.destroy().await.expect("failed to destroy journal");

            // Finally: prune *everything* that was written.
            let journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to initialize journal");

            for i in 0..5u64 {
                journal.append(&test_digest(i + 200)).await.unwrap();
            }
            journal.sync().await.unwrap();

            // Pruning to the current size leaves an empty (but positioned)
            // journal; reading the last (pruned) item reports ItemPruned.
            journal.prune(5).await.unwrap();
            let bounds = journal.bounds().await;
            assert_eq!(bounds.end, 5); assert!(bounds.is_empty()); let result = journal.read(journal.size().await - 1).await;
            assert!(matches!(result, Err(Error::ItemPruned(4))));

            // Appending after a full prune resumes at the boundary.
            journal.append(&test_digest(205)).await.unwrap();
            assert_eq!(journal.bounds().await.start, 5);
            assert_eq!(
                journal.read(journal.size().await - 1).await.unwrap(),
                test_digest(205)
            );

            journal.destroy().await.expect("failed to destroy journal");
        });
    }
2190
2191 #[test_traced]
2192 fn test_fixed_journal_init_at_size_zero() {
2193 let executor = deterministic::Runner::default();
2194 executor.start(|context| async move {
2195 let cfg = test_cfg(&context, NZU64!(5));
2196 let journal = Journal::<_, Digest>::init_at_size(context.clone(), cfg.clone(), 0)
2197 .await
2198 .unwrap();
2199
2200 let bounds = journal.bounds().await;
2201 assert_eq!(bounds.end, 0);
2202 assert!(bounds.is_empty());
2203
2204 let pos = journal.append(&test_digest(100)).await.unwrap();
2206 assert_eq!(pos, 0);
2207 assert_eq!(journal.size().await, 1);
2208 assert_eq!(journal.read(0).await.unwrap(), test_digest(100));
2209
2210 journal.destroy().await.unwrap();
2211 });
2212 }
2213
2214 #[test_traced]
2215 fn test_fixed_journal_init_at_size_section_boundary() {
2216 let executor = deterministic::Runner::default();
2217 executor.start(|context| async move {
2218 let cfg = test_cfg(&context, NZU64!(5));
2219
2220 let journal = Journal::<_, Digest>::init_at_size(context.clone(), cfg.clone(), 10)
2222 .await
2223 .unwrap();
2224
2225 let bounds = journal.bounds().await;
2226 assert_eq!(bounds.end, 10);
2227 assert!(bounds.is_empty());
2228
2229 let pos = journal.append(&test_digest(1000)).await.unwrap();
2231 assert_eq!(pos, 10);
2232 assert_eq!(journal.size().await, 11);
2233 assert_eq!(journal.read(10).await.unwrap(), test_digest(1000));
2234
2235 let pos = journal.append(&test_digest(1001)).await.unwrap();
2237 assert_eq!(pos, 11);
2238 assert_eq!(journal.read(11).await.unwrap(), test_digest(1001));
2239
2240 journal.destroy().await.unwrap();
2241 });
2242 }
2243
2244 #[test_traced]
2245 fn test_fixed_journal_init_at_size_mid_section() {
2246 let executor = deterministic::Runner::default();
2247 executor.start(|context| async move {
2248 let cfg = test_cfg(&context, NZU64!(5));
2249
2250 let journal = Journal::<_, Digest>::init_at_size(context.clone(), cfg.clone(), 7)
2252 .await
2253 .unwrap();
2254
2255 let bounds = journal.bounds().await;
2256 assert_eq!(bounds.end, 7);
2257 assert!(bounds.is_empty());
2259
2260 assert!(matches!(journal.read(5).await, Err(Error::ItemPruned(5))));
2262 assert!(matches!(journal.read(6).await, Err(Error::ItemPruned(6))));
2263
2264 let pos = journal.append(&test_digest(700)).await.unwrap();
2266 assert_eq!(pos, 7);
2267 assert_eq!(journal.size().await, 8);
2268 assert_eq!(journal.read(7).await.unwrap(), test_digest(700));
2269 assert_eq!(journal.bounds().await.start, 7);
2271
2272 journal.destroy().await.unwrap();
2273 });
2274 }
2275
2276 #[test_traced]
2277 fn test_fixed_journal_init_at_size_persistence() {
2278 let executor = deterministic::Runner::default();
2279 executor.start(|context| async move {
2280 let cfg = test_cfg(&context, NZU64!(5));
2281
2282 let journal =
2284 Journal::<_, Digest>::init_at_size(context.with_label("first"), cfg.clone(), 15)
2285 .await
2286 .unwrap();
2287
2288 for i in 0..5u64 {
2290 let pos = journal.append(&test_digest(1500 + i)).await.unwrap();
2291 assert_eq!(pos, 15 + i);
2292 }
2293
2294 assert_eq!(journal.size().await, 20);
2295
2296 journal.sync().await.unwrap();
2298 drop(journal);
2299
2300 let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
2301 .await
2302 .unwrap();
2303
2304 let bounds = journal.bounds().await;
2306 assert_eq!(bounds.end, 20);
2307 assert_eq!(bounds.start, 15);
2308
2309 for i in 0..5u64 {
2311 assert_eq!(journal.read(15 + i).await.unwrap(), test_digest(1500 + i));
2312 }
2313
2314 let pos = journal.append(&test_digest(9999)).await.unwrap();
2316 assert_eq!(pos, 20);
2317 assert_eq!(journal.read(20).await.unwrap(), test_digest(9999));
2318
2319 journal.destroy().await.unwrap();
2320 });
2321 }
2322
2323 #[test_traced]
2324 fn test_fixed_journal_init_at_size_persistence_without_data() {
2325 let executor = deterministic::Runner::default();
2326 executor.start(|context| async move {
2327 let cfg = test_cfg(&context, NZU64!(5));
2328
2329 let journal =
2331 Journal::<_, Digest>::init_at_size(context.with_label("first"), cfg.clone(), 15)
2332 .await
2333 .unwrap();
2334
2335 let bounds = journal.bounds().await;
2336 assert_eq!(bounds.end, 15);
2337 assert!(bounds.is_empty());
2338
2339 drop(journal);
2341
2342 let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
2344 .await
2345 .unwrap();
2346
2347 let bounds = journal.bounds().await;
2348 assert_eq!(bounds.end, 15);
2349 assert!(bounds.is_empty());
2350
2351 let pos = journal.append(&test_digest(1500)).await.unwrap();
2353 assert_eq!(pos, 15);
2354 assert_eq!(journal.read(15).await.unwrap(), test_digest(1500));
2355
2356 journal.destroy().await.unwrap();
2357 });
2358 }
2359
2360 #[test_traced]
2361 fn test_fixed_journal_init_at_size_large_offset() {
2362 let executor = deterministic::Runner::default();
2363 executor.start(|context| async move {
2364 let cfg = test_cfg(&context, NZU64!(5));
2365
2366 let journal = Journal::<_, Digest>::init_at_size(context.clone(), cfg.clone(), 1000)
2368 .await
2369 .unwrap();
2370
2371 let bounds = journal.bounds().await;
2372 assert_eq!(bounds.end, 1000);
2373 assert!(bounds.is_empty());
2374
2375 let pos = journal.append(&test_digest(100000)).await.unwrap();
2377 assert_eq!(pos, 1000);
2378 assert_eq!(journal.read(1000).await.unwrap(), test_digest(100000));
2379
2380 journal.destroy().await.unwrap();
2381 });
2382 }
2383
2384 #[test_traced]
2385 fn test_fixed_journal_init_at_size_prune_and_append() {
2386 let executor = deterministic::Runner::default();
2387 executor.start(|context| async move {
2388 let cfg = test_cfg(&context, NZU64!(5));
2389
2390 let journal = Journal::<_, Digest>::init_at_size(context.clone(), cfg.clone(), 20)
2392 .await
2393 .unwrap();
2394
2395 for i in 0..10u64 {
2397 journal.append(&test_digest(2000 + i)).await.unwrap();
2398 }
2399
2400 assert_eq!(journal.size().await, 30);
2401
2402 journal.prune(25).await.unwrap();
2404
2405 let bounds = journal.bounds().await;
2406 assert_eq!(bounds.end, 30);
2407 assert_eq!(bounds.start, 25);
2408
2409 for i in 25..30u64 {
2411 assert_eq!(journal.read(i).await.unwrap(), test_digest(2000 + (i - 20)));
2412 }
2413
2414 let pos = journal.append(&test_digest(3000)).await.unwrap();
2416 assert_eq!(pos, 30);
2417
2418 journal.destroy().await.unwrap();
2419 });
2420 }
2421
    #[test_traced]
    fn test_fixed_journal_clear_to_size() {
        // clear_to_size discards all existing items and repositions the journal
        // at the given size; the new position must persist across restarts.
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(&context, NZU64!(10));
            let journal = Journal::init(context.with_label("journal"), cfg.clone())
                .await
                .expect("failed to initialize journal");

            for i in 0..25u64 {
                journal.append(&test_digest(i)).await.unwrap();
            }
            assert_eq!(journal.size().await, 25);
            journal.sync().await.unwrap();

            // Jump forward past all existing data.
            journal.clear_to_size(100).await.unwrap();
            assert_eq!(journal.size().await, 100);

            // Everything previously written now reads as pruned.
            for i in 0..25 {
                assert!(matches!(journal.read(i).await, Err(Error::ItemPruned(_))));
            }

            // No explicit sync before the drop: clear_to_size itself must be
            // durable.
            drop(journal);
            let journal =
                Journal::<_, Digest>::init(context.with_label("journal_after_clear"), cfg.clone())
                    .await
                    .expect("failed to re-initialize journal after clear");
            assert_eq!(journal.size().await, 100);

            // Appends continue from the cleared-to position.
            for i in 100..105u64 {
                let pos = journal.append(&test_digest(i)).await.unwrap();
                assert_eq!(pos, i);
            }
            assert_eq!(journal.size().await, 105);

            for i in 100..105u64 {
                assert_eq!(journal.read(i).await.unwrap(), test_digest(i));
            }

            journal.sync().await.unwrap();
            drop(journal);

            // One more restart: the new items are durable too.
            let journal = Journal::<_, Digest>::init(context.with_label("journal_reopened"), cfg)
                .await
                .expect("failed to re-initialize journal");

            assert_eq!(journal.size().await, 105);
            for i in 100..105u64 {
                assert_eq!(journal.read(i).await.unwrap(), test_digest(i));
            }

            journal.destroy().await.unwrap();
        });
    }
2483
    #[test_traced]
    fn test_fixed_journal_sync_crash_meta_none_boundary_aligned() {
        // Crash simulation: the inner segmented journal's data is synced but
        // Journal::sync (which would also persist metadata) never runs. With a
        // section-aligned boundary of 0, recovery must still see all 5 items.
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(&context, NZU64!(5));
            let journal = Journal::<_, Digest>::init(context.with_label("first"), cfg.clone())
                .await
                .unwrap();

            for i in 0..5u64 {
                journal.append(&test_digest(i)).await.unwrap();
            }
            // Sync only the inner segmented journal, bypassing Journal::sync,
            // then "crash" by dropping everything.
            let inner = journal.inner.read().await;
            let tail_section = inner.size / journal.items_per_blob;
            inner.journal.sync(tail_section).await.unwrap();
            drop(inner);
            drop(journal);

            // Recovery reconstructs bounds [0, 5) from the data alone.
            let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
                .await
                .unwrap();
            let bounds = journal.bounds().await;
            assert_eq!(bounds.start, 0);
            assert_eq!(bounds.end, 5);
            journal.destroy().await.unwrap();
        });
    }
2512
    #[test_traced]
    fn test_fixed_journal_oldest_section_invalid_len() {
        // If the metadata recording a mid-section pruning boundary is lost, the
        // oldest section's length no longer matches any valid boundary and init
        // must fail with Corruption.
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(&context, NZU64!(5));
            // Boundary 7 is mid-section: the oldest section starts short.
            let journal =
                Journal::<_, Digest>::init_at_size(context.with_label("first"), cfg.clone(), 7)
                    .await
                    .unwrap();
            for i in 0..3u64 {
                journal.append(&test_digest(i)).await.unwrap();
            }
            // NOTE(review): size is 10 here (positions 7..10), yet
            // newest_section is 2 — the blob for the next section appears to be
            // created eagerly; confirm against the segmented journal impl.
            assert_eq!(journal.inner.read().await.journal.newest_section(), Some(2));
            journal.sync().await.unwrap();

            // Wipe the metadata (which records the mid-section boundary).
            let mut inner = journal.inner.write().await;
            inner.metadata.clear();
            inner.metadata.sync().await.unwrap();
            drop(inner);
            drop(journal);

            // Without the boundary record, the short oldest section is
            // indistinguishable from corruption.
            let result =
                Journal::<_, Digest>::init(context.with_label("second"), cfg.clone()).await;
            assert!(matches!(result, Err(Error::Corruption(_))));
            // Manual cleanup since init failed and there is no journal to
            // destroy.
            context.remove(&blob_partition(&cfg), None).await.unwrap();
            context
                .remove(&format!("{}-metadata", cfg.partition), None)
                .await
                .unwrap();
        });
    }
2549
    #[test_traced]
    fn test_fixed_journal_sync_crash_meta_mid_boundary_unchanged() {
        // Crash simulation: data blobs are synced but Journal::sync never runs.
        // The mid-section boundary (7) was already persisted by init_at_size,
        // so recovery must reproduce bounds [7, 10).
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(&context, NZU64!(5));
            let journal =
                Journal::<_, Digest>::init_at_size(context.with_label("first"), cfg.clone(), 7)
                    .await
                    .unwrap();
            for i in 0..3u64 {
                journal.append(&test_digest(i)).await.unwrap();
            }
            // Sync only the inner segmented journal, then "crash" via drop.
            let inner = journal.inner.read().await;
            let tail_section = inner.size / journal.items_per_blob;
            inner.journal.sync(tail_section).await.unwrap();
            drop(inner);
            drop(journal);

            let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
                .await
                .unwrap();
            let bounds = journal.bounds().await;
            assert_eq!(bounds.start, 7);
            assert_eq!(bounds.end, 10);
            journal.destroy().await.unwrap();
        });
    }
2578 #[test_traced]
2579 fn test_fixed_journal_sync_crash_meta_mid_to_aligned_becomes_stale() {
2580 let executor = deterministic::Runner::default();
2582 executor.start(|context| async move {
2583 let cfg = test_cfg(&context, NZU64!(5));
2584 let journal =
2585 Journal::<_, Digest>::init_at_size(context.with_label("first"), cfg.clone(), 7)
2586 .await
2587 .unwrap();
2588 for i in 0..10u64 {
2589 journal.append(&test_digest(i)).await.unwrap();
2590 }
2591 assert_eq!(journal.size().await, 17);
2592 journal.prune(10).await.unwrap();
2593
2594 let inner = journal.inner.read().await;
2595 let tail_section = inner.size / journal.items_per_blob;
2596 inner.journal.sync(tail_section).await.unwrap();
2597 drop(inner);
2598 drop(journal);
2599
2600 let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
2601 .await
2602 .unwrap();
2603 let bounds = journal.bounds().await;
2604 assert_eq!(bounds.start, 10);
2605 assert_eq!(bounds.end, 17);
2606 journal.destroy().await.unwrap();
2607 });
2608 }
2609
2610 #[test_traced]
2611 fn test_fixed_journal_prune_does_not_move_boundary_backwards() {
2612 let executor = deterministic::Runner::default();
2615 executor.start(|context| async move {
2616 let cfg = test_cfg(&context, NZU64!(5));
2617 let journal =
2619 Journal::<_, Digest>::init_at_size(context.with_label("first"), cfg.clone(), 7)
2620 .await
2621 .unwrap();
2622 for i in 0..5u64 {
2624 journal.append(&test_digest(i)).await.unwrap();
2625 }
2626 journal.prune(5).await.unwrap();
2628 assert_eq!(journal.bounds().await.start, 7);
2629 journal.destroy().await.unwrap();
2630 });
2631 }
2632
2633 #[test_traced]
2634 fn test_fixed_journal_replay_after_init_at_size_spanning_sections() {
2635 let executor = deterministic::Runner::default();
2638 executor.start(|context| async move {
2639 let cfg = test_cfg(&context, NZU64!(5));
2640
2641 let journal = Journal::<_, Digest>::init_at_size(context.clone(), cfg.clone(), 7)
2644 .await
2645 .unwrap();
2646
2647 for i in 0..13u64 {
2649 let pos = journal.append(&test_digest(100 + i)).await.unwrap();
2650 assert_eq!(pos, 7 + i);
2651 }
2652 assert_eq!(journal.size().await, 20);
2653 journal.sync().await.unwrap();
2654
2655 {
2657 let reader = journal.reader().await;
2658 let stream = reader
2659 .replay(NZUsize!(1024), 7)
2660 .await
2661 .expect("failed to replay");
2662 pin_mut!(stream);
2663 let mut items: Vec<(u64, Digest)> = Vec::new();
2664 while let Some(result) = stream.next().await {
2665 items.push(result.expect("replay item failed"));
2666 }
2667
2668 assert_eq!(items.len(), 13);
2670 for (i, (pos, item)) in items.iter().enumerate() {
2671 assert_eq!(*pos, 7 + i as u64);
2672 assert_eq!(*item, test_digest(100 + i as u64));
2673 }
2674 }
2675
2676 {
2678 let reader = journal.reader().await;
2679 let stream = reader
2680 .replay(NZUsize!(1024), 12)
2681 .await
2682 .expect("failed to replay from mid-stream");
2683 pin_mut!(stream);
2684 let mut items: Vec<(u64, Digest)> = Vec::new();
2685 while let Some(result) = stream.next().await {
2686 items.push(result.expect("replay item failed"));
2687 }
2688
2689 assert_eq!(items.len(), 8);
2691 for (i, (pos, item)) in items.iter().enumerate() {
2692 assert_eq!(*pos, 12 + i as u64);
2693 assert_eq!(*item, test_digest(100 + 5 + i as u64));
2694 }
2695 }
2696
2697 journal.destroy().await.unwrap();
2698 });
2699 }
2700
2701 #[test_traced]
2702 fn test_fixed_journal_rewind_error_before_bounds_start() {
2703 let executor = deterministic::Runner::default();
2705 executor.start(|context| async move {
2706 let cfg = test_cfg(&context, NZU64!(5));
2707
2708 let journal = Journal::<_, Digest>::init_at_size(context.clone(), cfg.clone(), 10)
2709 .await
2710 .unwrap();
2711
2712 for i in 0..3u64 {
2714 journal.append(&test_digest(i)).await.unwrap();
2715 }
2716 assert_eq!(journal.size().await, 13);
2717
2718 journal.rewind(11).await.unwrap();
2720 assert_eq!(journal.size().await, 11);
2721
2722 journal.rewind(10).await.unwrap();
2724 assert_eq!(journal.size().await, 10);
2725
2726 let result = journal.rewind(9).await;
2728 assert!(matches!(result, Err(Error::InvalidRewind(9))));
2729
2730 journal.destroy().await.unwrap();
2731 });
2732 }
2733
2734 #[test_traced]
2735 fn test_fixed_journal_init_at_size_crash_scenarios() {
2736 let executor = deterministic::Runner::default();
2737 executor.start(|context| async move {
2738 let cfg = test_cfg(&context, NZU64!(5));
2739
2740 let journal =
2742 Journal::<_, Digest>::init_at_size(context.with_label("first"), cfg.clone(), 7)
2743 .await
2744 .unwrap();
2745 for i in 0..5u64 {
2746 journal.append(&test_digest(i)).await.unwrap();
2747 }
2748 journal.sync().await.unwrap();
2749 drop(journal);
2750
2751 let blob_part = blob_partition(&cfg);
2754 context.remove(&blob_part, None).await.unwrap();
2755
2756 let journal = Journal::<_, Digest>::init(context.with_label("crash1"), cfg.clone())
2758 .await
2759 .expect("init failed after clear crash");
2760 let bounds = journal.bounds().await;
2761 assert_eq!(bounds.end, 0);
2762 assert_eq!(bounds.start, 0);
2763 drop(journal);
2764
2765 let meta_cfg = MetadataConfig {
2767 partition: format!("{}-metadata", cfg.partition),
2768 codec_config: ((0..).into(), ()),
2769 };
2770 let mut metadata = Metadata::<_, u64, Vec<u8>>::init(
2771 context.with_label("restore_meta"),
2772 meta_cfg.clone(),
2773 )
2774 .await
2775 .unwrap();
2776 metadata.put(PRUNING_BOUNDARY_KEY, 7u64.to_be_bytes().to_vec());
2777 metadata.sync().await.unwrap();
2778
2779 let (blob, _) = context.open(&blob_part, &0u64.to_be_bytes()).await.unwrap();
2792 blob.sync().await.unwrap(); let journal = Journal::<_, Digest>::init(context.with_label("crash2"), cfg.clone())
2796 .await
2797 .expect("init failed after create crash");
2798
2799 let bounds = journal.bounds().await;
2801 assert_eq!(bounds.start, 0);
2802 assert_eq!(bounds.end, 0);
2804 journal.destroy().await.unwrap();
2805 });
2806 }
2807
2808 #[test_traced]
2809 fn test_fixed_journal_clear_to_size_crash_scenarios() {
2810 let executor = deterministic::Runner::default();
2811 executor.start(|context| async move {
2812 let cfg = test_cfg(&context, NZU64!(5));
2813
2814 let journal =
2817 Journal::<_, Digest>::init_at_size(context.with_label("first"), cfg.clone(), 12)
2818 .await
2819 .unwrap();
2820 journal.sync().await.unwrap();
2821 drop(journal);
2822
2823 let blob_part = blob_partition(&cfg);
2828 context.remove(&blob_part, None).await.unwrap();
2829
2830 let (blob, _) = context.open(&blob_part, &0u64.to_be_bytes()).await.unwrap();
2832 blob.sync().await.unwrap();
2833
2834 let journal =
2839 Journal::<_, Digest>::init(context.with_label("crash_clear"), cfg.clone())
2840 .await
2841 .expect("init failed after clear_to_size crash");
2842
2843 let bounds = journal.bounds().await;
2845 assert_eq!(bounds.start, 0);
2846 assert_eq!(bounds.end, 0);
2847 journal.destroy().await.unwrap();
2848 });
2849 }
2850}