1#[cfg(test)]
57use super::Reader as _;
58use crate::{
59 journal::{
60 contiguous::{metrics::FixedMetrics as Metrics, Many, Mutable},
61 segmented::fixed::{Config as SegmentedConfig, Journal as SegmentedJournal},
62 Error,
63 },
64 metadata::{Config as MetadataConfig, Metadata},
65 Context, Persistable,
66};
67use commonware_codec::CodecFixedShared;
68use commonware_runtime::buffer::paged::CacheRef;
69use commonware_utils::sync::{AsyncRwLockReadGuard, UpgradableAsyncRwLock};
70use futures::{stream::Stream, StreamExt};
71use std::num::{NonZeroU64, NonZeroUsize};
72use tracing::warn;
73
/// Metadata key under which the pruning boundary is persisted.
///
/// The entry is only kept while the boundary is not a multiple of `items_per_blob`; when the
/// boundary is section-aligned the entry is removed instead.
const PRUNING_BOUNDARY_KEY: u64 = 1;
76
77#[derive(Clone)]
79pub struct Config {
80 pub partition: String,
85
86 pub items_per_blob: NonZeroU64,
91
92 pub page_cache: CacheRef,
94
95 pub write_buffer: NonZeroUsize,
97}
98
99struct Inner<E: Context, A: CodecFixedShared> {
101 journal: SegmentedJournal<E, A>,
103
104 size: u64,
106
107 metadata: Metadata<E, u64, Vec<u8>>,
115
116 pruning_boundary: u64,
118}
119
120impl<E: Context, A: CodecFixedShared> Inner<E, A> {
121 async fn read(&self, pos: u64, items_per_blob: u64) -> Result<A, Error> {
128 if pos >= self.size {
129 return Err(Error::ItemOutOfRange(pos));
130 }
131 if pos < self.pruning_boundary {
132 return Err(Error::ItemPruned(pos));
133 }
134
135 let section = pos / items_per_blob;
136 let section_start = section * items_per_blob;
137
138 let first_in_section = self.pruning_boundary.max(section_start);
141 let pos_in_section = pos - first_in_section;
142
143 self.journal
144 .get(section, pos_in_section)
145 .await
146 .map_err(|e| {
147 match e {
149 Error::SectionOutOfRange(e)
150 | Error::AlreadyPrunedToSection(e)
151 | Error::ItemOutOfRange(e) => {
152 Error::Corruption(format!("section/item should be found, but got: {e}"))
153 }
154 other => other,
155 }
156 })
157 }
158
159 fn try_read_sync(&self, pos: u64, items_per_blob: u64) -> Option<A> {
161 let mut buf = vec![0u8; SegmentedJournal::<E, A>::CHUNK_SIZE];
162 self.try_read_sync_into(pos, items_per_blob, &mut buf)
163 }
164
165 fn try_read_sync_into(&self, pos: u64, items_per_blob: u64, buf: &mut [u8]) -> Option<A> {
167 if pos >= self.size || pos < self.pruning_boundary {
168 return None;
169 }
170 let section = pos / items_per_blob;
171 let section_start = section * items_per_blob;
172 let first_in_section = self.pruning_boundary.max(section_start);
173 let pos_in_section = pos - first_in_section;
174 self.journal.try_get_sync_into(section, pos_in_section, buf)
175 }
176}
177
178pub struct Journal<E: Context, A: CodecFixedShared> {
193 inner: UpgradableAsyncRwLock<Inner<E, A>>,
198
199 items_per_blob: u64,
201
202 metrics: Metrics<E>,
204}
205
206pub struct Reader<'a, E: Context, A: CodecFixedShared> {
208 guard: AsyncRwLockReadGuard<'a, Inner<E, A>>,
209 items_per_blob: u64,
210 metrics: &'a Metrics<E>,
211}
212
213impl<E: Context, A: CodecFixedShared> super::Reader for Reader<'_, E, A> {
214 type Item = A;
215
216 fn bounds(&self) -> std::ops::Range<u64> {
217 self.guard.pruning_boundary..self.guard.size
218 }
219
220 async fn read(&self, pos: u64) -> Result<A, Error> {
221 let _timer = self.metrics.read_timer();
222 self.metrics.read_calls.inc();
223 let result = match self.guard.read(pos, self.items_per_blob).await {
224 Ok(item) => {
225 self.metrics.items_read.inc();
226 Ok(item)
227 }
228 Err(error) => Err(error),
229 };
230 result
231 }
232
233 async fn read_many(&self, positions: &[u64]) -> Result<Vec<A>, Error> {
234 if positions.is_empty() {
235 return Ok(Vec::new());
236 }
237 let _timer = self.metrics.read_many_timer();
238 self.metrics.read_many_calls.inc();
239 assert!(
240 positions.windows(2).all(|w| w[0] < w[1]),
241 "positions must be strictly increasing"
242 );
243 for &pos in positions {
245 if pos >= self.guard.size {
246 return Err(Error::ItemOutOfRange(pos));
247 }
248 if pos < self.guard.pruning_boundary {
249 return Err(Error::ItemPruned(pos));
250 }
251 }
252
253 let items_per_blob = self.items_per_blob;
254 let pruning_boundary = self.guard.pruning_boundary;
255 let chunk_size = SegmentedJournal::<E, A>::CHUNK_SIZE;
256
257 let mut result: Vec<Option<A>> = Vec::with_capacity(positions.len());
259 let mut miss_indices: Vec<usize> = Vec::new();
260 let mut miss_positions: Vec<u64> = Vec::new();
261
262 let mut sync_buf = vec![0u8; chunk_size];
263 for (i, &pos) in positions.iter().enumerate() {
264 if let Some(item) = self
265 .guard
266 .try_read_sync_into(pos, items_per_blob, &mut sync_buf)
267 {
268 result.push(Some(item));
269 } else {
270 result.push(None);
271 miss_indices.push(i);
272 miss_positions.push(pos);
273 }
274 }
275
276 if miss_positions.is_empty() {
277 self.metrics.record_cache_hits(positions.len() as u64);
278 self.metrics.items_read.inc_by(positions.len() as u64);
279 return Ok(result.into_iter().map(|r| r.unwrap()).collect());
280 }
281 self.metrics
282 .record_cache_hits((positions.len() - miss_positions.len()) as u64);
283 self.metrics
284 .record_cache_misses(miss_positions.len() as u64);
285
286 let mut reusable_buf = vec![0u8; miss_positions.len() * chunk_size];
288 let mut disk_offset = 0;
289
290 let mut group_start = 0;
291 while group_start < miss_positions.len() {
292 let section = miss_positions[group_start] / items_per_blob;
293 let section_start = section * items_per_blob;
294 let first_in_section = pruning_boundary.max(section_start);
295
296 let mut group_end = group_start + 1;
297 while group_end < miss_positions.len()
298 && miss_positions[group_end] / items_per_blob == section
299 {
300 group_end += 1;
301 }
302
303 let group_len = group_end - group_start;
304 let section_positions: Vec<u64> = miss_positions[group_start..group_end]
305 .iter()
306 .map(|&pos| pos - first_in_section)
307 .collect();
308
309 let buf = &mut reusable_buf[..group_len * chunk_size];
310 let items = self
311 .guard
312 .journal
313 .get_many(section, §ion_positions, buf)
314 .await
315 .map_err(|e| match e {
316 Error::SectionOutOfRange(e)
317 | Error::AlreadyPrunedToSection(e)
318 | Error::ItemOutOfRange(e) => {
319 Error::Corruption(format!("section/item should be found, but got: {e}"))
320 }
321 other => other,
322 })?;
323
324 for (item, &miss_idx) in items.into_iter().zip(&miss_indices[disk_offset..]) {
325 result[miss_idx] = Some(item);
326 }
327
328 disk_offset += group_len;
329 group_start = group_end;
330 }
331
332 self.metrics.items_read.inc_by(positions.len() as u64);
333 Ok(result.into_iter().map(|r| r.unwrap()).collect())
334 }
335
336 fn try_read_sync(&self, pos: u64) -> Option<A> {
337 self.guard
338 .try_read_sync(pos, self.items_per_blob)
339 .map_or_else(
340 || {
341 self.metrics.record_cache_misses(1);
342 None
343 },
344 |item| {
345 self.metrics.record_cache_hits(1);
346 self.metrics.try_read_sync_hits.inc();
347 self.metrics.items_read.inc();
348 Some(item)
349 },
350 )
351 }
352
353 async fn replay(
354 &self,
355 buffer: NonZeroUsize,
356 start_pos: u64,
357 ) -> Result<impl Stream<Item = Result<(u64, A), Error>> + Send, Error> {
358 let items_per_blob = self.items_per_blob;
359 let pruning_boundary = self.guard.pruning_boundary;
360
361 if start_pos > self.guard.size {
363 return Err(Error::ItemOutOfRange(start_pos));
364 }
365 if start_pos < pruning_boundary {
366 return Err(Error::ItemPruned(start_pos));
367 }
368
369 let start_section = start_pos / items_per_blob;
370 let section_start = start_section * items_per_blob;
371
372 let first_in_section = pruning_boundary.max(section_start);
374 let start_pos_in_section = start_pos - first_in_section;
375
376 let journal = &self.guard.journal;
378 if let (Some(oldest), Some(newest)) = (journal.oldest_section(), journal.newest_section()) {
379 let first_to_check = start_section.max(oldest + 1);
380 for section in first_to_check..newest {
381 let len = journal.section_len(section).await?;
382 if len < items_per_blob {
383 return Err(Error::Corruption(format!(
384 "section {section} incomplete: expected {items_per_blob} items, got {len}"
385 )));
386 }
387 }
388 }
389
390 let inner_stream = journal
391 .replay(start_section, start_pos_in_section, buffer)
392 .await?;
393
394 let stream = inner_stream.map(move |result| {
396 result.map(|(section, pos_in_section, item)| {
397 let section_start = section * items_per_blob;
398 let first_in_section = pruning_boundary.max(section_start);
399 let global_pos = first_in_section + pos_in_section;
400 (global_pos, item)
401 })
402 });
403
404 Ok(stream)
405 }
406}
407
408impl<E: Context, A: CodecFixedShared> Journal<E, A> {
409 pub const CHUNK_SIZE: usize = SegmentedJournal::<E, A>::CHUNK_SIZE;
411
412 pub const CHUNK_SIZE_U64: u64 = Self::CHUNK_SIZE as u64;
414
415 async fn scan_partition(context: &E, partition: &str) -> Result<Vec<Vec<u8>>, Error> {
417 match context.scan(partition).await {
418 Ok(blobs) => Ok(blobs),
419 Err(commonware_runtime::Error::PartitionMissing(_)) => Ok(Vec::new()),
420 Err(err) => Err(Error::Runtime(err)),
421 }
422 }
423
424 async fn select_blob_partition(context: &E, cfg: &Config) -> Result<String, Error> {
430 let legacy_partition = cfg.partition.as_str();
431 let new_partition = format!("{}-blobs", cfg.partition);
432
433 let legacy_blobs = Self::scan_partition(context, legacy_partition).await?;
434 let new_blobs = Self::scan_partition(context, &new_partition).await?;
435
436 if !legacy_blobs.is_empty() && !new_blobs.is_empty() {
437 return Err(Error::Corruption(format!(
438 "both legacy and blobs partitions contain data: legacy={} blobs={}",
439 legacy_partition, new_partition
440 )));
441 }
442
443 if !legacy_blobs.is_empty() {
444 Ok(legacy_partition.into())
445 } else {
446 Ok(new_partition)
447 }
448 }
449
450 pub async fn init(context: E, cfg: Config) -> Result<Self, Error> {
455 let items_per_blob = cfg.items_per_blob.get();
456
457 let blob_partition = Self::select_blob_partition(&context, &cfg).await?;
458 let segmented_cfg = SegmentedConfig {
459 partition: blob_partition,
460 page_cache: cfg.page_cache,
461 write_buffer: cfg.write_buffer,
462 };
463
464 let mut journal = SegmentedJournal::init(context.child("blobs"), segmented_cfg).await?;
465 let meta_cfg = MetadataConfig {
467 partition: format!("{}-metadata", cfg.partition),
468 codec_config: ((0..).into(), ()),
469 };
470
471 let mut metadata =
472 Metadata::<_, u64, Vec<u8>>::init(context.child("meta"), meta_cfg).await?;
473
474 let meta_pruning_boundary = match metadata.get(&PRUNING_BOUNDARY_KEY) {
476 Some(bytes) => Some(u64::from_be_bytes(bytes.as_slice().try_into().map_err(
477 |_| Error::Corruption("invalid pruning_boundary metadata".into()),
478 )?)),
479 None => None,
480 };
481
482 let (pruning_boundary, size, needs_metadata_update) =
484 Self::recover_bounds(&journal, items_per_blob, meta_pruning_boundary).await?;
485
486 if needs_metadata_update {
488 if pruning_boundary.is_multiple_of(items_per_blob) {
489 metadata.remove(&PRUNING_BOUNDARY_KEY);
490 metadata.sync().await?;
491 } else {
492 metadata
493 .put_sync(
494 PRUNING_BOUNDARY_KEY,
495 pruning_boundary.to_be_bytes().to_vec(),
496 )
497 .await?;
498 }
499 }
500
501 let tail_section = size / items_per_blob;
505 journal.ensure_section_exists(tail_section).await?;
506
507 let metrics = Metrics::new(context);
508 metrics.update(size, pruning_boundary, items_per_blob);
509
510 Ok(Self {
511 inner: UpgradableAsyncRwLock::new(Inner {
512 journal,
513 size,
514 metadata,
515 pruning_boundary,
516 }),
517 items_per_blob,
518 metrics,
519 })
520 }
521
522 async fn recover_bounds(
533 inner: &SegmentedJournal<E, A>,
534 items_per_blob: u64,
535 meta_pruning_boundary: Option<u64>,
536 ) -> Result<(u64, u64, bool), Error> {
537 let blob_boundary = inner.oldest_section().map_or(0, |o| o * items_per_blob);
539
540 let (pruning_boundary, needs_update) = match meta_pruning_boundary {
541 Some(meta_pruning_boundary)
543 if !meta_pruning_boundary.is_multiple_of(items_per_blob) =>
544 {
545 let meta_oldest_section = meta_pruning_boundary / items_per_blob;
546 match inner.oldest_section() {
547 None => {
548 warn!(
552 meta_oldest_section,
553 "crash repair: no blobs exist, ignoring stale metadata"
554 );
555 (blob_boundary, true)
556 }
557 Some(oldest_section) if meta_oldest_section < oldest_section => {
558 warn!(
559 meta_oldest_section,
560 oldest_section, "crash repair: metadata stale, computing from blobs"
561 );
562 (blob_boundary, true)
563 }
564 Some(oldest_section) if meta_oldest_section > oldest_section => {
565 warn!(
569 meta_oldest_section,
570 oldest_section,
571 "crash repair: metadata ahead of blobs, computing from blobs"
572 );
573 (blob_boundary, true)
574 }
575 Some(_) => (meta_pruning_boundary, false), }
577 }
578 Some(_) => (blob_boundary, true),
580 None => (blob_boundary, false),
582 };
583
584 Self::validate_oldest_section(inner, items_per_blob, pruning_boundary).await?;
586
587 let size = Self::compute_size(inner, items_per_blob, pruning_boundary).await?;
588 Ok((pruning_boundary, size, needs_update))
589 }
590
591 async fn validate_oldest_section(
596 inner: &SegmentedJournal<E, A>,
597 items_per_blob: u64,
598 pruning_boundary: u64,
599 ) -> Result<(), Error> {
600 let (Some(oldest), Some(newest)) = (inner.oldest_section(), inner.newest_section()) else {
601 return Ok(()); };
603
604 if oldest == newest {
605 return Ok(()); }
607
608 let oldest_len = inner.section_len(oldest).await?;
609 let oldest_start = oldest * items_per_blob;
610
611 let expected = if pruning_boundary > oldest_start {
612 items_per_blob - (pruning_boundary - oldest_start)
614 } else {
615 items_per_blob
617 };
618
619 if oldest_len != expected {
620 return Err(Error::Corruption(format!(
621 "oldest section {oldest} has wrong size: expected {expected} items, got {oldest_len}"
622 )));
623 }
624
625 Ok(())
626 }
627
628 async fn compute_size(
630 inner: &SegmentedJournal<E, A>,
631 items_per_blob: u64,
632 pruning_boundary: u64,
633 ) -> Result<u64, Error> {
634 let oldest = inner.oldest_section();
635 let newest = inner.newest_section();
636
637 let (Some(oldest), Some(newest)) = (oldest, newest) else {
638 return Ok(pruning_boundary);
639 };
640
641 if oldest == newest {
642 let tail_len = inner.section_len(newest).await?;
644 return Ok(pruning_boundary + tail_len);
645 }
646
647 let oldest_len = inner.section_len(oldest).await?;
649 let tail_len = inner.section_len(newest).await?;
650
651 let middle_sections = newest - oldest - 1;
653 let middle_items = middle_sections * items_per_blob;
654
655 Ok(pruning_boundary + oldest_len + middle_items + tail_len)
656 }
657
658 #[commonware_macros::stability(ALPHA)]
683 pub async fn init_at_size(context: E, cfg: Config, size: u64) -> Result<Self, Error> {
684 let items_per_blob = cfg.items_per_blob.get();
685 let tail_section = size / items_per_blob;
686
687 let blob_partition = Self::select_blob_partition(&context, &cfg).await?;
688 let segmented_cfg = SegmentedConfig {
689 partition: blob_partition,
690 page_cache: cfg.page_cache,
691 write_buffer: cfg.write_buffer,
692 };
693
694 let meta_cfg = MetadataConfig {
696 partition: format!("{}-metadata", cfg.partition),
697 codec_config: ((0..).into(), ()),
698 };
699 let mut metadata =
700 Metadata::<_, u64, Vec<u8>>::init(context.child("meta"), meta_cfg).await?;
701 let mut journal = SegmentedJournal::init(context.child("blobs"), segmented_cfg).await?;
702
703 journal.clear().await?;
709 journal.ensure_section_exists(tail_section).await?;
710
711 if !size.is_multiple_of(items_per_blob) {
713 metadata
714 .put_sync(PRUNING_BOUNDARY_KEY, size.to_be_bytes().to_vec())
715 .await?;
716 } else if metadata.get(&PRUNING_BOUNDARY_KEY).is_some() {
717 metadata.remove(&PRUNING_BOUNDARY_KEY);
718 metadata.sync().await?;
719 }
720
721 let metrics = Metrics::new(context);
722 metrics.update(size, size, items_per_blob);
723
724 Ok(Self {
725 inner: UpgradableAsyncRwLock::new(Inner {
726 journal,
727 size,
728 metadata,
729 pruning_boundary: size, }),
731 items_per_blob,
732 metrics,
733 })
734 }
735
736 #[inline]
738 const fn position_to_section(&self, position: u64) -> (u64, u64) {
739 let section = position / self.items_per_blob;
740 let pos_in_section = position % self.items_per_blob;
741 (section, pos_in_section)
742 }
743
744 pub async fn sync(&self) -> Result<(), Error> {
749 let _timer = self.metrics.sync_timer();
750 self.metrics.sync_calls.inc();
751 let inner = self.inner.upgradable_read().await;
754
755 let tail_section = inner.size / self.items_per_blob;
757
758 inner.journal.sync(tail_section).await?;
761
762 let pruning_boundary = inner.pruning_boundary;
764 let pruning_boundary_from_metadata = inner.metadata.get(&PRUNING_BOUNDARY_KEY).cloned();
765 let put = if !pruning_boundary.is_multiple_of(self.items_per_blob) {
766 let needs_update = pruning_boundary_from_metadata
767 .is_none_or(|bytes| bytes.as_slice() != pruning_boundary.to_be_bytes());
768
769 if needs_update {
770 true
771 } else {
772 return Ok(());
773 }
774 } else if pruning_boundary_from_metadata.is_some() {
775 false
776 } else {
777 return Ok(());
778 };
779
780 let mut inner = inner.upgrade().await;
783 if put {
784 inner.metadata.put(
785 PRUNING_BOUNDARY_KEY,
786 pruning_boundary.to_be_bytes().to_vec(),
787 );
788 } else {
789 inner.metadata.remove(&PRUNING_BOUNDARY_KEY);
790 }
791 let inner = inner.downgrade_to_upgradable();
792 inner.metadata.sync().await?;
793
794 Ok(())
795 }
796
797 pub async fn reader(&self) -> Reader<'_, E, A> {
799 Reader {
800 guard: self.inner.read().await,
801 items_per_blob: self.items_per_blob,
802 metrics: &self.metrics,
803 }
804 }
805
806 pub async fn size(&self) -> u64 {
809 self.inner.read().await.size
810 }
811
812 pub async fn append(&self, item: &A) -> Result<u64, Error> {
815 let _timer = self.metrics.append_timer();
816 self.metrics.append_calls.inc();
817 self.append_many_inner(Many::Flat(std::slice::from_ref(item)))
818 .await
819 }
820
821 pub async fn append_many<'a>(&'a self, items: Many<'a, A>) -> Result<u64, Error> {
826 let _timer = self.metrics.append_many_timer();
827 self.metrics.append_many_calls.inc();
828 self.append_many_inner(items).await
829 }
830
831 async fn append_many_inner<'a>(&'a self, items: Many<'a, A>) -> Result<u64, Error> {
833 if items.is_empty() {
834 return Err(Error::EmptyAppend);
835 }
836
837 let items_count = match &items {
840 Many::Flat(items) => items.len(),
841 Many::Nested(nested_items) => nested_items.iter().map(|s| s.len()).sum(),
842 };
843 let mut items_buf = Vec::with_capacity(items_count * A::SIZE);
844 match &items {
845 Many::Flat(items) => {
846 for item in *items {
847 item.write(&mut items_buf);
848 }
849 }
850 Many::Nested(nested_items) => {
851 for items in *nested_items {
852 for item in *items {
853 item.write(&mut items_buf);
854 }
855 }
856 }
857 }
858
859 let mut inner = self.inner.write().await;
861 let mut written = 0;
862 while written < items_count {
863 let (section, pos_in_section) = self.position_to_section(inner.size);
864 let remaining_space = (self.items_per_blob - pos_in_section) as usize;
865 let batch_count = remaining_space.min(items_count - written);
866 let start = written * A::SIZE;
867 let end = start + batch_count * A::SIZE;
868
869 inner
870 .journal
871 .append_raw(section, &items_buf[start..end])
872 .await?;
873 inner.size += batch_count as u64;
874 written += batch_count;
875
876 if inner.size.is_multiple_of(self.items_per_blob) {
877 let inner_ref = inner.downgrade_to_upgradable();
881 inner_ref.journal.sync(section).await?;
882 inner = inner_ref.upgrade().await;
883 inner.journal.ensure_section_exists(section + 1).await?;
884 }
885 }
886
887 self.metrics
888 .update(inner.size, inner.pruning_boundary, self.items_per_blob);
889 Ok(inner.size - 1)
890 }
891
892 pub async fn rewind(&self, size: u64) -> Result<(), Error> {
901 let mut inner = self.inner.write().await;
902
903 match size.cmp(&inner.size) {
904 std::cmp::Ordering::Greater => return Err(Error::InvalidRewind(size)),
905 std::cmp::Ordering::Equal => return Ok(()),
906 std::cmp::Ordering::Less => {}
907 }
908
909 if size < inner.pruning_boundary {
910 return Err(Error::InvalidRewind(size));
911 }
912
913 let section = size / self.items_per_blob;
914 let section_start = section * self.items_per_blob;
915
916 let first_in_section = inner.pruning_boundary.max(section_start);
918 let pos_in_section = size - first_in_section;
919 let byte_offset = pos_in_section * Self::CHUNK_SIZE_U64;
920
921 inner.journal.rewind(section, byte_offset).await?;
922 inner.size = size;
923 self.metrics
924 .update(inner.size, inner.pruning_boundary, self.items_per_blob);
925
926 Ok(())
927 }
928
929 pub async fn pruning_boundary(&self) -> u64 {
931 let inner = self.inner.read().await;
932 inner.pruning_boundary
933 }
934
935 pub async fn prune(&self, min_item_pos: u64) -> Result<bool, Error> {
942 let mut inner = self.inner.write().await;
943
944 let target_section = min_item_pos / self.items_per_blob;
946
947 let tail_section = inner.size / self.items_per_blob;
949
950 let min_section = std::cmp::min(target_section, tail_section);
952
953 let pruned = inner.journal.prune(min_section).await?;
954
955 if pruned {
957 let new_oldest = inner
958 .journal
959 .oldest_section()
960 .expect("all sections pruned - violates tail section invariant");
961 assert!(inner.pruning_boundary < new_oldest * self.items_per_blob);
963 inner.pruning_boundary = new_oldest * self.items_per_blob;
964 self.metrics
965 .update(inner.size, inner.pruning_boundary, self.items_per_blob);
966 }
967
968 Ok(pruned)
969 }
970
971 pub async fn destroy(self) -> Result<(), Error> {
973 let inner = self.inner.into_inner();
975 inner.journal.destroy().await?;
976
977 inner.metadata.destroy().await?;
979
980 Ok(())
981 }
982
983 pub(crate) async fn clear_to_size(&self, new_size: u64) -> Result<(), Error> {
992 let mut inner = self.inner.write().await;
998 inner.journal.clear().await?;
999 let tail_section = new_size / self.items_per_blob;
1000 inner.journal.ensure_section_exists(tail_section).await?;
1001
1002 inner.size = new_size;
1003 inner.pruning_boundary = new_size; if !inner.pruning_boundary.is_multiple_of(self.items_per_blob) {
1007 let value = inner.pruning_boundary.to_be_bytes().to_vec();
1008 inner.metadata.put_sync(PRUNING_BOUNDARY_KEY, value).await?;
1009 } else if inner.metadata.get(&PRUNING_BOUNDARY_KEY).is_some() {
1010 inner.metadata.remove(&PRUNING_BOUNDARY_KEY);
1011 inner.metadata.sync().await?;
1012 }
1013
1014 self.metrics
1015 .update(inner.size, inner.pruning_boundary, self.items_per_blob);
1016 Ok(())
1017 }
1018
1019 #[cfg(test)]
1021 pub(crate) async fn read(&self, pos: u64) -> Result<A, Error> {
1022 self.reader().await.read(pos).await
1023 }
1024
1025 #[cfg(test)]
1027 pub(crate) async fn bounds(&self) -> std::ops::Range<u64> {
1028 self.reader().await.bounds()
1029 }
1030
1031 #[cfg(test)]
1033 pub(crate) async fn test_oldest_section(&self) -> Option<u64> {
1034 let inner = self.inner.read().await;
1035 inner.journal.oldest_section()
1036 }
1037
1038 #[cfg(test)]
1040 pub(crate) async fn test_newest_section(&self) -> Option<u64> {
1041 let inner = self.inner.read().await;
1042 inner.journal.newest_section()
1043 }
1044}
1045
1046impl<E: Context, A: CodecFixedShared> super::Contiguous for Journal<E, A> {
1048 type Item = A;
1049
1050 async fn reader(&self) -> impl super::Reader<Item = A> + '_ {
1051 Self::reader(self).await
1052 }
1053
1054 async fn size(&self) -> u64 {
1055 Self::size(self).await
1056 }
1057}
1058
1059impl<E: Context, A: CodecFixedShared> Mutable for Journal<E, A> {
1060 async fn append(&mut self, item: &Self::Item) -> Result<u64, Error> {
1061 Self::append(self, item).await
1062 }
1063
1064 async fn append_many<'a>(&'a mut self, items: Many<'a, Self::Item>) -> Result<u64, Error> {
1065 Self::append_many(self, items).await
1066 }
1067
1068 async fn prune(&mut self, min_position: u64) -> Result<bool, Error> {
1069 Self::prune(self, min_position).await
1070 }
1071
1072 async fn rewind(&mut self, size: u64) -> Result<(), Error> {
1073 Self::rewind(self, size).await
1074 }
1075}
1076
1077impl<E: Context, A: CodecFixedShared> Persistable for Journal<E, A> {
1078 type Error = Error;
1079
1080 async fn commit(&self) -> Result<(), Error> {
1081 self.sync().await
1082 }
1083
1084 async fn sync(&self) -> Result<(), Error> {
1085 self.sync().await
1086 }
1087
1088 async fn destroy(self) -> Result<(), Error> {
1089 self.destroy().await
1090 }
1091}
1092
1093#[commonware_macros::stability(ALPHA)]
1094impl<E: Context, A: CodecFixedShared> crate::journal::authenticated::Inner<E> for Journal<E, A> {
1095 type Config = Config;
1096
1097 async fn init<
1098 F: crate::merkle::Family,
1099 H: commonware_cryptography::Hasher,
1100 S: commonware_parallel::Strategy,
1101 >(
1102 context: E,
1103 merkle_cfg: crate::merkle::full::Config<S>,
1104 journal_cfg: Self::Config,
1105 rewind_predicate: fn(&A) -> bool,
1106 bagging: crate::merkle::Bagging,
1107 ) -> Result<
1108 crate::journal::authenticated::Journal<F, E, Self, H, S>,
1109 crate::journal::authenticated::Error<F>,
1110 > {
1111 crate::journal::authenticated::Journal::<F, E, Self, H, S>::new(
1112 context,
1113 merkle_cfg,
1114 journal_cfg,
1115 rewind_predicate,
1116 bagging,
1117 )
1118 .await
1119 }
1120}
1121
1122#[cfg(test)]
1123mod tests {
1124 use super::*;
1125 use commonware_cryptography::{sha256::Digest, Hasher as _, Sha256};
1126 use commonware_macros::test_traced;
1127 use commonware_runtime::{
1128 deterministic::{self, Context},
1129 Blob, BufferPooler, Error as RuntimeError, Metrics as _, Runner, Storage, Supervisor as _,
1130 };
1131 use commonware_utils::{NZUsize, NZU16, NZU64};
1132 use futures::{pin_mut, StreamExt};
1133 use std::num::NonZeroU16;
1134
1135 const PAGE_SIZE: NonZeroU16 = NZU16!(44);
1136 const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(3);
1137
1138 fn test_digest(value: u64) -> Digest {
1140 Sha256::hash(&value.to_be_bytes())
1141 }
1142
1143 fn test_cfg(pooler: &impl BufferPooler, items_per_blob: NonZeroU64) -> Config {
1144 Config {
1145 partition: "test-partition".into(),
1146 items_per_blob,
1147 page_cache: CacheRef::from_pooler(pooler, PAGE_SIZE, PAGE_CACHE_SIZE),
1148 write_buffer: NZUsize!(2048),
1149 }
1150 }
1151
1152 fn blob_partition(cfg: &Config) -> String {
1153 format!("{}-blobs", cfg.partition)
1154 }
1155
1156 async fn scan_partition(context: &Context, partition: &str) -> Vec<Vec<u8>> {
1157 match context.scan(partition).await {
1158 Ok(blobs) => blobs,
1159 Err(RuntimeError::PartitionMissing(_)) => Vec::new(),
1160 Err(err) => panic!("Failed to scan partition {partition}: {err}"),
1161 }
1162 }
1163
1164 #[test_traced]
1165 fn test_fixed_journal_init_conflicting_partitions() {
1166 let executor = deterministic::Runner::default();
1167 executor.start(|context| async move {
1168 let cfg = test_cfg(&context, NZU64!(2));
1169 let legacy_partition = cfg.partition.clone();
1170 let blobs_partition = blob_partition(&cfg);
1171
1172 let (legacy_blob, _) = context
1173 .open(&legacy_partition, &0u64.to_be_bytes())
1174 .await
1175 .expect("Failed to open legacy blob");
1176 legacy_blob
1177 .write_at_sync(0, vec![0u8; 1])
1178 .await
1179 .expect("Failed to write legacy blob");
1180
1181 let (new_blob, _) = context
1182 .open(&blobs_partition, &0u64.to_be_bytes())
1183 .await
1184 .expect("Failed to open new blob");
1185 new_blob
1186 .write_at_sync(0, vec![0u8; 1])
1187 .await
1188 .expect("Failed to write new blob");
1189
1190 let result = Journal::<_, Digest>::init(context.child("second"), cfg.clone()).await;
1191 assert!(matches!(result, Err(Error::Corruption(_))));
1192 });
1193 }
1194
1195 #[test_traced]
1196 fn test_fixed_journal_init_prefers_legacy_partition() {
1197 let executor = deterministic::Runner::default();
1198 executor.start(|context| async move {
1199 let cfg = test_cfg(&context, NZU64!(2));
1200 let legacy_partition = cfg.partition.clone();
1201 let blobs_partition = blob_partition(&cfg);
1202
1203 let (legacy_blob, _) = context
1205 .open(&legacy_partition, &0u64.to_be_bytes())
1206 .await
1207 .expect("Failed to open legacy blob");
1208 legacy_blob
1209 .write_at_sync(0, vec![0u8; 1])
1210 .await
1211 .expect("Failed to write legacy blob");
1212
1213 let journal = Journal::<_, Digest>::init(context.child("first"), cfg.clone())
1214 .await
1215 .expect("failed to initialize journal");
1216 journal.append(&test_digest(1)).await.unwrap();
1217 journal.sync().await.unwrap();
1218 drop(journal);
1219
1220 let legacy_blobs = scan_partition(&context, &legacy_partition).await;
1221 let new_blobs = scan_partition(&context, &blobs_partition).await;
1222 assert!(!legacy_blobs.is_empty());
1223 assert!(new_blobs.is_empty());
1224 });
1225 }
1226
1227 #[test_traced]
1228 fn test_fixed_journal_init_defaults_to_blobs_partition() {
1229 let executor = deterministic::Runner::default();
1230 executor.start(|context| async move {
1231 let cfg = test_cfg(&context, NZU64!(2));
1232 let legacy_partition = cfg.partition.clone();
1233 let blobs_partition = blob_partition(&cfg);
1234
1235 let journal = Journal::<_, Digest>::init(context.child("first"), cfg.clone())
1236 .await
1237 .expect("failed to initialize journal");
1238 journal.append(&test_digest(1)).await.unwrap();
1239 journal.sync().await.unwrap();
1240 drop(journal);
1241
1242 let legacy_blobs = scan_partition(&context, &legacy_partition).await;
1243 let new_blobs = scan_partition(&context, &blobs_partition).await;
1244 assert!(legacy_blobs.is_empty());
1245 assert!(!new_blobs.is_empty());
1246 });
1247 }
1248
1249 #[test_traced]
1250 fn test_fixed_journal_append_and_prune() {
1251 let executor = deterministic::Runner::default();
1253
1254 executor.start(|context| async move {
1256 let cfg = test_cfg(&context, NZU64!(2));
1258 let journal = Journal::init(context.child("first"), cfg.clone())
1259 .await
1260 .expect("failed to initialize journal");
1261
1262 let mut pos = journal
1264 .append(&test_digest(0))
1265 .await
1266 .expect("failed to append data 0");
1267 assert_eq!(pos, 0);
1268
1269 journal.sync().await.expect("Failed to sync journal");
1271 drop(journal);
1272
1273 let cfg = test_cfg(&context, NZU64!(2));
1274 let journal = Journal::init(context.child("second"), cfg.clone())
1275 .await
1276 .expect("failed to re-initialize journal");
1277 assert_eq!(journal.size().await, 1);
1278
1279 pos = journal
1281 .append(&test_digest(1))
1282 .await
1283 .expect("failed to append data 1");
1284 assert_eq!(pos, 1);
1285 pos = journal
1286 .append(&test_digest(2))
1287 .await
1288 .expect("failed to append data 2");
1289 assert_eq!(pos, 2);
1290
1291 let item0 = journal.read(0).await.expect("failed to read data 0");
1293 assert_eq!(item0, test_digest(0));
1294 let item1 = journal.read(1).await.expect("failed to read data 1");
1295 assert_eq!(item1, test_digest(1));
1296 let item2 = journal.read(2).await.expect("failed to read data 2");
1297 assert_eq!(item2, test_digest(2));
1298 let err = journal.read(3).await.expect_err("expected read to fail");
1299 assert!(matches!(err, Error::ItemOutOfRange(3)));
1300
1301 journal.sync().await.expect("failed to sync journal");
1303
1304 journal.prune(1).await.expect("failed to prune journal 1");
1306
1307 journal.prune(2).await.expect("failed to prune journal 2");
1309 assert_eq!(journal.bounds().await.start, 2);
1310
1311 let result0 = journal.read(0).await;
1313 assert!(matches!(result0, Err(Error::ItemPruned(0))));
1314 let result1 = journal.read(1).await;
1315 assert!(matches!(result1, Err(Error::ItemPruned(1))));
1316
1317 let result2 = journal.read(2).await.unwrap();
1319 assert_eq!(result2, test_digest(2));
1320
1321 for i in 3..10 {
1323 let pos = journal
1324 .append(&test_digest(i))
1325 .await
1326 .expect("failed to append data");
1327 assert_eq!(pos, i);
1328 }
1329
1330 journal.prune(0).await.expect("no-op pruning failed");
1332 assert_eq!(journal.inner.read().await.journal.oldest_section(), Some(1));
1333 assert_eq!(journal.inner.read().await.journal.newest_section(), Some(5));
1334 assert_eq!(journal.bounds().await.start, 2);
1335
1336 journal
1338 .prune(3 * cfg.items_per_blob.get())
1339 .await
1340 .expect("failed to prune journal 2");
1341 assert_eq!(journal.inner.read().await.journal.oldest_section(), Some(3));
1342 assert_eq!(journal.inner.read().await.journal.newest_section(), Some(5));
1343 assert_eq!(journal.bounds().await.start, 6);
1344
1345 journal
1347 .prune(10000)
1348 .await
1349 .expect("failed to max-prune journal");
1350 let size = journal.size().await;
1351 assert_eq!(size, 10);
1352 assert_eq!(journal.test_oldest_section().await, Some(5));
1353 assert_eq!(journal.test_newest_section().await, Some(5));
1354 let bounds = journal.bounds().await;
1357 assert!(bounds.is_empty());
1358 assert_eq!(bounds.start, size);
1360
1361 {
1363 let reader = journal.reader().await;
1364 let result = reader.replay(NZUsize!(1024), 0).await;
1365 assert!(matches!(result, Err(Error::ItemPruned(0))));
1366 }
1367
1368 {
1370 let reader = journal.reader().await;
1371 let res = reader.replay(NZUsize!(1024), 0).await;
1372 assert!(matches!(res, Err(Error::ItemPruned(_))));
1373
1374 let reader = journal.reader().await;
1375 let stream = reader
1376 .replay(NZUsize!(1024), journal.bounds().await.start)
1377 .await
1378 .expect("failed to replay journal from pruning boundary");
1379 pin_mut!(stream);
1380 let mut items = Vec::new();
1381 while let Some(result) = stream.next().await {
1382 match result {
1383 Ok((pos, item)) => {
1384 assert_eq!(test_digest(pos), item);
1385 items.push(pos);
1386 }
1387 Err(err) => panic!("Failed to read item: {err}"),
1388 }
1389 }
1390 assert_eq!(items, Vec::<u64>::new());
1391 }
1392
1393 journal.destroy().await.unwrap();
1394 });
1395 }
1396
1397 #[test_traced]
1399 fn test_fixed_journal_append_a_lot_of_data() {
1400 let executor = deterministic::Runner::default();
1402 const ITEMS_PER_BLOB: NonZeroU64 = NZU64!(10000);
1403 executor.start(|context| async move {
1404 let cfg = test_cfg(&context, ITEMS_PER_BLOB);
1405 let journal = Journal::init(context.child("first"), cfg.clone())
1406 .await
1407 .expect("failed to initialize journal");
1408 for i in 0u64..ITEMS_PER_BLOB.get() * 2 - 1 {
1410 journal
1411 .append(&test_digest(i))
1412 .await
1413 .expect("failed to append data");
1414 }
1415 journal.sync().await.expect("failed to sync journal");
1417 drop(journal);
1418 let journal = Journal::init(context.child("second"), cfg.clone())
1419 .await
1420 .expect("failed to re-initialize journal");
1421 for i in 0u64..10000 {
1422 let item: Digest = journal.read(i).await.expect("failed to read data");
1423 assert_eq!(item, test_digest(i));
1424 }
1425 journal.destroy().await.expect("failed to destroy journal");
1426 });
1427 }
1428
1429 #[test_traced]
1430 fn test_fixed_journal_replay() {
1431 const ITEMS_PER_BLOB: NonZeroU64 = NZU64!(7);
1432 let executor = deterministic::Runner::default();
1434
1435 executor.start(|context| async move {
1437 let cfg = test_cfg(&context, ITEMS_PER_BLOB);
1439 let journal = Journal::init(context.child("first"), cfg.clone())
1440 .await
1441 .expect("failed to initialize journal");
1442
1443 for i in 0u64..(ITEMS_PER_BLOB.get() * 100 + ITEMS_PER_BLOB.get() / 2) {
1445 let pos = journal
1446 .append(&test_digest(i))
1447 .await
1448 .expect("failed to append data");
1449 assert_eq!(pos, i);
1450 }
1451
1452 for i in 0u64..(ITEMS_PER_BLOB.get() * 100 + ITEMS_PER_BLOB.get() / 2) {
1454 let item: Digest = journal.read(i).await.expect("failed to read data");
1455 assert_eq!(item, test_digest(i), "i={i}");
1456 }
1457
1458 {
1460 let reader = journal.reader().await;
1461 let stream = reader
1462 .replay(NZUsize!(1024), 0)
1463 .await
1464 .expect("failed to replay journal");
1465 let mut items = Vec::new();
1466 pin_mut!(stream);
1467 while let Some(result) = stream.next().await {
1468 match result {
1469 Ok((pos, item)) => {
1470 assert_eq!(test_digest(pos), item, "pos={pos}, item={item:?}");
1471 items.push(pos);
1472 }
1473 Err(err) => panic!("Failed to read item: {err}"),
1474 }
1475 }
1476
1477 assert_eq!(
1479 items.len(),
1480 ITEMS_PER_BLOB.get() as usize * 100 + ITEMS_PER_BLOB.get() as usize / 2
1481 );
1482 items.sort();
1483 for (i, pos) in items.iter().enumerate() {
1484 assert_eq!(i as u64, *pos);
1485 }
1486 }
1487
1488 journal.sync().await.expect("Failed to sync journal");
1489 drop(journal);
1490
1491 let (blob, _) = context
1493 .open(&blob_partition(&cfg), &40u64.to_be_bytes())
1494 .await
1495 .expect("Failed to open blob");
1496 let bad_bytes = 123456789u32;
1498 blob.write_at_sync(1, bad_bytes.to_be_bytes().to_vec())
1499 .await
1500 .expect("Failed to write bad bytes");
1501
1502 let journal = Journal::init(context.child("second"), cfg.clone())
1504 .await
1505 .expect("Failed to re-initialize journal");
1506
1507 let err = journal
1509 .read(40 * ITEMS_PER_BLOB.get() + 1)
1510 .await
1511 .unwrap_err();
1512 assert!(matches!(err, Error::Runtime(_)));
1513
1514 {
1516 let mut error_found = false;
1517 let reader = journal.reader().await;
1518 let stream = reader
1519 .replay(NZUsize!(1024), 0)
1520 .await
1521 .expect("failed to replay journal");
1522 let mut items = Vec::new();
1523 pin_mut!(stream);
1524 while let Some(result) = stream.next().await {
1525 match result {
1526 Ok((pos, item)) => {
1527 assert_eq!(test_digest(pos), item);
1528 items.push(pos);
1529 }
1530 Err(err) => {
1531 error_found = true;
1532 assert!(matches!(err, Error::Runtime(_)));
1533 break;
1534 }
1535 }
1536 }
1537 assert!(error_found); }
1539 });
1540 }
1541
1542 #[test_traced]
1543 fn test_fixed_journal_init_with_corrupted_historical_blobs() {
1544 let executor = deterministic::Runner::default();
1546 const ITEMS_PER_BLOB: NonZeroU64 = NZU64!(7);
1548 executor.start(|context| async move {
1549 let cfg = test_cfg(&context, ITEMS_PER_BLOB);
1551 let journal = Journal::init(context.child("first"), cfg.clone())
1552 .await
1553 .expect("failed to initialize journal");
1554
1555 for i in 0u64..(ITEMS_PER_BLOB.get() * 100 + ITEMS_PER_BLOB.get() / 2) {
1557 let pos = journal
1558 .append(&test_digest(i))
1559 .await
1560 .expect("failed to append data");
1561 assert_eq!(pos, i);
1562 }
1563 journal.sync().await.expect("Failed to sync journal");
1564 drop(journal);
1565
1566 let (blob, size) = context
1571 .open(&blob_partition(&cfg), &40u64.to_be_bytes())
1572 .await
1573 .expect("Failed to open blob");
1574 blob.resize(size - 1).await.expect("Failed to corrupt blob");
1575 blob.sync().await.expect("Failed to sync blob");
1576
1577 let journal = Journal::<_, Digest>::init(context.child("second"), cfg.clone())
1580 .await
1581 .expect("failed to initialize journal");
1582
1583 let expected_size = ITEMS_PER_BLOB.get() * 100 + ITEMS_PER_BLOB.get() / 2;
1586 assert_eq!(journal.size().await, expected_size);
1587
1588 let reader = journal.reader().await;
1590 match reader.replay(NZUsize!(1024), 0).await {
1591 Err(Error::Corruption(msg)) => {
1592 assert!(
1593 msg.contains("section 40"),
1594 "Error should mention section 40, got: {msg}"
1595 );
1596 }
1597 Err(e) => panic!("Expected Corruption error for section 40, got: {:?}", e),
1598 Ok(_) => panic!("Expected replay to fail with corruption"),
1599 };
1600 });
1601 }
1602
1603 #[test_traced]
1604 fn test_fixed_journal_replay_with_missing_historical_blob() {
1605 let executor = deterministic::Runner::default();
1606 executor.start(|context| async move {
1607 let cfg = test_cfg(&context, NZU64!(2));
1608 let journal = Journal::init(context.child("first"), cfg.clone())
1609 .await
1610 .expect("failed to initialize journal");
1611 for i in 0u64..5 {
1612 journal
1613 .append(&test_digest(i))
1614 .await
1615 .expect("failed to append data");
1616 }
1617 journal.sync().await.expect("failed to sync journal");
1618 drop(journal);
1619
1620 context
1621 .remove(&blob_partition(&cfg), Some(&1u64.to_be_bytes()))
1622 .await
1623 .expect("failed to remove blob");
1624
1625 let result = Journal::<_, Digest>::init(context.child("second"), cfg.clone())
1627 .await
1628 .expect("init shouldn't fail");
1629
1630 let reader = result.reader().await;
1632 match reader.replay(NZUsize!(1024), 0).await {
1633 Err(Error::Corruption(_)) => {}
1634 Err(err) => panic!("expected Corruption, got: {err}"),
1635 Ok(_) => panic!("expected Corruption, got ok"),
1636 };
1637
1638 match result.read(2).await {
1640 Err(Error::Corruption(_)) => {}
1641 Err(err) => panic!("expected Corruption, got: {err}"),
1642 Ok(_) => panic!("expected Corruption, got ok"),
1643 };
1644 });
1645 }
1646
1647 #[test_traced]
1648 fn test_fixed_journal_test_trim_blob() {
1649 let executor = deterministic::Runner::default();
1651 const ITEMS_PER_BLOB: NonZeroU64 = NZU64!(7);
1653 executor.start(|context| async move {
1654 let cfg = test_cfg(&context, ITEMS_PER_BLOB);
1656 let journal = Journal::init(context.child("first"), cfg.clone())
1657 .await
1658 .expect("failed to initialize journal");
1659
1660 let item_count = ITEMS_PER_BLOB.get() + 3;
1662 for i in 0u64..item_count {
1663 journal
1664 .append(&test_digest(i))
1665 .await
1666 .expect("failed to append data");
1667 }
1668 assert_eq!(journal.size().await, item_count);
1669 journal.sync().await.expect("Failed to sync journal");
1670 drop(journal);
1671
1672 let (blob, size) = context
1675 .open(&blob_partition(&cfg), &1u64.to_be_bytes())
1676 .await
1677 .expect("Failed to open blob");
1678 blob.resize(size - 1).await.expect("Failed to corrupt blob");
1679 blob.sync().await.expect("Failed to sync blob");
1680
1681 let journal = Journal::<_, Digest>::init(context.child("second"), cfg.clone())
1682 .await
1683 .unwrap();
1684
1685 assert_eq!(journal.size().await, item_count - 1);
1688
1689 journal.destroy().await.expect("Failed to destroy journal");
1691 });
1692 }
1693
1694 #[test_traced]
1695 fn test_fixed_journal_partial_replay() {
1696 const ITEMS_PER_BLOB: NonZeroU64 = NZU64!(7);
1697 const START_POS: u64 = 53;
1700
1701 let executor = deterministic::Runner::default();
1703 executor.start(|context| async move {
1705 let cfg = test_cfg(&context, ITEMS_PER_BLOB);
1707 let journal = Journal::init(context.child("storage"), cfg.clone())
1708 .await
1709 .expect("failed to initialize journal");
1710
1711 for i in 0u64..(ITEMS_PER_BLOB.get() * 100 + ITEMS_PER_BLOB.get() / 2) {
1713 let pos = journal
1714 .append(&test_digest(i))
1715 .await
1716 .expect("failed to append data");
1717 assert_eq!(pos, i);
1718 }
1719
1720 {
1722 let reader = journal.reader().await;
1723 let stream = reader
1724 .replay(NZUsize!(1024), START_POS)
1725 .await
1726 .expect("failed to replay journal");
1727 let mut items = Vec::new();
1728 pin_mut!(stream);
1729 while let Some(result) = stream.next().await {
1730 match result {
1731 Ok((pos, item)) => {
1732 assert!(pos >= START_POS, "pos={pos}, expected >= {START_POS}");
1733 assert_eq!(
1734 test_digest(pos),
1735 item,
1736 "Item at position {pos} did not match expected digest"
1737 );
1738 items.push(pos);
1739 }
1740 Err(err) => panic!("Failed to read item: {err}"),
1741 }
1742 }
1743
1744 assert_eq!(
1746 items.len(),
1747 ITEMS_PER_BLOB.get() as usize * 100 + ITEMS_PER_BLOB.get() as usize / 2
1748 - START_POS as usize
1749 );
1750 items.sort();
1751 for (i, pos) in items.iter().enumerate() {
1752 assert_eq!(i as u64, *pos - START_POS);
1753 }
1754 }
1755
1756 journal.destroy().await.unwrap();
1757 });
1758 }
1759
1760 #[test_traced]
1761 fn test_fixed_journal_recover_from_partial_write() {
1762 let executor = deterministic::Runner::default();
1764
1765 executor.start(|context| async move {
1767 let cfg = test_cfg(&context, NZU64!(3));
1769 let journal = Journal::init(context.child("first"), cfg.clone())
1770 .await
1771 .expect("failed to initialize journal");
1772 for i in 0..5 {
1773 journal
1774 .append(&test_digest(i))
1775 .await
1776 .expect("failed to append data");
1777 }
1778 assert_eq!(journal.size().await, 5);
1779 journal.sync().await.expect("Failed to sync journal");
1780 drop(journal);
1781
1782 let (blob, size) = context
1784 .open(&blob_partition(&cfg), &1u64.to_be_bytes())
1785 .await
1786 .expect("Failed to open blob");
1787 blob.resize(size - 1).await.expect("Failed to corrupt blob");
1789 blob.sync().await.expect("Failed to sync blob");
1790
1791 let journal = Journal::<_, Digest>::init(context.child("second"), cfg.clone())
1793 .await
1794 .expect("Failed to re-initialize journal");
1795 assert_eq!(journal.pruning_boundary().await, 0);
1797 assert_eq!(journal.size().await, 4);
1798 drop(journal);
1799
1800 context
1802 .remove(&blob_partition(&cfg), Some(&1u64.to_be_bytes()))
1803 .await
1804 .expect("Failed to remove blob");
1805
1806 let journal = Journal::<_, Digest>::init(context.child("third"), cfg.clone())
1807 .await
1808 .expect("Failed to re-initialize journal");
1809 assert_eq!(journal.size().await, 3);
1811
1812 journal.destroy().await.unwrap();
1813 });
1814 }
1815
1816 #[test_traced]
1817 fn test_fixed_journal_recover_detects_oldest_section_too_short() {
1818 let executor = deterministic::Runner::default();
1819 executor.start(|context| async move {
1820 let cfg = test_cfg(&context, NZU64!(5));
1821 let journal =
1822 Journal::<_, Digest>::init_at_size(context.child("first"), cfg.clone(), 7)
1823 .await
1824 .expect("failed to initialize journal at size");
1825
1826 for i in 0..8u64 {
1828 journal
1829 .append(&test_digest(100 + i))
1830 .await
1831 .expect("failed to append data");
1832 }
1833 journal.sync().await.expect("failed to sync journal");
1834 assert_eq!(journal.pruning_boundary().await, 7);
1835 assert_eq!(journal.size().await, 15);
1836 drop(journal);
1837
1838 let (blob, size) = context
1840 .open(&blob_partition(&cfg), &1u64.to_be_bytes())
1841 .await
1842 .expect("failed to open oldest blob");
1843 blob.resize(size - 1).await.expect("failed to corrupt blob");
1844 blob.sync().await.expect("failed to sync blob");
1845
1846 let result = Journal::<_, Digest>::init(context.child("second"), cfg.clone()).await;
1847 assert!(matches!(result, Err(Error::Corruption(_))));
1848 });
1849 }
1850
1851 #[test_traced]
1852 fn test_fixed_journal_recover_to_empty_from_partial_write() {
1853 let executor = deterministic::Runner::default();
1854 executor.start(|context| async move {
1855 let cfg = test_cfg(&context, NZU64!(10));
1857 let journal = Journal::init(context.child("first"), cfg.clone())
1858 .await
1859 .expect("failed to initialize journal");
1860 journal
1862 .append(&test_digest(0))
1863 .await
1864 .expect("failed to append data");
1865 assert_eq!(journal.size().await, 1);
1866 journal.sync().await.expect("Failed to sync journal");
1867 drop(journal);
1868
1869 let (blob, size) = context
1871 .open(&blob_partition(&cfg), &0u64.to_be_bytes())
1872 .await
1873 .expect("Failed to open blob");
1874 blob.resize(size - 1).await.expect("Failed to corrupt blob");
1876 blob.sync().await.expect("Failed to sync blob");
1877
1878 let journal = Journal::<_, Digest>::init(context.child("second"), cfg.clone())
1880 .await
1881 .expect("Failed to re-initialize journal");
1882
1883 let bounds = journal.bounds().await;
1886 assert_eq!(bounds.end, 0);
1887 assert!(bounds.is_empty());
1888 journal
1890 .append(&test_digest(0))
1891 .await
1892 .expect("failed to append data");
1893 assert_eq!(journal.size().await, 1);
1894
1895 journal.destroy().await.unwrap();
1896 });
1897 }
1898
1899 #[test_traced("DEBUG")]
1900 fn test_fixed_journal_recover_from_unwritten_data() {
1901 let executor = deterministic::Runner::default();
1902 executor.start(|context| async move {
1903 let cfg = test_cfg(&context, NZU64!(10));
1905 let journal = Journal::init(context.child("first"), cfg.clone())
1906 .await
1907 .expect("failed to initialize journal");
1908
1909 journal
1911 .append(&test_digest(0))
1912 .await
1913 .expect("failed to append data");
1914 assert_eq!(journal.size().await, 1);
1915 journal.sync().await.expect("Failed to sync journal");
1916 drop(journal);
1917
1918 let (blob, size) = context
1921 .open(&blob_partition(&cfg), &0u64.to_be_bytes())
1922 .await
1923 .expect("Failed to open blob");
1924 blob.write_at_sync(size, vec![0u8; PAGE_SIZE.get() as usize * 3])
1925 .await
1926 .expect("Failed to extend blob");
1927
1928 let journal = Journal::<_, Digest>::init(context.child("second"), cfg.clone())
1930 .await
1931 .expect("Failed to re-initialize journal");
1932
1933 assert_eq!(journal.size().await, 1);
1936
1937 journal
1939 .append(&test_digest(1))
1940 .await
1941 .expect("failed to append data");
1942
1943 journal.destroy().await.unwrap();
1944 });
1945 }
1946
1947 #[test_traced]
1948 fn test_fixed_journal_rewinding() {
1949 let executor = deterministic::Runner::default();
1950 executor.start(|context| async move {
1951 let cfg = test_cfg(&context, NZU64!(2));
1953 let journal = Journal::init(context.child("first"), cfg.clone())
1954 .await
1955 .expect("failed to initialize journal");
1956 assert!(matches!(journal.rewind(0).await, Ok(())));
1957 assert!(matches!(
1958 journal.rewind(1).await,
1959 Err(Error::InvalidRewind(1))
1960 ));
1961
1962 journal
1964 .append(&test_digest(0))
1965 .await
1966 .expect("failed to append data 0");
1967 assert_eq!(journal.size().await, 1);
1968 assert!(matches!(journal.rewind(1).await, Ok(()))); assert!(matches!(journal.rewind(0).await, Ok(())));
1970 assert_eq!(journal.size().await, 0);
1971
1972 for i in 0..7 {
1974 let pos = journal
1975 .append(&test_digest(i))
1976 .await
1977 .expect("failed to append data");
1978 assert_eq!(pos, i);
1979 }
1980 assert_eq!(journal.size().await, 7);
1981
1982 assert!(matches!(journal.rewind(4).await, Ok(())));
1984 assert_eq!(journal.size().await, 4);
1985
1986 assert!(matches!(journal.rewind(0).await, Ok(())));
1988 assert_eq!(journal.size().await, 0);
1989
1990 for _ in 0..10 {
1992 for i in 0..100 {
1993 journal
1994 .append(&test_digest(i))
1995 .await
1996 .expect("failed to append data");
1997 }
1998 journal.rewind(journal.size().await - 49).await.unwrap();
1999 }
2000 const ITEMS_REMAINING: u64 = 10 * (100 - 49);
2001 assert_eq!(journal.size().await, ITEMS_REMAINING);
2002
2003 journal.sync().await.expect("Failed to sync journal");
2004 drop(journal);
2005
2006 let mut cfg = test_cfg(&context, NZU64!(3));
2008 cfg.partition = "test-partition-2".into();
2009 let journal = Journal::init(context.child("second"), cfg.clone())
2010 .await
2011 .expect("failed to initialize journal");
2012 for _ in 0..10 {
2013 for i in 0..100 {
2014 journal
2015 .append(&test_digest(i))
2016 .await
2017 .expect("failed to append data");
2018 }
2019 journal.rewind(journal.size().await - 49).await.unwrap();
2020 }
2021 assert_eq!(journal.size().await, ITEMS_REMAINING);
2022
2023 journal.sync().await.expect("Failed to sync journal");
2024 drop(journal);
2025
2026 let journal: Journal<_, Digest> = Journal::init(context.child("third"), cfg.clone())
2028 .await
2029 .expect("failed to re-initialize journal");
2030 assert_eq!(journal.size().await, 10 * (100 - 49));
2031
2032 journal.prune(300).await.expect("pruning failed");
2034 assert_eq!(journal.size().await, ITEMS_REMAINING);
2035 assert!(matches!(
2037 journal.rewind(299).await,
2038 Err(Error::InvalidRewind(299))
2039 ));
2040 assert!(matches!(journal.rewind(300).await, Ok(())));
2043 let bounds = journal.bounds().await;
2044 assert_eq!(bounds.end, 300);
2045 assert!(bounds.is_empty());
2046
2047 journal.destroy().await.unwrap();
2048 });
2049 }
2050
2051 #[test_traced]
2059 fn test_fixed_journal_recover_from_page_boundary_truncation() {
2060 let executor = deterministic::Runner::default();
2061 executor.start(|context: Context| async move {
2062 let cfg = test_cfg(&context, NZU64!(100));
2064 let journal = Journal::init(context.child("first"), cfg.clone())
2065 .await
2066 .expect("failed to initialize journal");
2067
2068 for i in 0u64..10 {
2076 journal
2077 .append(&test_digest(i))
2078 .await
2079 .expect("failed to append data");
2080 }
2081 assert_eq!(journal.size().await, 10);
2082 journal.sync().await.expect("Failed to sync journal");
2083 drop(journal);
2084
2085 let physical_page_size = PAGE_SIZE.get() as u64 + 12;
2088 let (blob, size) = context
2089 .open(&blob_partition(&cfg), &0u64.to_be_bytes())
2090 .await
2091 .expect("Failed to open blob");
2092
2093 let full_pages = size / physical_page_size;
2095 assert!(full_pages >= 2, "need at least 2 pages for this test");
2096 let truncate_to = (full_pages - 1) * physical_page_size;
2097
2098 blob.resize(truncate_to)
2099 .await
2100 .expect("Failed to truncate blob");
2101 blob.sync().await.expect("Failed to sync blob");
2102
2103 let journal = Journal::<_, Digest>::init(context.child("second"), cfg.clone())
2105 .await
2106 .expect("Failed to re-initialize journal after page truncation");
2107
2108 let remaining_logical_bytes = (full_pages - 1) * PAGE_SIZE.get() as u64;
2113 let expected_items = remaining_logical_bytes / 32; assert_eq!(
2115 journal.size().await,
2116 expected_items,
2117 "Journal should recover to {} items after truncation",
2118 expected_items
2119 );
2120
2121 for i in 0..expected_items {
2123 let item = journal
2124 .read(i)
2125 .await
2126 .expect("failed to read recovered item");
2127 assert_eq!(item, test_digest(i), "item {} mismatch after recovery", i);
2128 }
2129
2130 journal.destroy().await.expect("Failed to destroy journal");
2131 });
2132 }
2133
2134 #[test_traced]
2140 fn test_single_item_per_blob() {
2141 let executor = deterministic::Runner::default();
2142 executor.start(|context| async move {
2143 let cfg = Config {
2144 partition: "single-item-per-blob".into(),
2145 items_per_blob: NZU64!(1),
2146 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
2147 write_buffer: NZUsize!(2048),
2148 };
2149
2150 let journal = Journal::init(context.child("first"), cfg.clone())
2152 .await
2153 .expect("failed to initialize journal");
2154
2155 let bounds = journal.bounds().await;
2157 assert_eq!(bounds.end, 0);
2158 assert!(bounds.is_empty());
2159
2160 let pos = journal
2162 .append(&test_digest(0))
2163 .await
2164 .expect("failed to append");
2165 assert_eq!(pos, 0);
2166 assert_eq!(journal.size().await, 1);
2167
2168 journal.sync().await.expect("failed to sync");
2170
2171 let value = journal
2173 .read(journal.size().await - 1)
2174 .await
2175 .expect("failed to read");
2176 assert_eq!(value, test_digest(0));
2177
2178 for i in 1..10u64 {
2180 let pos = journal
2181 .append(&test_digest(i))
2182 .await
2183 .expect("failed to append");
2184 assert_eq!(pos, i);
2185 assert_eq!(journal.size().await, i + 1);
2186
2187 let value = journal
2189 .read(journal.size().await - 1)
2190 .await
2191 .expect("failed to read");
2192 assert_eq!(value, test_digest(i));
2193 }
2194
2195 for i in 0..10u64 {
2197 assert_eq!(journal.read(i).await.unwrap(), test_digest(i));
2198 }
2199
2200 journal.sync().await.expect("failed to sync");
2201
2202 journal.prune(5).await.expect("failed to prune");
2205
2206 assert_eq!(journal.size().await, 10);
2208
2209 assert_eq!(journal.bounds().await.start, 5);
2211
2212 let value = journal
2214 .read(journal.size().await - 1)
2215 .await
2216 .expect("failed to read");
2217 assert_eq!(value, test_digest(9));
2218
2219 for i in 0..5 {
2221 assert!(matches!(journal.read(i).await, Err(Error::ItemPruned(_))));
2222 }
2223
2224 for i in 5..10u64 {
2226 assert_eq!(journal.read(i).await.unwrap(), test_digest(i));
2227 }
2228
2229 for i in 10..15u64 {
2231 let pos = journal
2232 .append(&test_digest(i))
2233 .await
2234 .expect("failed to append");
2235 assert_eq!(pos, i);
2236
2237 let value = journal
2239 .read(journal.size().await - 1)
2240 .await
2241 .expect("failed to read");
2242 assert_eq!(value, test_digest(i));
2243 }
2244
2245 journal.sync().await.expect("failed to sync");
2246 drop(journal);
2247
2248 let journal = Journal::<_, Digest>::init(context.child("second"), cfg.clone())
2250 .await
2251 .expect("failed to re-initialize journal");
2252
2253 assert_eq!(journal.size().await, 15);
2255
2256 assert_eq!(journal.bounds().await.start, 5);
2258
2259 let value = journal
2261 .read(journal.size().await - 1)
2262 .await
2263 .expect("failed to read");
2264 assert_eq!(value, test_digest(14));
2265
2266 for i in 5..15u64 {
2268 assert_eq!(journal.read(i).await.unwrap(), test_digest(i));
2269 }
2270
2271 journal.destroy().await.expect("failed to destroy journal");
2272
2273 let journal = Journal::init(context.child("third"), cfg.clone())
2276 .await
2277 .expect("failed to initialize journal");
2278
2279 for i in 0..10u64 {
2281 journal.append(&test_digest(i + 100)).await.unwrap();
2282 }
2283
2284 journal.prune(5).await.unwrap();
2286 let bounds = journal.bounds().await;
2287 assert_eq!(bounds.end, 10);
2288 assert_eq!(bounds.start, 5);
2289
2290 journal.sync().await.unwrap();
2292 drop(journal);
2293
2294 let journal = Journal::<_, Digest>::init(context.child("fourth"), cfg.clone())
2296 .await
2297 .expect("failed to re-initialize journal");
2298
2299 let bounds = journal.bounds().await;
2301 assert_eq!(bounds.end, 10);
2302 assert_eq!(bounds.start, 5);
2303
2304 let value = journal.read(journal.size().await - 1).await.unwrap();
2306 assert_eq!(value, test_digest(109));
2307
2308 for i in 5..10u64 {
2310 assert_eq!(journal.read(i).await.unwrap(), test_digest(i + 100));
2311 }
2312
2313 journal.destroy().await.expect("failed to destroy journal");
2314
2315 let journal = Journal::init(context.child("storage"), cfg.clone())
2317 .await
2318 .expect("failed to initialize journal");
2319
2320 for i in 0..5u64 {
2321 journal.append(&test_digest(i + 200)).await.unwrap();
2322 }
2323 journal.sync().await.unwrap();
2324
2325 journal.prune(5).await.unwrap();
2327 let bounds = journal.bounds().await;
2328 assert_eq!(bounds.end, 5); assert!(bounds.is_empty()); let result = journal.read(journal.size().await - 1).await;
2333 assert!(matches!(result, Err(Error::ItemPruned(4))));
2334
2335 journal.append(&test_digest(205)).await.unwrap();
2337 assert_eq!(journal.bounds().await.start, 5);
2338 assert_eq!(
2339 journal.read(journal.size().await - 1).await.unwrap(),
2340 test_digest(205)
2341 );
2342
2343 journal.destroy().await.expect("failed to destroy journal");
2344 });
2345 }
2346
2347 #[test_traced]
2348 fn test_fixed_journal_init_at_size_zero() {
2349 let executor = deterministic::Runner::default();
2350 executor.start(|context| async move {
2351 let cfg = test_cfg(&context, NZU64!(5));
2352 let journal =
2353 Journal::<_, Digest>::init_at_size(context.child("storage"), cfg.clone(), 0)
2354 .await
2355 .unwrap();
2356
2357 let bounds = journal.bounds().await;
2358 assert_eq!(bounds.end, 0);
2359 assert!(bounds.is_empty());
2360
2361 let pos = journal.append(&test_digest(100)).await.unwrap();
2363 assert_eq!(pos, 0);
2364 assert_eq!(journal.size().await, 1);
2365 assert_eq!(journal.read(0).await.unwrap(), test_digest(100));
2366
2367 journal.destroy().await.unwrap();
2368 });
2369 }
2370
2371 #[test_traced]
2372 fn test_fixed_journal_init_at_size_section_boundary() {
2373 let executor = deterministic::Runner::default();
2374 executor.start(|context| async move {
2375 let cfg = test_cfg(&context, NZU64!(5));
2376
2377 let journal =
2379 Journal::<_, Digest>::init_at_size(context.child("storage"), cfg.clone(), 10)
2380 .await
2381 .unwrap();
2382
2383 let bounds = journal.bounds().await;
2384 assert_eq!(bounds.end, 10);
2385 assert!(bounds.is_empty());
2386
2387 let pos = journal.append(&test_digest(1000)).await.unwrap();
2389 assert_eq!(pos, 10);
2390 assert_eq!(journal.size().await, 11);
2391 assert_eq!(journal.read(10).await.unwrap(), test_digest(1000));
2392
2393 let pos = journal.append(&test_digest(1001)).await.unwrap();
2395 assert_eq!(pos, 11);
2396 assert_eq!(journal.read(11).await.unwrap(), test_digest(1001));
2397
2398 journal.destroy().await.unwrap();
2399 });
2400 }
2401
2402 #[test_traced]
2403 fn test_fixed_journal_init_at_size_mid_section() {
2404 let executor = deterministic::Runner::default();
2405 executor.start(|context| async move {
2406 let cfg = test_cfg(&context, NZU64!(5));
2407
2408 let journal =
2410 Journal::<_, Digest>::init_at_size(context.child("storage"), cfg.clone(), 7)
2411 .await
2412 .unwrap();
2413
2414 let bounds = journal.bounds().await;
2415 assert_eq!(bounds.end, 7);
2416 assert!(bounds.is_empty());
2418
2419 assert!(matches!(journal.read(5).await, Err(Error::ItemPruned(5))));
2421 assert!(matches!(journal.read(6).await, Err(Error::ItemPruned(6))));
2422
2423 let pos = journal.append(&test_digest(700)).await.unwrap();
2425 assert_eq!(pos, 7);
2426 assert_eq!(journal.size().await, 8);
2427 assert_eq!(journal.read(7).await.unwrap(), test_digest(700));
2428 assert_eq!(journal.bounds().await.start, 7);
2430
2431 journal.destroy().await.unwrap();
2432 });
2433 }
2434
2435 #[test_traced]
2436 fn test_fixed_journal_init_at_size_persistence() {
2437 let executor = deterministic::Runner::default();
2438 executor.start(|context| async move {
2439 let cfg = test_cfg(&context, NZU64!(5));
2440
2441 let journal =
2443 Journal::<_, Digest>::init_at_size(context.child("first"), cfg.clone(), 15)
2444 .await
2445 .unwrap();
2446
2447 for i in 0..5u64 {
2449 let pos = journal.append(&test_digest(1500 + i)).await.unwrap();
2450 assert_eq!(pos, 15 + i);
2451 }
2452
2453 assert_eq!(journal.size().await, 20);
2454
2455 journal.sync().await.unwrap();
2457 drop(journal);
2458
2459 let journal = Journal::<_, Digest>::init(context.child("second"), cfg.clone())
2460 .await
2461 .unwrap();
2462
2463 let bounds = journal.bounds().await;
2465 assert_eq!(bounds.end, 20);
2466 assert_eq!(bounds.start, 15);
2467
2468 for i in 0..5u64 {
2470 assert_eq!(journal.read(15 + i).await.unwrap(), test_digest(1500 + i));
2471 }
2472
2473 let pos = journal.append(&test_digest(9999)).await.unwrap();
2475 assert_eq!(pos, 20);
2476 assert_eq!(journal.read(20).await.unwrap(), test_digest(9999));
2477
2478 journal.destroy().await.unwrap();
2479 });
2480 }
2481
2482 #[test_traced]
2483 fn test_fixed_journal_init_at_size_persistence_without_data() {
2484 let executor = deterministic::Runner::default();
2485 executor.start(|context| async move {
2486 let cfg = test_cfg(&context, NZU64!(5));
2487
2488 let journal =
2490 Journal::<_, Digest>::init_at_size(context.child("first"), cfg.clone(), 15)
2491 .await
2492 .unwrap();
2493
2494 let bounds = journal.bounds().await;
2495 assert_eq!(bounds.end, 15);
2496 assert!(bounds.is_empty());
2497
2498 drop(journal);
2500
2501 let journal = Journal::<_, Digest>::init(context.child("second"), cfg.clone())
2503 .await
2504 .unwrap();
2505
2506 let bounds = journal.bounds().await;
2507 assert_eq!(bounds.end, 15);
2508 assert!(bounds.is_empty());
2509
2510 let pos = journal.append(&test_digest(1500)).await.unwrap();
2512 assert_eq!(pos, 15);
2513 assert_eq!(journal.read(15).await.unwrap(), test_digest(1500));
2514
2515 journal.destroy().await.unwrap();
2516 });
2517 }
2518
2519 #[test_traced]
2520 fn test_fixed_journal_init_at_size_large_offset() {
2521 let executor = deterministic::Runner::default();
2522 executor.start(|context| async move {
2523 let cfg = test_cfg(&context, NZU64!(5));
2524
2525 let journal =
2527 Journal::<_, Digest>::init_at_size(context.child("storage"), cfg.clone(), 1000)
2528 .await
2529 .unwrap();
2530
2531 let bounds = journal.bounds().await;
2532 assert_eq!(bounds.end, 1000);
2533 assert!(bounds.is_empty());
2534
2535 let pos = journal.append(&test_digest(100000)).await.unwrap();
2537 assert_eq!(pos, 1000);
2538 assert_eq!(journal.read(1000).await.unwrap(), test_digest(100000));
2539
2540 journal.destroy().await.unwrap();
2541 });
2542 }
2543
2544 #[test_traced]
2545 fn test_fixed_journal_init_at_size_prune_and_append() {
2546 let executor = deterministic::Runner::default();
2547 executor.start(|context| async move {
2548 let cfg = test_cfg(&context, NZU64!(5));
2549
2550 let journal =
2552 Journal::<_, Digest>::init_at_size(context.child("storage"), cfg.clone(), 20)
2553 .await
2554 .unwrap();
2555
2556 for i in 0..10u64 {
2558 journal.append(&test_digest(2000 + i)).await.unwrap();
2559 }
2560
2561 assert_eq!(journal.size().await, 30);
2562
2563 journal.prune(25).await.unwrap();
2565
2566 let bounds = journal.bounds().await;
2567 assert_eq!(bounds.end, 30);
2568 assert_eq!(bounds.start, 25);
2569
2570 for i in 25..30u64 {
2572 assert_eq!(journal.read(i).await.unwrap(), test_digest(2000 + (i - 20)));
2573 }
2574
2575 let pos = journal.append(&test_digest(3000)).await.unwrap();
2577 assert_eq!(pos, 30);
2578
2579 journal.destroy().await.unwrap();
2580 });
2581 }
2582
2583 #[test_traced]
2584 fn test_fixed_journal_clear_to_size() {
2585 let executor = deterministic::Runner::default();
2586 executor.start(|context| async move {
2587 let cfg = test_cfg(&context, NZU64!(10));
2588 let journal = Journal::init(context.child("journal"), cfg.clone())
2589 .await
2590 .expect("failed to initialize journal");
2591
2592 for i in 0..25u64 {
2594 journal.append(&test_digest(i)).await.unwrap();
2595 }
2596 assert_eq!(journal.size().await, 25);
2597 journal.sync().await.unwrap();
2598
2599 journal.clear_to_size(100).await.unwrap();
2601 assert_eq!(journal.size().await, 100);
2602
2603 for i in 0..25 {
2605 assert!(matches!(journal.read(i).await, Err(Error::ItemPruned(_))));
2606 }
2607
2608 drop(journal);
2610 let journal =
2611 Journal::<_, Digest>::init(context.child("journal_after_clear"), cfg.clone())
2612 .await
2613 .expect("failed to re-initialize journal after clear");
2614 assert_eq!(journal.size().await, 100);
2615
2616 for i in 100..105u64 {
2618 let pos = journal.append(&test_digest(i)).await.unwrap();
2619 assert_eq!(pos, i);
2620 }
2621 assert_eq!(journal.size().await, 105);
2622
2623 for i in 100..105u64 {
2625 assert_eq!(journal.read(i).await.unwrap(), test_digest(i));
2626 }
2627
2628 journal.sync().await.unwrap();
2630 drop(journal);
2631
2632 let journal = Journal::<_, Digest>::init(context.child("journal_reopened"), cfg)
2633 .await
2634 .expect("failed to re-initialize journal");
2635
2636 assert_eq!(journal.size().await, 105);
2637 for i in 100..105u64 {
2638 assert_eq!(journal.read(i).await.unwrap(), test_digest(i));
2639 }
2640
2641 journal.destroy().await.unwrap();
2642 });
2643 }
2644
2645 #[test_traced]
2646 fn test_fixed_journal_sync_crash_meta_none_boundary_aligned() {
2647 let executor = deterministic::Runner::default();
2649 executor.start(|context| async move {
2650 let cfg = test_cfg(&context, NZU64!(5));
2651 let journal = Journal::<_, Digest>::init(context.child("first"), cfg.clone())
2652 .await
2653 .unwrap();
2654
2655 for i in 0..5u64 {
2656 journal.append(&test_digest(i)).await.unwrap();
2657 }
2658 let inner = journal.inner.read().await;
2659 let tail_section = inner.size / journal.items_per_blob;
2660 inner.journal.sync(tail_section).await.unwrap();
2661 drop(inner);
2662 drop(journal);
2663
2664 let journal = Journal::<_, Digest>::init(context.child("second"), cfg.clone())
2665 .await
2666 .unwrap();
2667 let bounds = journal.bounds().await;
2668 assert_eq!(bounds.start, 0);
2669 assert_eq!(bounds.end, 5);
2670 journal.destroy().await.unwrap();
2671 });
2672 }
2673
2674 #[test_traced]
2675 fn test_fixed_journal_oldest_section_invalid_len() {
2676 let executor = deterministic::Runner::default();
2678 executor.start(|context| async move {
2679 let cfg = test_cfg(&context, NZU64!(5));
2680 let journal =
2681 Journal::<_, Digest>::init_at_size(context.child("first"), cfg.clone(), 7)
2682 .await
2683 .unwrap();
2684 for i in 0..3u64 {
2685 journal.append(&test_digest(i)).await.unwrap();
2686 }
2687 assert_eq!(journal.inner.read().await.journal.newest_section(), Some(2));
2688 journal.sync().await.unwrap();
2689
2690 let mut inner = journal.inner.write().await;
2692 inner.metadata.clear();
2693 inner.metadata.sync().await.unwrap();
2694 drop(inner);
2695 drop(journal);
2696
2697 let result = Journal::<_, Digest>::init(context.child("second"), cfg.clone()).await;
2701 assert!(matches!(result, Err(Error::Corruption(_))));
2702 context.remove(&blob_partition(&cfg), None).await.unwrap();
2703 context
2704 .remove(&format!("{}-metadata", cfg.partition), None)
2705 .await
2706 .unwrap();
2707 });
2708 }
2709
2710 #[test_traced]
2711 fn test_fixed_journal_sync_crash_meta_mid_boundary_unchanged() {
2712 let executor = deterministic::Runner::default();
2714 executor.start(|context| async move {
2715 let cfg = test_cfg(&context, NZU64!(5));
2716 let journal =
2717 Journal::<_, Digest>::init_at_size(context.child("first"), cfg.clone(), 7)
2718 .await
2719 .unwrap();
2720 for i in 0..3u64 {
2721 journal.append(&test_digest(i)).await.unwrap();
2722 }
2723 let inner = journal.inner.read().await;
2724 let tail_section = inner.size / journal.items_per_blob;
2725 inner.journal.sync(tail_section).await.unwrap();
2726 drop(inner);
2727 drop(journal);
2728
2729 let journal = Journal::<_, Digest>::init(context.child("second"), cfg.clone())
2730 .await
2731 .unwrap();
2732 let bounds = journal.bounds().await;
2733 assert_eq!(bounds.start, 7);
2734 assert_eq!(bounds.end, 10);
2735 journal.destroy().await.unwrap();
2736 });
2737 }
2738 #[test_traced]
2739 fn test_fixed_journal_sync_crash_meta_mid_to_aligned_becomes_stale() {
2740 let executor = deterministic::Runner::default();
2742 executor.start(|context| async move {
2743 let cfg = test_cfg(&context, NZU64!(5));
2744 let journal =
2745 Journal::<_, Digest>::init_at_size(context.child("first"), cfg.clone(), 7)
2746 .await
2747 .unwrap();
2748 for i in 0..10u64 {
2749 journal.append(&test_digest(i)).await.unwrap();
2750 }
2751 assert_eq!(journal.size().await, 17);
2752 journal.prune(10).await.unwrap();
2753
2754 let inner = journal.inner.read().await;
2755 let tail_section = inner.size / journal.items_per_blob;
2756 inner.journal.sync(tail_section).await.unwrap();
2757 drop(inner);
2758 drop(journal);
2759
2760 let journal = Journal::<_, Digest>::init(context.child("second"), cfg.clone())
2761 .await
2762 .unwrap();
2763 let bounds = journal.bounds().await;
2764 assert_eq!(bounds.start, 10);
2765 assert_eq!(bounds.end, 17);
2766 journal.destroy().await.unwrap();
2767 });
2768 }
2769
2770 #[test_traced]
2771 fn test_fixed_journal_prune_does_not_move_boundary_backwards() {
2772 let executor = deterministic::Runner::default();
2775 executor.start(|context| async move {
2776 let cfg = test_cfg(&context, NZU64!(5));
2777 let journal =
2779 Journal::<_, Digest>::init_at_size(context.child("first"), cfg.clone(), 7)
2780 .await
2781 .unwrap();
2782 for i in 0..5u64 {
2784 journal.append(&test_digest(i)).await.unwrap();
2785 }
2786 journal.prune(5).await.unwrap();
2788 assert_eq!(journal.bounds().await.start, 7);
2789 journal.destroy().await.unwrap();
2790 });
2791 }
2792
2793 #[test_traced]
2794 fn test_fixed_journal_replay_after_init_at_size_spanning_sections() {
2795 let executor = deterministic::Runner::default();
2798 executor.start(|context| async move {
2799 let cfg = test_cfg(&context, NZU64!(5));
2800
2801 let journal =
2804 Journal::<_, Digest>::init_at_size(context.child("storage"), cfg.clone(), 7)
2805 .await
2806 .unwrap();
2807
2808 for i in 0..13u64 {
2810 let pos = journal.append(&test_digest(100 + i)).await.unwrap();
2811 assert_eq!(pos, 7 + i);
2812 }
2813 assert_eq!(journal.size().await, 20);
2814 journal.sync().await.unwrap();
2815
2816 {
2818 let reader = journal.reader().await;
2819 let stream = reader
2820 .replay(NZUsize!(1024), 7)
2821 .await
2822 .expect("failed to replay");
2823 pin_mut!(stream);
2824 let mut items: Vec<(u64, Digest)> = Vec::new();
2825 while let Some(result) = stream.next().await {
2826 items.push(result.expect("replay item failed"));
2827 }
2828
2829 assert_eq!(items.len(), 13);
2831 for (i, (pos, item)) in items.iter().enumerate() {
2832 assert_eq!(*pos, 7 + i as u64);
2833 assert_eq!(*item, test_digest(100 + i as u64));
2834 }
2835 }
2836
2837 {
2839 let reader = journal.reader().await;
2840 let stream = reader
2841 .replay(NZUsize!(1024), 12)
2842 .await
2843 .expect("failed to replay from mid-stream");
2844 pin_mut!(stream);
2845 let mut items: Vec<(u64, Digest)> = Vec::new();
2846 while let Some(result) = stream.next().await {
2847 items.push(result.expect("replay item failed"));
2848 }
2849
2850 assert_eq!(items.len(), 8);
2852 for (i, (pos, item)) in items.iter().enumerate() {
2853 assert_eq!(*pos, 12 + i as u64);
2854 assert_eq!(*item, test_digest(100 + 5 + i as u64));
2855 }
2856 }
2857
2858 journal.destroy().await.unwrap();
2859 });
2860 }
2861
2862 #[test_traced]
2863 fn test_fixed_journal_rewind_error_before_bounds_start() {
2864 let executor = deterministic::Runner::default();
2866 executor.start(|context| async move {
2867 let cfg = test_cfg(&context, NZU64!(5));
2868
2869 let journal =
2870 Journal::<_, Digest>::init_at_size(context.child("storage"), cfg.clone(), 10)
2871 .await
2872 .unwrap();
2873
2874 for i in 0..3u64 {
2876 journal.append(&test_digest(i)).await.unwrap();
2877 }
2878 assert_eq!(journal.size().await, 13);
2879
2880 journal.rewind(11).await.unwrap();
2882 assert_eq!(journal.size().await, 11);
2883
2884 journal.rewind(10).await.unwrap();
2886 assert_eq!(journal.size().await, 10);
2887
2888 let result = journal.rewind(9).await;
2890 assert!(matches!(result, Err(Error::InvalidRewind(9))));
2891
2892 journal.destroy().await.unwrap();
2893 });
2894 }
2895
    // Recreates on-disk states that an interrupted `init_at_size` could leave behind
    // and checks that `init` recovers from each of them with empty bounds 0..0.
    //
    // Two states are covered in sequence:
    // 1. the blob partition has been removed while the metadata partition is untouched;
    // 2. the metadata records a pruning boundary of 7 while the only blob present is a
    //    newly opened one named by the big-endian bytes of 0.
    #[test_traced]
    fn test_fixed_journal_init_at_size_crash_scenarios() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(&context, NZU64!(5));

            // Build and persist a journal that starts at position 7 and holds five items.
            let journal =
                Journal::<_, Digest>::init_at_size(context.child("first"), cfg.clone(), 7)
                    .await
                    .unwrap();
            for i in 0..5u64 {
                journal.append(&test_digest(i)).await.unwrap();
            }
            journal.sync().await.unwrap();
            drop(journal);

            // State 1: remove the blob partition behind the journal's back. The metadata
            // partition is not touched here.
            let blob_part = blob_partition(&cfg);
            context.remove(&blob_part, None).await.unwrap();

            // Recovery must succeed and come back with empty bounds at 0.
            let journal = Journal::<_, Digest>::init(
                context.child("crash").with_attribute("index", 1),
                cfg.clone(),
            )
            .await
            .expect("init failed after clear crash");
            let bounds = journal.bounds().await;
            assert_eq!(bounds.end, 0);
            assert_eq!(bounds.start, 0);
            drop(journal);

            // State 2: write a pruning boundary of 7 (as big-endian bytes) straight into
            // the metadata partition, which is named "<partition>-metadata".
            // NOTE(review): presumably the recovery above reset this key -- confirm.
            let meta_cfg = MetadataConfig {
                partition: format!("{}-metadata", cfg.partition),
                codec_config: ((0..).into(), ()),
            };
            let mut metadata =
                Metadata::<_, u64, Vec<u8>>::init(context.child("restore_meta"), meta_cfg.clone())
                    .await
                    .unwrap();
            metadata
                .put_sync(PRUNING_BOUNDARY_KEY, 7u64.to_be_bytes().to_vec())
                .await
                .unwrap();

            // Open (and sync) a blob named by the big-endian bytes of 0 in the blob
            // partition; nothing is written to it.
            let (blob, _) = context.open(&blob_part, &0u64.to_be_bytes()).await.unwrap();
            blob.sync().await.unwrap();

            // Recovery must again succeed despite the metadata/blob mismatch.
            let journal = Journal::<_, Digest>::init(
                context.child("crash").with_attribute("index", 2),
                cfg.clone(),
            )
            .await
            .expect("init failed after create crash");

            // The recovered journal is expected to be empty at 0, not at the recorded 7.
            let bounds = journal.bounds().await;
            assert_eq!(bounds.start, 0);
            assert_eq!(bounds.end, 0);
            journal.destroy().await.unwrap();
        });
    }
