1#[cfg(test)]
57use super::Reader as _;
58use crate::{
59 journal::{
60 contiguous::Mutable,
61 segmented::fixed::{Config as SegmentedConfig, Journal as SegmentedJournal},
62 Error,
63 },
64 metadata::{Config as MetadataConfig, Metadata},
65 Persistable,
66};
67use commonware_codec::CodecFixedShared;
68use commonware_runtime::{buffer::paged::CacheRef, Clock, Metrics, Storage};
69use commonware_utils::sync::{AsyncRwLockReadGuard, UpgradableAsyncRwLock};
70use futures::{stream::Stream, StreamExt};
71use std::num::{NonZeroU64, NonZeroUsize};
72use tracing::warn;
73
/// Metadata key under which a mid-section pruning boundary is persisted.
const PRUNING_BOUNDARY_KEY: u64 = 1;
76
/// Configuration for a contiguous journal of fixed-size items backed by
/// segmented blobs.
#[derive(Clone)]
pub struct Config {
    /// Base partition name. Blob data lives in `{partition}-blobs` (or the
    /// legacy `{partition}`) and metadata in `{partition}-metadata`.
    pub partition: String,

    /// Number of items stored per blob (section).
    pub items_per_blob: NonZeroU64,

    /// Page cache used for buffered blob reads.
    pub page_cache: CacheRef,

    /// Write buffer size used when appending.
    pub write_buffer: NonZeroUsize,
}
98
/// State protected by the journal's read/write lock.
struct Inner<E: Clock + Storage + Metrics, A: CodecFixedShared> {
    /// Underlying segmented journal holding the fixed-size items.
    journal: SegmentedJournal<E, A>,

    /// Total number of items appended (the next append position).
    size: u64,

    /// Metadata store persisting the pruning boundary when it is not
    /// section-aligned.
    metadata: Metadata<E, u64, Vec<u8>>,

    /// Position of the oldest retained item; reads below this fail with
    /// `Error::ItemPruned`.
    pruning_boundary: u64,
}
119
impl<E: Clock + Storage + Metrics, A: CodecFixedShared> Inner<E, A> {
    /// Read the item at global position `pos`.
    ///
    /// Returns `Error::ItemOutOfRange` past the end and `Error::ItemPruned`
    /// below the pruning boundary. "Not found"-style errors from the
    /// segmented journal are surfaced as `Error::Corruption` because the
    /// bounds checks above guarantee the item should exist.
    async fn read(&self, pos: u64, items_per_blob: u64) -> Result<A, Error> {
        if pos >= self.size {
            return Err(Error::ItemOutOfRange(pos));
        }
        if pos < self.pruning_boundary {
            return Err(Error::ItemPruned(pos));
        }

        let section = pos / items_per_blob;
        let section_start = section * items_per_blob;

        // The oldest section may be partially pruned: its item 0 corresponds
        // to the pruning boundary, not to the section start.
        let first_in_section = self.pruning_boundary.max(section_start);
        let pos_in_section = pos - first_in_section;

        self.journal
            .get(section, pos_in_section)
            .await
            .map_err(|e| {
                match e {
                    // Impossible after the checks above, so treat these as
                    // corruption rather than a caller error.
                    Error::SectionOutOfRange(e)
                    | Error::AlreadyPrunedToSection(e)
                    | Error::ItemOutOfRange(e) => {
                        Error::Corruption(format!("section/item should be found, but got: {e}"))
                    }
                    other => other,
                }
            })
    }
}
159
/// A contiguous journal of fixed-size items layered over a segmented journal.
pub struct Journal<E: Clock + Storage + Metrics, A: CodecFixedShared> {
    /// Shared state; the upgradable lock lets `sync`/`append` read first and
    /// upgrade only when a write is actually needed.
    inner: UpgradableAsyncRwLock<Inner<E, A>>,

    /// Items stored per blob (fixed after init).
    items_per_blob: u64,
}
184
/// Read handle that holds a read lock on the journal, providing a consistent
/// view for reads and replay.
pub struct Reader<'a, E: Clock + Storage + Metrics, A: CodecFixedShared> {
    guard: AsyncRwLockReadGuard<'a, Inner<E, A>>,
    items_per_blob: u64,
}
190
impl<E: Clock + Storage + Metrics, A: CodecFixedShared> super::Reader for Reader<'_, E, A> {
    type Item = A;

    /// Readable range: `[pruning_boundary, size)`.
    fn bounds(&self) -> std::ops::Range<u64> {
        self.guard.pruning_boundary..self.guard.size
    }

    /// Read the item at `pos` (see `Inner::read` for error semantics).
    async fn read(&self, pos: u64) -> Result<A, Error> {
        self.guard.read(pos, self.items_per_blob).await
    }

    /// Stream `(position, item)` pairs from `start_pos` to the end.
    ///
    /// Before replaying, every section that should be full is checked for
    /// completeness so a truncated or missing historical blob is reported as
    /// `Error::Corruption` instead of items being silently skipped.
    async fn replay(
        &self,
        buffer: NonZeroUsize,
        start_pos: u64,
    ) -> Result<impl Stream<Item = Result<(u64, A), Error>> + Send, Error> {
        let items_per_blob = self.items_per_blob;
        let pruning_boundary = self.guard.pruning_boundary;

        // `start_pos == size` is allowed and yields an empty stream.
        if start_pos > self.guard.size {
            return Err(Error::ItemOutOfRange(start_pos));
        }
        if start_pos < pruning_boundary {
            return Err(Error::ItemPruned(start_pos));
        }

        let start_section = start_pos / items_per_blob;
        let section_start = start_section * items_per_blob;

        // The oldest section may be partially pruned; its item 0 maps to the
        // pruning boundary rather than the section start.
        let first_in_section = pruning_boundary.max(section_start);
        let start_pos_in_section = start_pos - first_in_section;

        let journal = &self.guard.journal;
        // Only the oldest section (possibly pruned) and the newest (tail) may
        // legitimately hold fewer than `items_per_blob` items; any other short
        // section is corruption.
        if let (Some(oldest), Some(newest)) = (journal.oldest_section(), journal.newest_section()) {
            let first_to_check = start_section.max(oldest + 1);
            for section in first_to_check..newest {
                let len = journal.section_len(section).await?;
                if len < items_per_blob {
                    return Err(Error::Corruption(format!(
                        "section {section} incomplete: expected {items_per_blob} items, got {len}"
                    )));
                }
            }
        }

        let inner_stream = journal
            .replay(start_section, start_pos_in_section, buffer)
            .await?;

        // Translate per-section positions back into global item positions.
        let stream = inner_stream.map(move |result| {
            result.map(|(section, pos_in_section, item)| {
                let section_start = section * items_per_blob;
                let first_in_section = pruning_boundary.max(section_start);
                let global_pos = first_in_section + pos_in_section;
                (global_pos, item)
            })
        });

        Ok(stream)
    }
}
256
257impl<E: Clock + Storage + Metrics, A: CodecFixedShared> Journal<E, A> {
    /// On-disk size in bytes of one encoded item.
    pub const CHUNK_SIZE: usize = SegmentedJournal::<E, A>::CHUNK_SIZE;

    /// [`Self::CHUNK_SIZE`] as a `u64`, for byte-offset arithmetic.
    pub const CHUNK_SIZE_U64: u64 = Self::CHUNK_SIZE as u64;
263
264 async fn scan_partition(context: &E, partition: &str) -> Result<Vec<Vec<u8>>, Error> {
266 match context.scan(partition).await {
267 Ok(blobs) => Ok(blobs),
268 Err(commonware_runtime::Error::PartitionMissing(_)) => Ok(Vec::new()),
269 Err(err) => Err(Error::Runtime(err)),
270 }
271 }
272
273 async fn select_blob_partition(context: &E, cfg: &Config) -> Result<String, Error> {
279 let legacy_partition = cfg.partition.as_str();
280 let new_partition = format!("{}-blobs", cfg.partition);
281
282 let legacy_blobs = Self::scan_partition(context, legacy_partition).await?;
283 let new_blobs = Self::scan_partition(context, &new_partition).await?;
284
285 if !legacy_blobs.is_empty() && !new_blobs.is_empty() {
286 return Err(Error::Corruption(format!(
287 "both legacy and blobs partitions contain data: legacy={} blobs={}",
288 legacy_partition, new_partition
289 )));
290 }
291
292 if !legacy_blobs.is_empty() {
293 Ok(legacy_partition.into())
294 } else {
295 Ok(new_partition)
296 }
297 }
298
    /// Open (or create) the journal, recovering `size` and the pruning
    /// boundary from the blob layout and the metadata store.
    pub async fn init(context: E, cfg: Config) -> Result<Self, Error> {
        let items_per_blob = cfg.items_per_blob.get();

        let blob_partition = Self::select_blob_partition(&context, &cfg).await?;
        let segmented_cfg = SegmentedConfig {
            partition: blob_partition,
            page_cache: cfg.page_cache,
            write_buffer: cfg.write_buffer,
        };

        let mut journal =
            SegmentedJournal::init(context.with_label("blobs"), segmented_cfg).await?;
        let meta_cfg = MetadataConfig {
            partition: format!("{}-metadata", cfg.partition),
            codec_config: ((0..).into(), ()),
        };

        let mut metadata =
            Metadata::<_, u64, Vec<u8>>::init(context.with_label("meta"), meta_cfg).await?;

        // The metadata entry (if present) records a pruning boundary; it must
        // decode as exactly 8 big-endian bytes.
        let meta_pruning_boundary = match metadata.get(&PRUNING_BOUNDARY_KEY) {
            Some(bytes) => Some(u64::from_be_bytes(bytes.as_slice().try_into().map_err(
                |_| Error::Corruption("invalid pruning_boundary metadata".into()),
            )?)),
            None => None,
        };

        // Reconcile the metadata with the actual blob layout (crash repair).
        let (pruning_boundary, size, needs_metadata_update) =
            Self::recover_bounds(&journal, items_per_blob, meta_pruning_boundary).await?;

        if needs_metadata_update {
            // A section-aligned boundary is derivable from the oldest blob, so
            // the metadata entry is only kept for mid-section boundaries.
            if pruning_boundary.is_multiple_of(items_per_blob) {
                metadata.remove(&PRUNING_BOUNDARY_KEY);
            } else {
                metadata.put(
                    PRUNING_BOUNDARY_KEY,
                    pruning_boundary.to_be_bytes().to_vec(),
                );
            }
            metadata.sync().await?;
        }

        // The tail section must always exist, even when empty.
        let tail_section = size / items_per_blob;
        journal.ensure_section_exists(tail_section).await?;

        Ok(Self {
            inner: UpgradableAsyncRwLock::new(Inner {
                journal,
                size,
                metadata,
                pruning_boundary,
            }),
            items_per_blob,
        })
    }
365
    /// Reconcile the persisted pruning boundary with the blob layout.
    ///
    /// Returns `(pruning_boundary, size, needs_metadata_update)`; the flag is
    /// set when the metadata entry is stale and must be rewritten or removed.
    async fn recover_bounds(
        inner: &SegmentedJournal<E, A>,
        items_per_blob: u64,
        meta_pruning_boundary: Option<u64>,
    ) -> Result<(u64, u64, bool), Error> {
        // Boundary implied by the oldest surviving blob alone.
        let blob_boundary = inner.oldest_section().map_or(0, |o| o * items_per_blob);

        let (pruning_boundary, needs_update) = match meta_pruning_boundary {
            // Mid-section boundary: only trust it when it refers to the
            // current oldest section; anything else is a crash artifact.
            Some(meta_pruning_boundary)
                if !meta_pruning_boundary.is_multiple_of(items_per_blob) =>
            {
                let meta_oldest_section = meta_pruning_boundary / items_per_blob;
                match inner.oldest_section() {
                    None => {
                        warn!(
                            meta_oldest_section,
                            "crash repair: no blobs exist, ignoring stale metadata"
                        );
                        (blob_boundary, true)
                    }
                    Some(oldest_section) if meta_oldest_section < oldest_section => {
                        warn!(
                            meta_oldest_section,
                            oldest_section, "crash repair: metadata stale, computing from blobs"
                        );
                        (blob_boundary, true)
                    }
                    Some(oldest_section) if meta_oldest_section > oldest_section => {
                        warn!(
                            meta_oldest_section,
                            oldest_section,
                            "crash repair: metadata ahead of blobs, computing from blobs"
                        );
                        (blob_boundary, true)
                    }
                    Some(_) => (meta_pruning_boundary, false),
                }
            }
            // Section-aligned metadata is redundant: recompute and drop it.
            Some(_) => (blob_boundary, true),
            None => (blob_boundary, false),
        };

        Self::validate_oldest_section(inner, items_per_blob, pruning_boundary).await?;

        let size = Self::compute_size(inner, items_per_blob, pruning_boundary).await?;
        Ok((pruning_boundary, size, needs_update))
    }
434
435 async fn validate_oldest_section(
440 inner: &SegmentedJournal<E, A>,
441 items_per_blob: u64,
442 pruning_boundary: u64,
443 ) -> Result<(), Error> {
444 let (Some(oldest), Some(newest)) = (inner.oldest_section(), inner.newest_section()) else {
445 return Ok(()); };
447
448 if oldest == newest {
449 return Ok(()); }
451
452 let oldest_len = inner.section_len(oldest).await?;
453 let oldest_start = oldest * items_per_blob;
454
455 let expected = if pruning_boundary > oldest_start {
456 items_per_blob - (pruning_boundary - oldest_start)
458 } else {
459 items_per_blob
461 };
462
463 if oldest_len != expected {
464 return Err(Error::Corruption(format!(
465 "oldest section {oldest} has wrong size: expected {expected} items, got {oldest_len}"
466 )));
467 }
468
469 Ok(())
470 }
471
472 async fn compute_size(
474 inner: &SegmentedJournal<E, A>,
475 items_per_blob: u64,
476 pruning_boundary: u64,
477 ) -> Result<u64, Error> {
478 let oldest = inner.oldest_section();
479 let newest = inner.newest_section();
480
481 let (Some(oldest), Some(newest)) = (oldest, newest) else {
482 return Ok(pruning_boundary);
483 };
484
485 if oldest == newest {
486 let tail_len = inner.section_len(newest).await?;
488 return Ok(pruning_boundary + tail_len);
489 }
490
491 let oldest_len = inner.section_len(oldest).await?;
493 let tail_len = inner.section_len(newest).await?;
494
495 let middle_sections = newest - oldest - 1;
497 let middle_items = middle_sections * items_per_blob;
498
499 Ok(pruning_boundary + oldest_len + middle_items + tail_len)
500 }
501
    /// Initialize the journal as empty at a fixed `size`: any existing
    /// contents are discarded, and both `size` and the pruning boundary are
    /// set to `size` (no items are readable until new ones are appended).
    #[commonware_macros::stability(ALPHA)]
    pub async fn init_at_size(context: E, cfg: Config, size: u64) -> Result<Self, Error> {
        let items_per_blob = cfg.items_per_blob.get();
        let tail_section = size / items_per_blob;

        let blob_partition = Self::select_blob_partition(&context, &cfg).await?;
        let segmented_cfg = SegmentedConfig {
            partition: blob_partition,
            page_cache: cfg.page_cache,
            write_buffer: cfg.write_buffer,
        };

        let meta_cfg = MetadataConfig {
            partition: format!("{}-metadata", cfg.partition),
            codec_config: ((0..).into(), ()),
        };
        let mut metadata =
            Metadata::<_, u64, Vec<u8>>::init(context.with_label("meta"), meta_cfg).await?;
        let mut journal =
            SegmentedJournal::init(context.with_label("blobs"), segmented_cfg).await?;

        // Drop all existing contents, then create the (empty) tail section.
        journal.clear().await?;
        journal.ensure_section_exists(tail_section).await?;

        // Persist the boundary only when it falls mid-section; otherwise
        // ensure no stale entry survives.
        if !size.is_multiple_of(items_per_blob) {
            metadata.put(PRUNING_BOUNDARY_KEY, size.to_be_bytes().to_vec());
            metadata.sync().await?;
        } else if metadata.get(&PRUNING_BOUNDARY_KEY).is_some() {
            metadata.remove(&PRUNING_BOUNDARY_KEY);
            metadata.sync().await?;
        }

        Ok(Self {
            inner: UpgradableAsyncRwLock::new(Inner {
                journal,
                size,
                metadata,
                pruning_boundary: size,
            }),
            items_per_blob,
        })
    }
575
576 #[inline]
578 const fn position_to_section(&self, position: u64) -> (u64, u64) {
579 let section = position / self.items_per_blob;
580 let pos_in_section = position % self.items_per_blob;
581 (section, pos_in_section)
582 }
583
    /// Durably persist all appended items and, when needed, the pruning
    /// boundary metadata.
    ///
    /// Starts with an upgradable read so concurrent readers are not blocked
    /// while the tail section is synced; upgrades to a write lock only when
    /// the metadata actually has to change.
    pub async fn sync(&self) -> Result<(), Error> {
        let inner = self.inner.upgradable_read().await;

        let tail_section = inner.size / self.items_per_blob;

        inner.journal.sync(tail_section).await?;

        // Decide whether the metadata entry must be written (`put = true`),
        // removed (`put = false`), or is already up to date (early return).
        let pruning_boundary = inner.pruning_boundary;
        let pruning_boundary_from_metadata = inner.metadata.get(&PRUNING_BOUNDARY_KEY).cloned();
        let put = if !pruning_boundary.is_multiple_of(self.items_per_blob) {
            let needs_update = pruning_boundary_from_metadata
                .is_none_or(|bytes| bytes.as_slice() != pruning_boundary.to_be_bytes());

            if needs_update {
                true
            } else {
                return Ok(());
            }
        } else if pruning_boundary_from_metadata.is_some() {
            // Section-aligned boundary: any stale mid-section entry must go.
            false
        } else {
            return Ok(());
        };

        let mut inner = inner.upgrade().await;
        if put {
            inner.metadata.put(
                PRUNING_BOUNDARY_KEY,
                pruning_boundary.to_be_bytes().to_vec(),
            );
        } else {
            inner.metadata.remove(&PRUNING_BOUNDARY_KEY);
        }
        inner.metadata.sync().await?;

        Ok(())
    }
633
634 pub async fn reader(&self) -> Reader<'_, E, A> {
636 Reader {
637 guard: self.inner.read().await,
638 items_per_blob: self.items_per_blob,
639 }
640 }
641
642 pub async fn size(&self) -> u64 {
645 self.inner.read().await.size
646 }
647
    /// Append `item`, returning its position.
    ///
    /// When the append fills a section, that section is synced and the next
    /// one is created so the tail-section invariant holds.
    pub async fn append(&self, item: &A) -> Result<u64, Error> {
        let mut inner = self.inner.write().await;

        let position = inner.size;
        let (section, _pos_in_section) = self.position_to_section(position);
        inner.journal.append(section, item).await?;
        inner.size += 1;

        // Fast path: the section is not yet full.
        if !inner.size.is_multiple_of(self.items_per_blob) {
            return Ok(position);
        }

        // Downgrade so readers can proceed while the full section is synced;
        // the upgradable guard still excludes other writers.
        let inner = inner.downgrade_to_upgradable();
        inner.journal.sync(section).await?;

        let mut inner = inner.upgrade().await;
        inner.journal.ensure_section_exists(section + 1).await?;

        Ok(position)
    }