2975
    // Recreates an on-disk state that an interrupted `clear_to_size` could leave
    // behind (blob partition removed, then a single newly opened blob named by the
    // big-endian bytes of 0, metadata left as written by the original journal) and
    // checks that `init` recovers with empty bounds 0..0.
    #[test_traced]
    fn test_fixed_journal_clear_to_size_crash_scenarios() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(&context, NZU64!(5));

            // Persist an empty journal whose size starts at 12.
            let journal =
                Journal::<_, Digest>::init_at_size(context.child("first"), cfg.clone(), 12)
                    .await
                    .unwrap();
            journal.sync().await.unwrap();
            drop(journal);

            // Remove the blob partition behind the journal's back; the metadata
            // partition is not touched here.
            let blob_part = blob_partition(&cfg);
            context.remove(&blob_part, None).await.unwrap();

            // Open (and sync) a blob named by the big-endian bytes of 0; nothing is
            // written to it.
            let (blob, _) = context.open(&blob_part, &0u64.to_be_bytes()).await.unwrap();
            blob.sync().await.unwrap();

            // Recovery must succeed from this state.
            let journal = Journal::<_, Digest>::init(context.child("crash_clear"), cfg.clone())
                .await
                .expect("init failed after clear_to_size crash");

            // The recovered journal is expected to be empty at 0, not at the original 12.
            let bounds = journal.bounds().await;
            assert_eq!(bounds.start, 0);
            assert_eq!(bounds.end, 0);
            journal.destroy().await.unwrap();
        });
    }