678
679 pub async fn rewind(&self, size: u64) -> Result<(), Error> {
688 let mut inner = self.inner.write().await;
689
690 match size.cmp(&inner.size) {
691 std::cmp::Ordering::Greater => return Err(Error::InvalidRewind(size)),
692 std::cmp::Ordering::Equal => return Ok(()),
693 std::cmp::Ordering::Less => {}
694 }
695
696 if size < inner.pruning_boundary {
697 return Err(Error::InvalidRewind(size));
698 }
699
700 let section = size / self.items_per_blob;
701 let section_start = section * self.items_per_blob;
702
703 let first_in_section = inner.pruning_boundary.max(section_start);
705 let pos_in_section = size - first_in_section;
706 let byte_offset = pos_in_section * Self::CHUNK_SIZE_U64;
707
708 inner.journal.rewind(section, byte_offset).await?;
709 inner.size = size;
710
711 Ok(())
712 }
713
714 pub async fn pruning_boundary(&self) -> u64 {
716 let inner = self.inner.read().await;
717 inner.pruning_boundary
718 }
719
720 pub async fn prune(&self, min_item_pos: u64) -> Result<bool, Error> {
727 let mut inner = self.inner.write().await;
728
729 let target_section = min_item_pos / self.items_per_blob;
731
732 let tail_section = inner.size / self.items_per_blob;
734
735 let min_section = std::cmp::min(target_section, tail_section);
737
738 let pruned = inner.journal.prune(min_section).await?;
739
740 if pruned {
742 let new_oldest = inner
743 .journal
744 .oldest_section()
745 .expect("all sections pruned - violates tail section invariant");
746 assert!(inner.pruning_boundary < new_oldest * self.items_per_blob);
748 inner.pruning_boundary = new_oldest * self.items_per_blob;
749 }
750
751 Ok(pruned)
752 }
753
754 pub async fn destroy(self) -> Result<(), Error> {
756 let inner = self.inner.into_inner();
758 inner.journal.destroy().await?;
759
760 inner.metadata.destroy().await?;
762
763 Ok(())
764 }
765
766 pub(crate) async fn clear_to_size(&self, new_size: u64) -> Result<(), Error> {
775 let mut inner = self.inner.write().await;
781 inner.journal.clear().await?;
782 let tail_section = new_size / self.items_per_blob;
783 inner.journal.ensure_section_exists(tail_section).await?;
784
785 inner.size = new_size;
786 inner.pruning_boundary = new_size; if !inner.pruning_boundary.is_multiple_of(self.items_per_blob) {
790 let value = inner.pruning_boundary.to_be_bytes().to_vec();
791 inner.metadata.put(PRUNING_BOUNDARY_KEY, value);
792 inner.metadata.sync().await?;
793 } else if inner.metadata.get(&PRUNING_BOUNDARY_KEY).is_some() {
794 inner.metadata.remove(&PRUNING_BOUNDARY_KEY);
795 inner.metadata.sync().await?;
796 }
797
798 Ok(())
799 }
800
    /// Test-only convenience: read via a freshly acquired reader.
    #[cfg(test)]
    pub(crate) async fn read(&self, pos: u64) -> Result<A, Error> {
        self.reader().await.read(pos).await
    }
806
    /// Test-only convenience: bounds via a freshly acquired reader.
    #[cfg(test)]
    pub(crate) async fn bounds(&self) -> std::ops::Range<u64> {
        self.reader().await.bounds()
    }
812
    /// Test-only: oldest section currently backing the journal.
    #[cfg(test)]
    pub(crate) async fn test_oldest_section(&self) -> Option<u64> {
        let inner = self.inner.read().await;
        inner.journal.oldest_section()
    }
819
    /// Test-only: newest section currently backing the journal.
    #[cfg(test)]
    pub(crate) async fn test_newest_section(&self) -> Option<u64> {
        let inner = self.inner.read().await;
        inner.journal.newest_section()
    }
826}
827
// Trait plumbing: delegate to the inherent methods of the same name.
impl<E: Clock + Storage + Metrics, A: CodecFixedShared> super::Contiguous for Journal<E, A> {
    type Item = A;

    async fn reader(&self) -> impl super::Reader<Item = A> + '_ {
        Self::reader(self).await
    }

    async fn size(&self) -> u64 {
        Self::size(self).await
    }
}
840
// Trait plumbing: delegate to the inherent methods of the same name. The
// inherent methods only need `&self`, so the `&mut self` receivers here are
// simply forwarded by shared reference.
impl<E: Clock + Storage + Metrics, A: CodecFixedShared> Mutable for Journal<E, A> {
    async fn append(&mut self, item: &Self::Item) -> Result<u64, Error> {
        Self::append(self, item).await
    }

    async fn prune(&mut self, min_position: u64) -> Result<bool, Error> {
        Self::prune(self, min_position).await
    }

    async fn rewind(&mut self, size: u64) -> Result<(), Error> {
        Self::rewind(self, size).await
    }
}
854
impl<E: Clock + Storage + Metrics, A: CodecFixedShared> Persistable for Journal<E, A> {
    type Error = Error;

    // NOTE: `self.sync()`/`self.destroy()` resolve to the *inherent*
    // `Journal` methods (inherent methods take precedence over trait methods
    // during method resolution), so these delegations do not recurse.
    async fn commit(&self) -> Result<(), Error> {
        self.sync().await
    }

    async fn sync(&self) -> Result<(), Error> {
        self.sync().await
    }

    async fn destroy(self) -> Result<(), Error> {
        self.destroy().await
    }
}
870
871#[cfg(test)]
872mod tests {
873 use super::*;
874 use commonware_cryptography::{sha256::Digest, Hasher as _, Sha256};
875 use commonware_macros::test_traced;
876 use commonware_runtime::{
877 deterministic::{self, Context},
878 Blob, BufferPooler, Error as RuntimeError, Metrics, Runner, Storage,
879 };
880 use commonware_utils::{NZUsize, NZU16, NZU64};
881 use futures::{pin_mut, StreamExt};
882 use std::num::NonZeroU16;
883
    /// Page size used by the test page cache.
    const PAGE_SIZE: NonZeroU16 = NZU16!(44);
    /// Number of pages held by the test page cache.
    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(3);
886
887 fn test_digest(value: u64) -> Digest {
889 Sha256::hash(&value.to_be_bytes())
890 }
891
    /// Build a journal config over the shared test partition.
    fn test_cfg(pooler: &impl BufferPooler, items_per_blob: NonZeroU64) -> Config {
        Config {
            partition: "test-partition".into(),
            items_per_blob,
            page_cache: CacheRef::from_pooler(pooler, PAGE_SIZE, PAGE_CACHE_SIZE),
            write_buffer: NZUsize!(2048),
        }
    }
900
901 fn blob_partition(cfg: &Config) -> String {
902 format!("{}-blobs", cfg.partition)
903 }
904
    /// List blobs in `partition`, treating a missing partition as empty
    /// (mirrors `Journal::scan_partition`, but panics on other errors).
    async fn scan_partition(context: &Context, partition: &str) -> Vec<Vec<u8>> {
        match context.scan(partition).await {
            Ok(blobs) => blobs,
            Err(RuntimeError::PartitionMissing(_)) => Vec::new(),
            Err(err) => panic!("Failed to scan partition {partition}: {err}"),
        }
    }
912
    /// Init must fail with `Corruption` when both the legacy and `-blobs`
    /// partitions contain data (the layout is ambiguous).
    #[test_traced]
    fn test_fixed_journal_init_conflicting_partitions() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(&context, NZU64!(2));
            let legacy_partition = cfg.partition.clone();
            let blobs_partition = blob_partition(&cfg);

            // Seed a blob in the legacy partition.
            let (legacy_blob, _) = context
                .open(&legacy_partition, &0u64.to_be_bytes())
                .await
                .expect("Failed to open legacy blob");
            legacy_blob
                .write_at(0, vec![0u8; 1])
                .await
                .expect("Failed to write legacy blob");
            legacy_blob
                .sync()
                .await
                .expect("Failed to sync legacy blob");

            // Seed a blob in the new partition as well.
            let (new_blob, _) = context
                .open(&blobs_partition, &0u64.to_be_bytes())
                .await
                .expect("Failed to open new blob");
            new_blob
                .write_at(0, vec![0u8; 1])
                .await
                .expect("Failed to write new blob");
            new_blob.sync().await.expect("Failed to sync new blob");

            let result =
                Journal::<_, Digest>::init(context.with_label("second"), cfg.clone()).await;
            assert!(matches!(result, Err(Error::Corruption(_))));
        });
    }
949
    /// When only the legacy partition has data, init keeps writing to it and
    /// does not migrate blobs to the `-blobs` partition.
    #[test_traced]
    fn test_fixed_journal_init_prefers_legacy_partition() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(&context, NZU64!(2));
            let legacy_partition = cfg.partition.clone();
            let blobs_partition = blob_partition(&cfg);

            // Seed a blob in the legacy partition only.
            let (legacy_blob, _) = context
                .open(&legacy_partition, &0u64.to_be_bytes())
                .await
                .expect("Failed to open legacy blob");
            legacy_blob
                .write_at(0, vec![0u8; 1])
                .await
                .expect("Failed to write legacy blob");
            legacy_blob
                .sync()
                .await
                .expect("Failed to sync legacy blob");

            let journal = Journal::<_, Digest>::init(context.with_label("first"), cfg.clone())
                .await
                .expect("failed to initialize journal");
            journal.append(&test_digest(1)).await.unwrap();
            journal.sync().await.unwrap();
            drop(journal);

            // All data stayed in the legacy partition.
            let legacy_blobs = scan_partition(&context, &legacy_partition).await;
            let new_blobs = scan_partition(&context, &blobs_partition).await;
            assert!(!legacy_blobs.is_empty());
            assert!(new_blobs.is_empty());
        });
    }
985
    /// A fresh journal (no pre-existing data) writes to the `-blobs`
    /// partition, leaving the legacy partition untouched.
    #[test_traced]
    fn test_fixed_journal_init_defaults_to_blobs_partition() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(&context, NZU64!(2));
            let legacy_partition = cfg.partition.clone();
            let blobs_partition = blob_partition(&cfg);

            let journal = Journal::<_, Digest>::init(context.with_label("first"), cfg.clone())
                .await
                .expect("failed to initialize journal");
            journal.append(&test_digest(1)).await.unwrap();
            journal.sync().await.unwrap();
            drop(journal);

            let legacy_blobs = scan_partition(&context, &legacy_partition).await;
            let new_blobs = scan_partition(&context, &blobs_partition).await;
            assert!(legacy_blobs.is_empty());
            assert!(!new_blobs.is_empty());
        });
    }
1007
    /// End-to-end append/read/prune behavior: reopen after sync, reads at and
    /// below the pruning boundary, no-op prunes, max-prune keeping the tail,
    /// and replay from the pruning boundary.
    #[test_traced]
    fn test_fixed_journal_append_and_prune() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = test_cfg(&context, NZU64!(2));
            let journal = Journal::init(context.with_label("first"), cfg.clone())
                .await
                .expect("failed to initialize journal");

            let mut pos = journal
                .append(&test_digest(0))
                .await
                .expect("failed to append data 0");
            assert_eq!(pos, 0);

            // Reopen and confirm the synced item survived.
            journal.sync().await.expect("Failed to sync journal");
            drop(journal);

            let cfg = test_cfg(&context, NZU64!(2));
            let journal = Journal::init(context.with_label("second"), cfg.clone())
                .await
                .expect("failed to re-initialize journal");
            assert_eq!(journal.size().await, 1);

            pos = journal
                .append(&test_digest(1))
                .await
                .expect("failed to append data 1");
            assert_eq!(pos, 1);
            pos = journal
                .append(&test_digest(2))
                .await
                .expect("failed to append data 2");
            assert_eq!(pos, 2);

            let item0 = journal.read(0).await.expect("failed to read data 0");
            assert_eq!(item0, test_digest(0));
            let item1 = journal.read(1).await.expect("failed to read data 1");
            assert_eq!(item1, test_digest(1));
            let item2 = journal.read(2).await.expect("failed to read data 2");
            assert_eq!(item2, test_digest(2));
            let err = journal.read(3).await.expect_err("expected read to fail");
            assert!(matches!(err, Error::ItemOutOfRange(3)));

            journal.sync().await.expect("failed to sync journal");

            journal.prune(1).await.expect("failed to prune journal 1");

            journal.prune(2).await.expect("failed to prune journal 2");
            assert_eq!(journal.bounds().await.start, 2);

            // Pruned positions must error; the boundary item stays readable.
            let result0 = journal.read(0).await;
            assert!(matches!(result0, Err(Error::ItemPruned(0))));
            let result1 = journal.read(1).await;
            assert!(matches!(result1, Err(Error::ItemPruned(1))));

            let result2 = journal.read(2).await.unwrap();
            assert_eq!(result2, test_digest(2));

            for i in 3..10 {
                let pos = journal
                    .append(&test_digest(i))
                    .await
                    .expect("failed to append data");
                assert_eq!(pos, i);
            }

            // Pruning below the current boundary is a no-op.
            journal.prune(0).await.expect("no-op pruning failed");
            assert_eq!(journal.inner.read().await.journal.oldest_section(), Some(1));
            assert_eq!(journal.inner.read().await.journal.newest_section(), Some(5));
            assert_eq!(journal.bounds().await.start, 2);

            journal
                .prune(3 * cfg.items_per_blob.get())
                .await
                .expect("failed to prune journal 2");
            assert_eq!(journal.inner.read().await.journal.oldest_section(), Some(3));
            assert_eq!(journal.inner.read().await.journal.newest_section(), Some(5));
            assert_eq!(journal.bounds().await.start, 6);

            // Max-prune keeps the tail section; bounds collapse to empty.
            journal
                .prune(10000)
                .await
                .expect("failed to max-prune journal");
            let size = journal.size().await;
            assert_eq!(size, 10);
            assert_eq!(journal.test_oldest_section().await, Some(5));
            assert_eq!(journal.test_newest_section().await, Some(5));
            let bounds = journal.bounds().await;
            assert!(bounds.is_empty());
            assert_eq!(bounds.start, size);

            {
                let reader = journal.reader().await;
                let result = reader.replay(NZUsize!(1024), 0).await;
                assert!(matches!(result, Err(Error::ItemPruned(0))));
            }

            {
                let reader = journal.reader().await;
                let res = reader.replay(NZUsize!(1024), 0).await;
                assert!(matches!(res, Err(Error::ItemPruned(_))));

                // Replaying from the boundary of a fully pruned journal yields
                // an empty stream.
                let reader = journal.reader().await;
                let stream = reader
                    .replay(NZUsize!(1024), journal.bounds().await.start)
                    .await
                    .expect("failed to replay journal from pruning boundary");
                pin_mut!(stream);
                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((pos, item)) => {
                            assert_eq!(test_digest(pos), item);
                            items.push(pos);
                        }
                        Err(err) => panic!("Failed to read item: {err}"),
                    }
                }
                assert_eq!(items, Vec::<u64>::new());
            }

            journal.destroy().await.unwrap();
        });
    }
1155
    /// Fill nearly two large sections, reopen, and read back the first
    /// section's worth of items.
    #[test_traced]
    fn test_fixed_journal_append_a_lot_of_data() {
        let executor = deterministic::Runner::default();
        const ITEMS_PER_BLOB: NonZeroU64 = NZU64!(10000);
        executor.start(|context| async move {
            let cfg = test_cfg(&context, ITEMS_PER_BLOB);
            let journal = Journal::init(context.with_label("first"), cfg.clone())
                .await
                .expect("failed to initialize journal");
            for i in 0u64..ITEMS_PER_BLOB.get() * 2 - 1 {
                journal
                    .append(&test_digest(i))
                    .await
                    .expect("failed to append data");
            }
            journal.sync().await.expect("failed to sync journal");
            drop(journal);
            let journal = Journal::init(context.with_label("second"), cfg.clone())
                .await
                .expect("failed to re-initialize journal");
            for i in 0u64..10000 {
                let item: Digest = journal.read(i).await.expect("failed to read data");
                assert_eq!(item, test_digest(i));
            }
            journal.destroy().await.expect("failed to destroy journal");
        });
    }
1187
    /// Replay across many sections, then corrupt bytes in a historical blob
    /// and verify both point reads and replay surface the failure.
    #[test_traced]
    fn test_fixed_journal_replay() {
        const ITEMS_PER_BLOB: NonZeroU64 = NZU64!(7);
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = test_cfg(&context, ITEMS_PER_BLOB);
            let journal = Journal::init(context.with_label("first"), cfg.clone())
                .await
                .expect("failed to initialize journal");

            for i in 0u64..(ITEMS_PER_BLOB.get() * 100 + ITEMS_PER_BLOB.get() / 2) {
                let pos = journal
                    .append(&test_digest(i))
                    .await
                    .expect("failed to append data");
                assert_eq!(pos, i);
            }

            for i in 0u64..(ITEMS_PER_BLOB.get() * 100 + ITEMS_PER_BLOB.get() / 2) {
                let item: Digest = journal.read(i).await.expect("failed to read data");
                assert_eq!(item, test_digest(i), "i={i}");
            }

            {
                let reader = journal.reader().await;
                let stream = reader
                    .replay(NZUsize!(1024), 0)
                    .await
                    .expect("failed to replay journal");
                let mut items = Vec::new();
                pin_mut!(stream);
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((pos, item)) => {
                            assert_eq!(test_digest(pos), item, "pos={pos}, item={item:?}");
                            items.push(pos);
                        }
                        Err(err) => panic!("Failed to read item: {err}"),
                    }
                }

                // Every appended position must be replayed exactly once.
                assert_eq!(
                    items.len(),
                    ITEMS_PER_BLOB.get() as usize * 100 + ITEMS_PER_BLOB.get() as usize / 2
                );
                items.sort();
                for (i, pos) in items.iter().enumerate() {
                    assert_eq!(i as u64, *pos);
                }
            }

            journal.sync().await.expect("Failed to sync journal");
            drop(journal);

            // Overwrite a few bytes in the middle of blob 40.
            let (blob, _) = context
                .open(&blob_partition(&cfg), &40u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            let bad_bytes = 123456789u32;
            blob.write_at(1, bad_bytes.to_be_bytes().to_vec())
                .await
                .expect("Failed to write bad bytes");
            blob.sync().await.expect("Failed to sync blob");

            let journal = Journal::init(context.with_label("second"), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");

            // Point read of the corrupted item fails.
            let err = journal
                .read(40 * ITEMS_PER_BLOB.get() + 1)
                .await
                .unwrap_err();
            assert!(matches!(err, Error::Runtime(_)));

            {
                // Replay must stop with an error at the corrupted item.
                let mut error_found = false;
                let reader = journal.reader().await;
                let stream = reader
                    .replay(NZUsize!(1024), 0)
                    .await
                    .expect("failed to replay journal");
                let mut items = Vec::new();
                pin_mut!(stream);
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((pos, item)) => {
                            assert_eq!(test_digest(pos), item);
                            items.push(pos);
                        }
                        Err(err) => {
                            error_found = true;
                            assert!(matches!(err, Error::Runtime(_)));
                            break;
                        }
                    }
                }
                assert!(error_found);
            }
        });
    }
1301
    /// A truncated historical blob is tolerated at init, but replay reports
    /// the incomplete section as `Corruption`.
    #[test_traced]
    fn test_fixed_journal_init_with_corrupted_historical_blobs() {
        let executor = deterministic::Runner::default();
        const ITEMS_PER_BLOB: NonZeroU64 = NZU64!(7);
        executor.start(|context| async move {
            let cfg = test_cfg(&context, ITEMS_PER_BLOB);
            let journal = Journal::init(context.with_label("first"), cfg.clone())
                .await
                .expect("failed to initialize journal");

            for i in 0u64..(ITEMS_PER_BLOB.get() * 100 + ITEMS_PER_BLOB.get() / 2) {
                let pos = journal
                    .append(&test_digest(i))
                    .await
                    .expect("failed to append data");
                assert_eq!(pos, i);
            }
            journal.sync().await.expect("Failed to sync journal");
            drop(journal);

            // Truncate historical blob 40 by one byte.
            let (blob, size) = context
                .open(&blob_partition(&cfg), &40u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            blob.resize(size - 1).await.expect("Failed to corrupt blob");
            blob.sync().await.expect("Failed to sync blob");

            let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
                .await
                .expect("failed to initialize journal");

            // Size recovery assumes middle sections are full (only the oldest
            // and tail lengths are measured), so init still reports the
            // original size despite the truncated section 40.
            let expected_size = ITEMS_PER_BLOB.get() * 100 + ITEMS_PER_BLOB.get() / 2;
            assert_eq!(journal.size().await, expected_size);

            let reader = journal.reader().await;
            match reader.replay(NZUsize!(1024), 0).await {
                Err(Error::Corruption(msg)) => {
                    assert!(
                        msg.contains("section 40"),
                        "Error should mention section 40, got: {msg}"
                    );
                }
                Err(e) => panic!("Expected Corruption error for section 40, got: {:?}", e),
                Ok(_) => panic!("Expected replay to fail with corruption"),
            };
        });
    }
1362
    /// A missing historical blob is tolerated at init time but must be reported
    /// as `Corruption` by both `replay` and direct `read`s into the gap.
    #[test_traced]
    fn test_fixed_journal_replay_with_missing_historical_blob() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(&context, NZU64!(2));
            let journal = Journal::init(context.with_label("first"), cfg.clone())
                .await
                .expect("failed to initialize journal");
            // 5 items across 3 sections (2 items per blob).
            for i in 0u64..5 {
                journal
                    .append(&test_digest(i))
                    .await
                    .expect("failed to append data");
            }
            journal.sync().await.expect("failed to sync journal");
            drop(journal);

            // Delete the middle section's blob entirely.
            context
                .remove(&blob_partition(&cfg), Some(&1u64.to_be_bytes()))
                .await
                .expect("failed to remove blob");

            // Init only inspects the tail, so it still succeeds.
            let result = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
                .await
                .expect("init shouldn't fail");

            // Replay across the hole must surface Corruption.
            let reader = result.reader().await;
            match reader.replay(NZUsize!(1024), 0).await {
                Err(Error::Corruption(_)) => {}
                Err(err) => panic!("expected Corruption, got: {err}"),
                Ok(_) => panic!("expected Corruption, got ok"),
            };

            // Position 2 lives in the deleted section; reading it must also fail.
            match result.read(2).await {
                Err(Error::Corruption(_)) => {}
                Err(err) => panic!("expected Corruption, got: {err}"),
                Ok(_) => panic!("expected Corruption, got ok"),
            };
        });
    }