3017
3018 #[test_traced]
3019 fn test_read_many_empty() {
3020 let executor = deterministic::Runner::default();
3021 executor.start(|context| async move {
3022 let cfg = test_cfg(&context, NZU64!(10));
3023 let journal = Journal::<_, Digest>::init(context.child("j"), cfg)
3024 .await
3025 .unwrap();
3026
3027 let items = journal.reader().await.read_many(&[]).await.unwrap();
3028 assert!(items.is_empty());
3029
3030 journal.destroy().await.unwrap();
3031 });
3032 }
3033
3034 #[test_traced]
3035 fn test_read_many_single_blob() {
3036 let executor = deterministic::Runner::default();
3038 executor.start(|context| async move {
3039 let cfg = test_cfg(&context, NZU64!(10));
3040 let journal = Journal::init(context.child("j"), cfg).await.unwrap();
3041
3042 for i in 0..5u64 {
3043 journal.append(&test_digest(i)).await.unwrap();
3044 }
3045 assert_eq!(journal.size().await, 5);
3046
3047 let items = journal.reader().await.read_many(&[0, 2, 4]).await.unwrap();
3048 assert_eq!(items, vec![test_digest(0), test_digest(2), test_digest(4)]);
3049
3050 journal.destroy().await.unwrap();
3051 });
3052 }
3053
3054 #[test_traced]
3055 fn test_read_many_across_blobs() {
3056 let executor = deterministic::Runner::default();
3058 executor.start(|context| async move {
3059 let cfg = test_cfg(&context, NZU64!(3));
3060 let journal = Journal::init(context.child("j"), cfg).await.unwrap();
3061
3062 for i in 0..9u64 {
3063 journal.append(&test_digest(i)).await.unwrap();
3064 }
3065 assert_eq!(journal.size().await, 9);
3066 let items = journal.reader().await.read_many(&[1, 4, 7]).await.unwrap();
3069 assert_eq!(items, vec![test_digest(1), test_digest(4), test_digest(7)]);
3070
3071 journal.destroy().await.unwrap();
3072 });
3073 }
3074
3075 #[test_traced]
3076 fn test_read_many_after_prune() {
3077 let executor = deterministic::Runner::default();
3079 executor.start(|context| async move {
3080 let cfg = test_cfg(&context, NZU64!(3));
3081 let journal = Journal::init(context.child("j"), cfg).await.unwrap();
3082
3083 for i in 0..9u64 {
3084 journal.append(&test_digest(i)).await.unwrap();
3085 }
3086 assert_eq!(journal.size().await, 9);
3087 journal.sync().await.unwrap();
3088
3089 journal.prune(3).await.unwrap();
3091 assert_eq!(journal.bounds().await, 3..9);
3092
3093 let items = journal.reader().await.read_many(&[3, 5, 8]).await.unwrap();
3094 assert_eq!(items, vec![test_digest(3), test_digest(5), test_digest(8)]);
3095
3096 let err = journal.reader().await.read_many(&[1]).await.unwrap_err();
3098 assert!(matches!(err, Error::ItemPruned(1)));
3099
3100 journal.destroy().await.unwrap();
3101 });
3102 }
3103
3104 #[test_traced]
3105 fn test_read_many_out_of_range() {
3106 let executor = deterministic::Runner::default();
3107 executor.start(|context| async move {
3108 let cfg = test_cfg(&context, NZU64!(10));
3109 let journal = Journal::init(context.child("j"), cfg).await.unwrap();
3110
3111 for i in 0..3u64 {
3112 journal.append(&test_digest(i)).await.unwrap();
3113 }
3114 assert_eq!(journal.size().await, 3);
3115
3116 let err = journal.reader().await.read_many(&[0, 5]).await.unwrap_err();
3117 assert!(matches!(err, Error::ItemOutOfRange(5)));
3118
3119 journal.destroy().await.unwrap();
3120 });
3121 }
3122
3123 #[test_traced]
3124 fn test_read_many_matches_read() {
3125 let executor = deterministic::Runner::default();
3127 executor.start(|context| async move {
3128 let cfg = test_cfg(&context, NZU64!(4));
3129 let journal = Journal::init(context.child("j"), cfg).await.unwrap();
3130
3131 for i in 0..20u64 {
3132 journal.append(&test_digest(i)).await.unwrap();
3133 }
3134 assert_eq!(journal.size().await, 20);
3135 journal.sync().await.unwrap();
3136
3137 let positions: Vec<u64> = (0..20).collect();
3138 let reader = journal.reader().await;
3139 let batch = reader.read_many(&positions).await.unwrap();
3140
3141 for &pos in &positions {
3142 let single = reader.read(pos).await.unwrap();
3143 assert_eq!(batch[pos as usize], single);
3144 }
3145 drop(reader);
3146
3147 journal.destroy().await.unwrap();
3148 });
3149 }
3150
    // Drives one call of each instrumented operation on a journal labelled
    // "fixed_metrics" and checks that the encoded metrics output contains the
    // expected counter and gauge lines. Every expectation is a substring match
    // against the text returned by `context.encode()`.
    #[test_traced]
    fn test_fixed_journal_metrics() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(&context, NZU64!(2));
            let journal = Journal::<_, Digest>::init(context.child("fixed_metrics"), cfg.clone())
                .await
                .unwrap();

            // One batched append of five items plus one single append.
            let items: Vec<_> = (0..5).map(test_digest).collect();
            journal.append_many(Many::Flat(&items)).await.unwrap();
            journal.append(&test_digest(5)).await.unwrap();
            journal.sync().await.unwrap();
            // One async read, one synchronous read and one batched read of three positions.
            journal.reader().await.read(0).await.unwrap();
            journal.reader().await.try_read_sync(0).unwrap();
            journal.reader().await.read_many(&[1, 2, 4]).await.unwrap();
            // Prune to position 2, then rewind the size to 4.
            journal.prune(2).await.unwrap();
            journal.rewind(4).await.unwrap();

            let buffer = context.encode();
            for expected in [
                // Gauges: size 4 after the rewind, boundary 2 after the prune.
                "fixed_metrics_size 4",
                "fixed_metrics_pruning_boundary 2",
                "fixed_metrics_retained 2",
                "fixed_metrics_tail_items 2",
                // Call counters: one call was made to each operation above.
                "fixed_metrics_append_calls_total 1",
                "fixed_metrics_append_many_calls_total 1",
                "fixed_metrics_read_calls_total 1",
                "fixed_metrics_read_many_calls_total 1",
                "fixed_metrics_try_read_sync_hits_total 1",
                // NOTE(review): presumably 1 (read) + 1 (try_read_sync) + 3 (read_many) -- confirm.
                "fixed_metrics_items_read_total 5",
                "fixed_metrics_sync_calls_total 1",
                // Duration metrics: one observation per timed call.
                "fixed_metrics_append_duration_count 1",
                "fixed_metrics_append_many_duration_count 1",
                "fixed_metrics_read_duration_count 1",
                "fixed_metrics_read_many_duration_count 1",
                "fixed_metrics_sync_duration_count 1",
                // Only the presence of these metric names is checked, not their values.
                "fixed_metrics_cache_hits_total",
                "fixed_metrics_cache_misses_total",
                "fixed_metrics_blobs_tracked",
            ] {
                // On failure, print the missing line followed by the full output.
                assert!(buffer.contains(expected), "{expected}\n{buffer}");
            }

            journal.destroy().await.unwrap();
        });
    }
3198}