1406
    /// A torn write at the end of the tail blob (one byte missing) is repaired on
    /// init by trimming the partial item, shrinking the journal by exactly one.
    #[test_traced]
    fn test_fixed_journal_test_trim_blob() {
        let executor = deterministic::Runner::default();
        const ITEMS_PER_BLOB: NonZeroU64 = NZU64!(7);
        executor.start(|context| async move {
            let cfg = test_cfg(&context, ITEMS_PER_BLOB);
            let journal = Journal::init(context.with_label("first"), cfg.clone())
                .await
                .expect("failed to initialize journal");

            // One full section plus 3 items in the tail section.
            let item_count = ITEMS_PER_BLOB.get() + 3;
            for i in 0u64..item_count {
                journal
                    .append(&test_digest(i))
                    .await
                    .expect("failed to append data");
            }
            assert_eq!(journal.size().await, item_count);
            journal.sync().await.expect("Failed to sync journal");
            drop(journal);

            // Simulate a torn write: drop the last byte of the tail blob.
            let (blob, size) = context
                .open(&blob_partition(&cfg), &1u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            blob.resize(size - 1).await.expect("Failed to corrupt blob");
            blob.sync().await.expect("Failed to sync blob");

            let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
                .await
                .unwrap();

            // Recovery trims the incomplete trailing item.
            assert_eq!(journal.size().await, item_count - 1);

            journal.destroy().await.expect("Failed to destroy journal");
        });
    }
1453
    /// Replaying from a mid-journal start position yields exactly the items at
    /// positions >= the start, each matching its expected digest.
    #[test_traced]
    fn test_fixed_journal_partial_replay() {
        const ITEMS_PER_BLOB: NonZeroU64 = NZU64!(7);
        // Start mid-section (53 is not a multiple of 7) to exercise in-section skipping.
        const START_POS: u64 = 53;

        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(&context, ITEMS_PER_BLOB);
            let journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to initialize journal");

            // Fill 100 full sections plus a partial tail.
            for i in 0u64..(ITEMS_PER_BLOB.get() * 100 + ITEMS_PER_BLOB.get() / 2) {
                let pos = journal
                    .append(&test_digest(i))
                    .await
                    .expect("failed to append data");
                assert_eq!(pos, i);
            }

            // Scope the reader so its guard is released before destroy().
            {
                let reader = journal.reader().await;
                let stream = reader
                    .replay(NZUsize!(1024), START_POS)
                    .await
                    .expect("failed to replay journal");
                let mut items = Vec::new();
                pin_mut!(stream);
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((pos, item)) => {
                            // Nothing before the requested start may be emitted.
                            assert!(pos >= START_POS, "pos={pos}, expected >= {START_POS}");
                            assert_eq!(
                                test_digest(pos),
                                item,
                                "Item at position {pos} did not match expected digest"
                            );
                            items.push(pos);
                        }
                        Err(err) => panic!("Failed to read item: {err}"),
                    }
                }

                // Every position from START_POS to the end appeared exactly once.
                assert_eq!(
                    items.len(),
                    ITEMS_PER_BLOB.get() as usize * 100 + ITEMS_PER_BLOB.get() as usize / 2
                        - START_POS as usize
                );
                items.sort();
                for (i, pos) in items.iter().enumerate() {
                    assert_eq!(i as u64, *pos - START_POS);
                }
            }

            journal.destroy().await.unwrap();
        });
    }
1519
    /// Recovery from a torn write in the tail blob, then from losing that blob
    /// entirely: each re-init lands on the longest intact prefix.
    #[test_traced]
    fn test_fixed_journal_recover_from_partial_write() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = test_cfg(&context, NZU64!(3));
            let journal = Journal::init(context.with_label("first"), cfg.clone())
                .await
                .expect("failed to initialize journal");
            // 5 items: section 0 holds 3, section 1 holds 2.
            for i in 0..5 {
                journal
                    .append(&test_digest(i))
                    .await
                    .expect("failed to append data");
            }
            assert_eq!(journal.size().await, 5);
            journal.sync().await.expect("Failed to sync journal");
            drop(journal);

            // Tear the last byte off the tail blob (section 1).
            let (blob, size) = context
                .open(&blob_partition(&cfg), &1u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            blob.resize(size - 1).await.expect("Failed to corrupt blob");
            blob.sync().await.expect("Failed to sync blob");

            // Recovery trims the torn item; pruning boundary untouched.
            let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");
            assert_eq!(journal.pruning_boundary().await, 0);
            assert_eq!(journal.size().await, 4);
            drop(journal);

            // Now remove the tail blob altogether.
            context
                .remove(&blob_partition(&cfg), Some(&1u64.to_be_bytes()))
                .await
                .expect("Failed to remove blob");

            // Recovery falls back to the end of section 0 (3 items).
            let journal = Journal::<_, Digest>::init(context.with_label("third"), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");
            assert_eq!(journal.size().await, 3);

            journal.destroy().await.unwrap();
        });
    }
1575
    /// Unlike tail damage (which is repairable), truncation of the *oldest*
    /// retained section must be rejected as Corruption at init time.
    #[test_traced]
    fn test_fixed_journal_recover_detects_oldest_section_too_short() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(&context, NZU64!(5));
            // Start at size 7: the oldest section (1) begins partially filled.
            let journal =
                Journal::<_, Digest>::init_at_size(context.with_label("first"), cfg.clone(), 7)
                    .await
                    .expect("failed to initialize journal at size");

            for i in 0..8u64 {
                journal
                    .append(&test_digest(100 + i))
                    .await
                    .expect("failed to append data");
            }
            journal.sync().await.expect("failed to sync journal");
            assert_eq!(journal.pruning_boundary().await, 7);
            assert_eq!(journal.size().await, 15);
            drop(journal);

            // Shorten the oldest section's blob by one byte.
            let (blob, size) = context
                .open(&blob_partition(&cfg), &1u64.to_be_bytes())
                .await
                .expect("failed to open oldest blob");
            blob.resize(size - 1).await.expect("failed to corrupt blob");
            blob.sync().await.expect("failed to sync blob");

            // Init must refuse: the oldest section cannot be silently trimmed.
            let result =
                Journal::<_, Digest>::init(context.with_label("second"), cfg.clone()).await;
            assert!(matches!(result, Err(Error::Corruption(_))));
        });
    }
1611
    /// If the only item ever written is torn, recovery collapses the journal back
    /// to empty and it remains usable for fresh appends.
    #[test_traced]
    fn test_fixed_journal_recover_to_empty_from_partial_write() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(&context, NZU64!(10));
            let journal = Journal::init(context.with_label("first"), cfg.clone())
                .await
                .expect("failed to initialize journal");
            journal
                .append(&test_digest(0))
                .await
                .expect("failed to append data");
            assert_eq!(journal.size().await, 1);
            journal.sync().await.expect("Failed to sync journal");
            drop(journal);

            // Tear the single item in section 0.
            let (blob, size) = context
                .open(&blob_partition(&cfg), &0u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            blob.resize(size - 1).await.expect("Failed to corrupt blob");
            blob.sync().await.expect("Failed to sync blob");

            let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");

            // Recovery trimmed the only item: journal is empty again.
            let bounds = journal.bounds().await;
            assert_eq!(bounds.end, 0);
            assert!(bounds.is_empty());
            // The recovered journal accepts new appends normally.
            journal
                .append(&test_digest(0))
                .await
                .expect("failed to append data");
            assert_eq!(journal.size().await, 1);

            journal.destroy().await.unwrap();
        });
    }
1659
    /// Garbage zero-pages appended beyond the legitimate end of the tail blob
    /// (e.g. preallocated-but-unwritten space after a crash) are discarded on init.
    #[test_traced("DEBUG")]
    fn test_fixed_journal_recover_from_unwritten_data() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(&context, NZU64!(10));
            let journal = Journal::init(context.with_label("first"), cfg.clone())
                .await
                .expect("failed to initialize journal");

            journal
                .append(&test_digest(0))
                .await
                .expect("failed to append data");
            assert_eq!(journal.size().await, 1);
            journal.sync().await.expect("Failed to sync journal");
            drop(journal);

            // Extend the blob with three pages of zeros past the valid data.
            let (blob, size) = context
                .open(&blob_partition(&cfg), &0u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            blob.write_at(size, vec![0u8; PAGE_SIZE.get() as usize * 3])
                .await
                .expect("Failed to extend blob");
            blob.sync().await.expect("Failed to sync blob");

            let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");

            // Only the one genuine item survives recovery.
            assert_eq!(journal.size().await, 1);

            // Appending after recovery works (new data lands where the junk was).
            journal
                .append(&test_digest(1))
                .await
                .expect("failed to append data");

            journal.destroy().await.unwrap();
        });
    }
1708
    /// Exercises `rewind`: no-op rewinds, invalid forward rewinds, rewinding
    /// across section boundaries, repeated append/rewind cycles, persistence of
    /// the rewound size, and the interaction between pruning and rewinding.
    #[test_traced]
    fn test_fixed_journal_rewinding() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(&context, NZU64!(2));
            let journal = Journal::init(context.with_label("first"), cfg.clone())
                .await
                .expect("failed to initialize journal");
            // Rewinding an empty journal to 0 is a no-op; past the end is invalid.
            assert!(matches!(journal.rewind(0).await, Ok(())));
            assert!(matches!(
                journal.rewind(1).await,
                Err(Error::InvalidRewind(1))
            ));

            journal
                .append(&test_digest(0))
                .await
                .expect("failed to append data 0");
            assert_eq!(journal.size().await, 1);
            // Rewind to current size is a no-op; rewind to 0 drops the item.
            assert!(matches!(journal.rewind(1).await, Ok(())));
            assert!(matches!(journal.rewind(0).await, Ok(())));
            assert_eq!(journal.size().await, 0);

            // 7 items spans 4 sections at 2 items per blob.
            for i in 0..7 {
                let pos = journal
                    .append(&test_digest(i))
                    .await
                    .expect("failed to append data");
                assert_eq!(pos, i);
            }
            assert_eq!(journal.size().await, 7);

            // Rewind across section boundaries.
            assert!(matches!(journal.rewind(4).await, Ok(())));
            assert_eq!(journal.size().await, 4);

            // Rewind all the way back to empty.
            assert!(matches!(journal.rewind(0).await, Ok(())));
            assert_eq!(journal.size().await, 0);

            // Stress: repeated cycles of 100 appends then rewinding off 49.
            for _ in 0..10 {
                for i in 0..100 {
                    journal
                        .append(&test_digest(i))
                        .await
                        .expect("failed to append data");
                }
                journal.rewind(journal.size().await - 49).await.unwrap();
            }
            const ITEMS_REMAINING: u64 = 10 * (100 - 49);
            assert_eq!(journal.size().await, ITEMS_REMAINING);

            journal.sync().await.expect("Failed to sync journal");
            drop(journal);

            // Repeat the same cycle with a different blob width (3) in a fresh partition.
            let mut cfg = test_cfg(&context, NZU64!(3));
            cfg.partition = "test-partition-2".into();
            let journal = Journal::init(context.with_label("second"), cfg.clone())
                .await
                .expect("failed to initialize journal");
            for _ in 0..10 {
                for i in 0..100 {
                    journal
                        .append(&test_digest(i))
                        .await
                        .expect("failed to append data");
                }
                journal.rewind(journal.size().await - 49).await.unwrap();
            }
            assert_eq!(journal.size().await, ITEMS_REMAINING);

            journal.sync().await.expect("Failed to sync journal");
            drop(journal);

            // The rewound size survives a restart.
            let journal: Journal<_, Digest> =
                Journal::init(context.with_label("third"), cfg.clone())
                    .await
                    .expect("failed to re-initialize journal");
            assert_eq!(journal.size().await, 10 * (100 - 49));

            // Pruning does not change size, but rewinding below the pruning
            // boundary (300) is invalid; rewinding exactly to it empties the journal.
            journal.prune(300).await.expect("pruning failed");
            assert_eq!(journal.size().await, ITEMS_REMAINING);
            assert!(matches!(
                journal.rewind(299).await,
                Err(Error::InvalidRewind(299))
            ));
            assert!(matches!(journal.rewind(300).await, Ok(())));
            let bounds = journal.bounds().await;
            assert_eq!(bounds.end, 300);
            assert!(bounds.is_empty());

            journal.destroy().await.unwrap();
        });
    }
1813
    /// Truncating the tail blob on an exact physical-page boundary: recovery must
    /// keep every item from the surviving full pages and lose only the last page.
    #[test_traced]
    fn test_fixed_journal_recover_from_page_boundary_truncation() {
        let executor = deterministic::Runner::default();
        executor.start(|context: Context| async move {
            let cfg = test_cfg(&context, NZU64!(100));
            let journal = Journal::init(context.with_label("first"), cfg.clone())
                .await
                .expect("failed to initialize journal");

            for i in 0u64..10 {
                journal
                    .append(&test_digest(i))
                    .await
                    .expect("failed to append data");
            }
            assert_eq!(journal.size().await, 10);
            journal.sync().await.expect("Failed to sync journal");
            drop(journal);

            // Physical page = logical page + 12 bytes; presumably per-page
            // integrity/checksum overhead added by the paged buffer — TODO confirm.
            let physical_page_size = PAGE_SIZE.get() as u64 + 12;
            let (blob, size) = context
                .open(&blob_partition(&cfg), &0u64.to_be_bytes())
                .await
                .expect("Failed to open blob");

            let full_pages = size / physical_page_size;
            assert!(full_pages >= 2, "need at least 2 pages for this test");
            // Cut exactly one whole physical page off the end.
            let truncate_to = (full_pages - 1) * physical_page_size;

            blob.resize(truncate_to)
                .await
                .expect("Failed to truncate blob");
            blob.sync().await.expect("Failed to sync blob");

            let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
                .await
                .expect("Failed to re-initialize journal after page truncation");

            let remaining_logical_bytes = (full_pages - 1) * PAGE_SIZE.get() as u64;
            // Assumes each encoded Digest item occupies 32 bytes — TODO confirm.
            let expected_items = remaining_logical_bytes / 32;
            assert_eq!(
                journal.size().await,
                expected_items,
                "Journal should recover to {} items after truncation",
                expected_items
            );

            // Every surviving item is intact and readable.
            for i in 0..expected_items {
                let item = journal
                    .read(i)
                    .await
                    .expect("failed to read recovered item");
                assert_eq!(item, test_digest(i), "item {} mismatch after recovery", i);
            }

            journal.destroy().await.expect("Failed to destroy journal");
        });
    }
1896
    /// Degenerate configuration with exactly one item per blob: exercises append,
    /// read, prune, restart persistence, destroy/recreate, and pruning the entire
    /// contents (empty-but-nonzero bounds).
    #[test_traced]
    fn test_single_item_per_blob() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "single-item-per-blob".into(),
                // One item per blob: every append creates a new section.
                items_per_blob: NZU64!(1),
                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(2048),
            };

            let journal = Journal::init(context.with_label("first"), cfg.clone())
                .await
                .expect("failed to initialize journal");

            // Fresh journal: empty bounds at 0.
            let bounds = journal.bounds().await;
            assert_eq!(bounds.end, 0);
            assert!(bounds.is_empty());

            let pos = journal
                .append(&test_digest(0))
                .await
                .expect("failed to append");
            assert_eq!(pos, 0);
            assert_eq!(journal.size().await, 1);

            journal.sync().await.expect("failed to sync");

            let value = journal
                .read(journal.size().await - 1)
                .await
                .expect("failed to read");
            assert_eq!(value, test_digest(0));

            // Each append lands in its own blob; last item always readable.
            for i in 1..10u64 {
                let pos = journal
                    .append(&test_digest(i))
                    .await
                    .expect("failed to append");
                assert_eq!(pos, i);
                assert_eq!(journal.size().await, i + 1);

                let value = journal
                    .read(journal.size().await - 1)
                    .await
                    .expect("failed to read");
                assert_eq!(value, test_digest(i));
            }

            for i in 0..10u64 {
                assert_eq!(journal.read(i).await.unwrap(), test_digest(i));
            }

            journal.sync().await.expect("failed to sync");

            // Prune the first five blobs.
            journal.prune(5).await.expect("failed to prune");

            // Pruning moves the start bound but does not shrink size.
            assert_eq!(journal.size().await, 10);

            assert_eq!(journal.bounds().await.start, 5);

            let value = journal
                .read(journal.size().await - 1)
                .await
                .expect("failed to read");
            assert_eq!(value, test_digest(9));

            // Pruned positions report ItemPruned; retained ones still read fine.
            for i in 0..5 {
                assert!(matches!(journal.read(i).await, Err(Error::ItemPruned(_))));
            }

            for i in 5..10u64 {
                assert_eq!(journal.read(i).await.unwrap(), test_digest(i));
            }

            // Appends continue past the pruned region.
            for i in 10..15u64 {
                let pos = journal
                    .append(&test_digest(i))
                    .await
                    .expect("failed to append");
                assert_eq!(pos, i);

                let value = journal
                    .read(journal.size().await - 1)
                    .await
                    .expect("failed to read");
                assert_eq!(value, test_digest(i));
            }

            journal.sync().await.expect("failed to sync");
            drop(journal);

            // Size and pruning boundary survive a restart.
            let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
                .await
                .expect("failed to re-initialize journal");

            assert_eq!(journal.size().await, 15);

            assert_eq!(journal.bounds().await.start, 5);

            let value = journal
                .read(journal.size().await - 1)
                .await
                .expect("failed to read");
            assert_eq!(value, test_digest(14));

            for i in 5..15u64 {
                assert_eq!(journal.read(i).await.unwrap(), test_digest(i));
            }

            journal.destroy().await.expect("failed to destroy journal");

            // After destroy, a brand-new journal in the same partition starts clean.
            let journal = Journal::init(context.with_label("third"), cfg.clone())
                .await
                .expect("failed to initialize journal");

            for i in 0..10u64 {
                journal.append(&test_digest(i + 100)).await.unwrap();
            }

            journal.prune(5).await.unwrap();
            let bounds = journal.bounds().await;
            assert_eq!(bounds.end, 10);
            assert_eq!(bounds.start, 5);

            journal.sync().await.unwrap();
            drop(journal);

            // Prune state persists across restart too.
            let journal = Journal::<_, Digest>::init(context.with_label("fourth"), cfg.clone())
                .await
                .expect("failed to re-initialize journal");

            let bounds = journal.bounds().await;
            assert_eq!(bounds.end, 10);
            assert_eq!(bounds.start, 5);

            let value = journal.read(journal.size().await - 1).await.unwrap();
            assert_eq!(value, test_digest(109));

            for i in 5..10u64 {
                assert_eq!(journal.read(i).await.unwrap(), test_digest(i + 100));
            }

            journal.destroy().await.expect("failed to destroy journal");

            // Final phase: prune *everything* and verify the empty-at-5 state.
            let journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to initialize journal");

            for i in 0..5u64 {
                journal.append(&test_digest(i + 200)).await.unwrap();
            }
            journal.sync().await.unwrap();

            journal.prune(5).await.unwrap();
            let bounds = journal.bounds().await;
            // All items pruned: bounds are empty but end stays at 5.
            assert_eq!(bounds.end, 5);
            assert!(bounds.is_empty());
            // Reading the (pruned) last item reports ItemPruned(4).
            let result = journal.read(journal.size().await - 1).await;
            assert!(matches!(result, Err(Error::ItemPruned(4))));

            // Appending into the fully-pruned journal resumes at position 5.
            journal.append(&test_digest(205)).await.unwrap();
            assert_eq!(journal.bounds().await.start, 5);
            assert_eq!(
                journal.read(journal.size().await - 1).await.unwrap(),
                test_digest(205)
            );

            journal.destroy().await.expect("failed to destroy journal");
        });
    }
2109
2110 #[test_traced]
2111 fn test_fixed_journal_init_at_size_zero() {
2112 let executor = deterministic::Runner::default();
2113 executor.start(|context| async move {
2114 let cfg = test_cfg(&context, NZU64!(5));
2115 let journal = Journal::<_, Digest>::init_at_size(context.clone(), cfg.clone(), 0)
2116 .await
2117 .unwrap();
2118
2119 let bounds = journal.bounds().await;
2120 assert_eq!(bounds.end, 0);
2121 assert!(bounds.is_empty());
2122
2123 let pos = journal.append(&test_digest(100)).await.unwrap();
2125 assert_eq!(pos, 0);
2126 assert_eq!(journal.size().await, 1);
2127 assert_eq!(journal.read(0).await.unwrap(), test_digest(100));
2128
2129 journal.destroy().await.unwrap();
2130 });
2131 }
2132
2133 #[test_traced]
2134 fn test_fixed_journal_init_at_size_section_boundary() {
2135 let executor = deterministic::Runner::default();
2136 executor.start(|context| async move {
2137 let cfg = test_cfg(&context, NZU64!(5));
2138
2139 let journal = Journal::<_, Digest>::init_at_size(context.clone(), cfg.clone(), 10)
2141 .await
2142 .unwrap();
2143
2144 let bounds = journal.bounds().await;
2145 assert_eq!(bounds.end, 10);
2146 assert!(bounds.is_empty());
2147
2148 let pos = journal.append(&test_digest(1000)).await.unwrap();
2150 assert_eq!(pos, 10);
2151 assert_eq!(journal.size().await, 11);
2152 assert_eq!(journal.read(10).await.unwrap(), test_digest(1000));
2153
2154 let pos = journal.append(&test_digest(1001)).await.unwrap();
2156 assert_eq!(pos, 11);
2157 assert_eq!(journal.read(11).await.unwrap(), test_digest(1001));
2158
2159 journal.destroy().await.unwrap();
2160 });
2161 }
2162
2163 #[test_traced]
2164 fn test_fixed_journal_init_at_size_mid_section() {
2165 let executor = deterministic::Runner::default();
2166 executor.start(|context| async move {
2167 let cfg = test_cfg(&context, NZU64!(5));
2168
2169 let journal = Journal::<_, Digest>::init_at_size(context.clone(), cfg.clone(), 7)
2171 .await
2172 .unwrap();
2173
2174 let bounds = journal.bounds().await;
2175 assert_eq!(bounds.end, 7);
2176 assert!(bounds.is_empty());
2178
2179 assert!(matches!(journal.read(5).await, Err(Error::ItemPruned(5))));
2181 assert!(matches!(journal.read(6).await, Err(Error::ItemPruned(6))));
2182
2183 let pos = journal.append(&test_digest(700)).await.unwrap();
2185 assert_eq!(pos, 7);
2186 assert_eq!(journal.size().await, 8);
2187 assert_eq!(journal.read(7).await.unwrap(), test_digest(700));
2188 assert_eq!(journal.bounds().await.start, 7);
2190
2191 journal.destroy().await.unwrap();
2192 });
2193 }
2194
    /// Data appended after `init_at_size(15)` survives a restart: a plain `init`
    /// recovers the same bounds (15..20) and contents.
    #[test_traced]
    fn test_fixed_journal_init_at_size_persistence() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(&context, NZU64!(5));

            // Start at 15 (an exact section boundary with 5 items per blob).
            let journal =
                Journal::<_, Digest>::init_at_size(context.with_label("first"), cfg.clone(), 15)
                    .await
                    .unwrap();

            for i in 0..5u64 {
                let pos = journal.append(&test_digest(1500 + i)).await.unwrap();
                assert_eq!(pos, 15 + i);
            }

            assert_eq!(journal.size().await, 20);

            journal.sync().await.unwrap();
            drop(journal);

            // Reopen with a plain init and verify everything persisted.
            let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
                .await
                .unwrap();

            let bounds = journal.bounds().await;
            assert_eq!(bounds.end, 20);
            assert_eq!(bounds.start, 15);

            for i in 0..5u64 {
                assert_eq!(journal.read(15 + i).await.unwrap(), test_digest(1500 + i));
            }

            // Appending after the restart continues from position 20.
            let pos = journal.append(&test_digest(9999)).await.unwrap();
            assert_eq!(pos, 20);
            assert_eq!(journal.read(20).await.unwrap(), test_digest(9999));

            journal.destroy().await.unwrap();
        });
    }
2241
    /// The size set by `init_at_size` persists across a restart even when no
    /// items were ever appended (and without an explicit sync before drop).
    #[test_traced]
    fn test_fixed_journal_init_at_size_persistence_without_data() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(&context, NZU64!(5));

            let journal =
                Journal::<_, Digest>::init_at_size(context.with_label("first"), cfg.clone(), 15)
                    .await
                    .unwrap();

            let bounds = journal.bounds().await;
            assert_eq!(bounds.end, 15);
            assert!(bounds.is_empty());

            // Dropped without appending or syncing.
            drop(journal);

            let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
                .await
                .unwrap();

            // The empty-at-15 state was durably recorded by init_at_size itself.
            let bounds = journal.bounds().await;
            assert_eq!(bounds.end, 15);
            assert!(bounds.is_empty());

            // Appends resume from the persisted size.
            let pos = journal.append(&test_digest(1500)).await.unwrap();
            assert_eq!(pos, 15);
            assert_eq!(journal.read(15).await.unwrap(), test_digest(1500));

            journal.destroy().await.unwrap();
        });
    }
2278
2279 #[test_traced]
2280 fn test_fixed_journal_init_at_size_large_offset() {
2281 let executor = deterministic::Runner::default();
2282 executor.start(|context| async move {
2283 let cfg = test_cfg(&context, NZU64!(5));
2284
2285 let journal = Journal::<_, Digest>::init_at_size(context.clone(), cfg.clone(), 1000)
2287 .await
2288 .unwrap();
2289
2290 let bounds = journal.bounds().await;
2291 assert_eq!(bounds.end, 1000);
2292 assert!(bounds.is_empty());
2293
2294 let pos = journal.append(&test_digest(100000)).await.unwrap();
2296 assert_eq!(pos, 1000);
2297 assert_eq!(journal.read(1000).await.unwrap(), test_digest(100000));
2298
2299 journal.destroy().await.unwrap();
2300 });
2301 }
2302
    /// Pruning and appending both work normally on a journal created with
    /// `init_at_size(20)`: prune(25) trims into the appended range and further
    /// appends continue from the end.
    #[test_traced]
    fn test_fixed_journal_init_at_size_prune_and_append() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(&context, NZU64!(5));

            let journal = Journal::<_, Digest>::init_at_size(context.clone(), cfg.clone(), 20)
                .await
                .unwrap();

            // Items occupy positions 20..30.
            for i in 0..10u64 {
                journal.append(&test_digest(2000 + i)).await.unwrap();
            }

            assert_eq!(journal.size().await, 30);

            journal.prune(25).await.unwrap();

            let bounds = journal.bounds().await;
            assert_eq!(bounds.end, 30);
            assert_eq!(bounds.start, 25);

            // Position i holds digest 2000 + (i - 20) since appends started at 20.
            for i in 25..30u64 {
                assert_eq!(journal.read(i).await.unwrap(), test_digest(2000 + (i - 20)));
            }

            // Appending after the prune continues from position 30.
            let pos = journal.append(&test_digest(3000)).await.unwrap();
            assert_eq!(pos, 30);

            journal.destroy().await.unwrap();
        });
    }
2340
    /// `clear_to_size` discards all existing items and jumps the journal to the
    /// requested size; the new state persists and supports further appends.
    #[test_traced]
    fn test_fixed_journal_clear_to_size() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(&context, NZU64!(10));
            let journal = Journal::init(context.with_label("journal"), cfg.clone())
                .await
                .expect("failed to initialize journal");

            for i in 0..25u64 {
                journal.append(&test_digest(i)).await.unwrap();
            }
            assert_eq!(journal.size().await, 25);
            journal.sync().await.unwrap();

            // Clear everything and jump forward to size 100.
            journal.clear_to_size(100).await.unwrap();
            assert_eq!(journal.size().await, 100);

            // All previously-written positions now read as pruned.
            for i in 0..25 {
                assert!(matches!(journal.read(i).await, Err(Error::ItemPruned(_))));
            }

            // The cleared state survives a restart (no explicit sync needed).
            drop(journal);
            let journal =
                Journal::<_, Digest>::init(context.with_label("journal_after_clear"), cfg.clone())
                    .await
                    .expect("failed to re-initialize journal after clear");
            assert_eq!(journal.size().await, 100);

            // Appends continue from the cleared-to size.
            for i in 100..105u64 {
                let pos = journal.append(&test_digest(i)).await.unwrap();
                assert_eq!(pos, i);
            }
            assert_eq!(journal.size().await, 105);

            for i in 100..105u64 {
                assert_eq!(journal.read(i).await.unwrap(), test_digest(i));
            }

            journal.sync().await.unwrap();
            drop(journal);

            // Post-clear appends persist across another restart.
            let journal = Journal::<_, Digest>::init(context.with_label("journal_reopened"), cfg)
                .await
                .expect("failed to re-initialize journal");

            assert_eq!(journal.size().await, 105);
            for i in 100..105u64 {
                assert_eq!(journal.read(i).await.unwrap(), test_digest(i));
            }

            journal.destroy().await.unwrap();
        });
    }
2402
    /// Crash simulation: blob data is synced but the metadata store never is
    /// (only the inner segmented journal's sync is called). With a boundary-
    /// aligned size and no prior metadata, recovery still yields bounds 0..5.
    #[test_traced]
    fn test_fixed_journal_sync_crash_meta_none_boundary_aligned() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(&context, NZU64!(5));
            let journal = Journal::<_, Digest>::init(context.with_label("first"), cfg.clone())
                .await
                .unwrap();

            // Exactly one full section (5 items, boundary-aligned).
            for i in 0..5u64 {
                journal.append(&test_digest(i)).await.unwrap();
            }
            // Sync only the underlying blob data, bypassing Journal::sync so the
            // metadata (pruning boundary record) is never written.
            let inner = journal.inner.read().await;
            let tail_section = inner.size / journal.items_per_blob;
            inner.journal.sync(tail_section).await.unwrap();
            drop(inner);
            drop(journal);

            // Recovery reconstructs the correct bounds from the blobs alone.
            let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
                .await
                .unwrap();
            let bounds = journal.bounds().await;
            assert_eq!(bounds.start, 0);
            assert_eq!(bounds.end, 5);
            journal.destroy().await.unwrap();
        });
    }
2431
    /// If the metadata (pruning boundary) is wiped while the oldest section holds
    /// a partial count that only the metadata can explain, init must report
    /// Corruption rather than guess.
    #[test_traced]
    fn test_fixed_journal_oldest_section_invalid_len() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(&context, NZU64!(5));
            // Boundary 7 means section 1 starts partially filled (positions 7..10).
            let journal =
                Journal::<_, Digest>::init_at_size(context.with_label("first"), cfg.clone(), 7)
                    .await
                    .unwrap();
            for i in 0..3u64 {
                journal.append(&test_digest(i)).await.unwrap();
            }
            assert_eq!(journal.inner.read().await.journal.newest_section(), Some(2));
            journal.sync().await.unwrap();

            // Simulate metadata loss: clear and persist the empty metadata store.
            let mut inner = journal.inner.write().await;
            inner.metadata.clear();
            inner.metadata.sync().await.unwrap();
            drop(inner);
            drop(journal);

            // Without the boundary record, section 1's short length is inexplicable.
            let result =
                Journal::<_, Digest>::init(context.with_label("second"), cfg.clone()).await;
            assert!(matches!(result, Err(Error::Corruption(_))));
            // Manual cleanup since destroy() isn't reachable on a failed init.
            context.remove(&blob_partition(&cfg), None).await.unwrap();
            context
                .remove(&format!("{}-metadata", cfg.partition), None)
                .await
                .unwrap();
        });
    }
2468
    /// Crash simulation: a mid-section pruning boundary (7) was durably recorded
    /// by `init_at_size`, then only blob data is synced before the "crash".
    /// Recovery must reproduce bounds 7..10 from the stale-but-valid metadata.
    #[test_traced]
    fn test_fixed_journal_sync_crash_meta_mid_boundary_unchanged() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(&context, NZU64!(5));
            let journal =
                Journal::<_, Digest>::init_at_size(context.with_label("first"), cfg.clone(), 7)
                    .await
                    .unwrap();
            for i in 0..3u64 {
                journal.append(&test_digest(i)).await.unwrap();
            }
            // Sync blobs only (bypassing Journal::sync), then "crash".
            let inner = journal.inner.read().await;
            let tail_section = inner.size / journal.items_per_blob;
            inner.journal.sync(tail_section).await.unwrap();
            drop(inner);
            drop(journal);

            let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
                .await
                .unwrap();
            let bounds = journal.bounds().await;
            assert_eq!(bounds.start, 7);
            assert_eq!(bounds.end, 10);
            journal.destroy().await.unwrap();
        });
    }
    /// Crash simulation: after pruning moved the boundary from mid-section (7) to
    /// an aligned position (10), only blob data is synced — the metadata on disk
    /// still says 7. Recovery must prefer the on-disk blob state (bounds 10..17).
    #[test_traced]
    fn test_fixed_journal_sync_crash_meta_mid_to_aligned_becomes_stale() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(&context, NZU64!(5));
            let journal =
                Journal::<_, Digest>::init_at_size(context.with_label("first"), cfg.clone(), 7)
                    .await
                    .unwrap();
            for i in 0..10u64 {
                journal.append(&test_digest(i)).await.unwrap();
            }
            assert_eq!(journal.size().await, 17);
            // Prune to an aligned boundary; the in-memory boundary is now 10
            // while the persisted metadata still records 7.
            journal.prune(10).await.unwrap();

            // Sync blobs only (bypassing Journal::sync), then "crash".
            let inner = journal.inner.read().await;
            let tail_section = inner.size / journal.items_per_blob;
            inner.journal.sync(tail_section).await.unwrap();
            drop(inner);
            drop(journal);

            let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
                .await
                .unwrap();
            let bounds = journal.bounds().await;
            assert_eq!(bounds.start, 10);
            assert_eq!(bounds.end, 17);
            journal.destroy().await.unwrap();
        });
    }
2528
2529 #[test_traced]
2530 fn test_fixed_journal_prune_does_not_move_boundary_backwards() {
2531 let executor = deterministic::Runner::default();
2534 executor.start(|context| async move {
2535 let cfg = test_cfg(&context, NZU64!(5));
2536 let journal =
2538 Journal::<_, Digest>::init_at_size(context.with_label("first"), cfg.clone(), 7)
2539 .await
2540 .unwrap();
2541 for i in 0..5u64 {
2543 journal.append(&test_digest(i)).await.unwrap();
2544 }
2545 journal.prune(5).await.unwrap();
2547 assert_eq!(journal.bounds().await.start, 7);
2548 journal.destroy().await.unwrap();
2549 });
2550 }
2551
    /// Replay over a journal created with a mid-section start (7) whose items
    /// span several sections: replaying from the boundary and from a later
    /// mid-stream position both yield exactly the expected items in order.
    #[test_traced]
    fn test_fixed_journal_replay_after_init_at_size_spanning_sections() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(&context, NZU64!(5));

            let journal = Journal::<_, Digest>::init_at_size(context.clone(), cfg.clone(), 7)
                .await
                .unwrap();

            // 13 items at positions 7..20 (crossing section boundaries at 10 and 15).
            for i in 0..13u64 {
                let pos = journal.append(&test_digest(100 + i)).await.unwrap();
                assert_eq!(pos, 7 + i);
            }
            assert_eq!(journal.size().await, 20);
            journal.sync().await.unwrap();

            // Replay from the pruning boundary: all 13 items, in order.
            {
                let reader = journal.reader().await;
                let stream = reader
                    .replay(NZUsize!(1024), 7)
                    .await
                    .expect("failed to replay");
                pin_mut!(stream);
                let mut items: Vec<(u64, Digest)> = Vec::new();
                while let Some(result) = stream.next().await {
                    items.push(result.expect("replay item failed"));
                }

                assert_eq!(items.len(), 13);
                for (i, (pos, item)) in items.iter().enumerate() {
                    assert_eq!(*pos, 7 + i as u64);
                    assert_eq!(*item, test_digest(100 + i as u64));
                }
            }

            // Replay from position 12 (mid-stream, inside a later section): 8 items.
            {
                let reader = journal.reader().await;
                let stream = reader
                    .replay(NZUsize!(1024), 12)
                    .await
                    .expect("failed to replay from mid-stream");
                pin_mut!(stream);
                let mut items: Vec<(u64, Digest)> = Vec::new();
                while let Some(result) = stream.next().await {
                    items.push(result.expect("replay item failed"));
                }

                assert_eq!(items.len(), 8);
                for (i, (pos, item)) in items.iter().enumerate() {
                    assert_eq!(*pos, 12 + i as u64);
                    // Item at position 12 was appended as digest 100 + 5.
                    assert_eq!(*item, test_digest(100 + 5 + i as u64));
                }
            }

            journal.destroy().await.unwrap();
        });
    }
2619
2620 #[test_traced]
2621 fn test_fixed_journal_rewind_error_before_bounds_start() {
2622 let executor = deterministic::Runner::default();
2624 executor.start(|context| async move {
2625 let cfg = test_cfg(&context, NZU64!(5));
2626
2627 let journal = Journal::<_, Digest>::init_at_size(context.clone(), cfg.clone(), 10)
2628 .await
2629 .unwrap();
2630
2631 for i in 0..3u64 {
2633 journal.append(&test_digest(i)).await.unwrap();
2634 }
2635 assert_eq!(journal.size().await, 13);
2636
2637 journal.rewind(11).await.unwrap();
2639 assert_eq!(journal.size().await, 11);
2640
2641 journal.rewind(10).await.unwrap();
2643 assert_eq!(journal.size().await, 10);
2644
2645 let result = journal.rewind(9).await;
2647 assert!(matches!(result, Err(Error::InvalidRewind(9))));
2648
2649 journal.destroy().await.unwrap();
2650 });
2651 }
2652
2653 #[test_traced]
2654 fn test_fixed_journal_init_at_size_crash_scenarios() {
2655 let executor = deterministic::Runner::default();
2656 executor.start(|context| async move {
2657 let cfg = test_cfg(&context, NZU64!(5));
2658
2659 let journal =
2661 Journal::<_, Digest>::init_at_size(context.with_label("first"), cfg.clone(), 7)
2662 .await
2663 .unwrap();
2664 for i in 0..5u64 {
2665 journal.append(&test_digest(i)).await.unwrap();
2666 }
2667 journal.sync().await.unwrap();
2668 drop(journal);
2669
2670 let blob_part = blob_partition(&cfg);
2673 context.remove(&blob_part, None).await.unwrap();
2674
2675 let journal = Journal::<_, Digest>::init(context.with_label("crash1"), cfg.clone())
2677 .await
2678 .expect("init failed after clear crash");
2679 let bounds = journal.bounds().await;
2680 assert_eq!(bounds.end, 0);
2681 assert_eq!(bounds.start, 0);
2682 drop(journal);
2683
2684 let meta_cfg = MetadataConfig {
2686 partition: format!("{}-metadata", cfg.partition),
2687 codec_config: ((0..).into(), ()),
2688 };
2689 let mut metadata = Metadata::<_, u64, Vec<u8>>::init(
2690 context.with_label("restore_meta"),
2691 meta_cfg.clone(),
2692 )
2693 .await
2694 .unwrap();
2695 metadata.put(PRUNING_BOUNDARY_KEY, 7u64.to_be_bytes().to_vec());
2696 metadata.sync().await.unwrap();
2697
2698 let (blob, _) = context.open(&blob_part, &0u64.to_be_bytes()).await.unwrap();
2711 blob.sync().await.unwrap(); let journal = Journal::<_, Digest>::init(context.with_label("crash2"), cfg.clone())
2715 .await
2716 .expect("init failed after create crash");
2717
2718 let bounds = journal.bounds().await;
2720 assert_eq!(bounds.start, 0);
2721 assert_eq!(bounds.end, 0);
2723 journal.destroy().await.unwrap();
2724 });
2725 }
2726
2727 #[test_traced]
2728 fn test_fixed_journal_clear_to_size_crash_scenarios() {
2729 let executor = deterministic::Runner::default();
2730 executor.start(|context| async move {
2731 let cfg = test_cfg(&context, NZU64!(5));
2732
2733 let journal =
2736 Journal::<_, Digest>::init_at_size(context.with_label("first"), cfg.clone(), 12)
2737 .await
2738 .unwrap();
2739 journal.sync().await.unwrap();
2740 drop(journal);
2741
2742 let blob_part = blob_partition(&cfg);
2747 context.remove(&blob_part, None).await.unwrap();
2748
2749 let (blob, _) = context.open(&blob_part, &0u64.to_be_bytes()).await.unwrap();
2751 blob.sync().await.unwrap();
2752
2753 let journal =
2758 Journal::<_, Digest>::init(context.with_label("crash_clear"), cfg.clone())
2759 .await
2760 .expect("init failed after clear_to_size crash");
2761
2762 let bounds = journal.bounds().await;
2764 assert_eq!(bounds.start, 0);
2765 assert_eq!(bounds.end, 0);
2766 journal.destroy().await.unwrap();
2767 });
2768 }
2769}