1use crate::{
58 journal::{
59 contiguous::MutableContiguous,
60 segmented::fixed::{Config as SegmentedConfig, Journal as SegmentedJournal},
61 Error,
62 },
63 mmr::Location,
64 Persistable,
65};
66use commonware_codec::CodecFixedShared;
67use commonware_runtime::{buffer::PoolRef, Metrics, Storage};
68use futures::{stream::Stream, StreamExt};
69use std::{
70 num::{NonZeroU64, NonZeroUsize},
71 ops::Range,
72};
73use tracing::debug;
74
/// Configuration for a contiguous [Journal] of fixed-size items.
#[derive(Clone)]
pub struct Config {
    /// Storage partition holding the journal's blobs. Each section is stored in its own blob,
    /// named by the big-endian bytes of the section number.
    pub partition: String,

    /// Maximum number of items stored in one section (blob). The item at position `p` lives in
    /// section `p / items_per_blob` at index `p % items_per_blob`.
    ///
    /// NOTE(review): positions are derived from this value, so reopening existing data with a
    /// different value would misinterpret it; keep it fixed for a given partition.
    pub items_per_blob: NonZeroU64,

    /// Buffer pool forwarded to the underlying segmented journal (presumably the shared page
    /// cache used for blob I/O; confirm against the segmented journal docs).
    pub buffer_pool: PoolRef,

    /// Write-buffer size forwarded to the underlying segmented journal (presumably the number
    /// of bytes buffered per blob before being written out; confirm).
    pub write_buffer: NonZeroUsize,
}
93
/// A journal of fixed-size items addressed by contiguous `u64` positions, built on top of a
/// segmented journal in which each section holds `items_per_blob` consecutive items.
///
/// Invariant: the tail section (`size / items_per_blob`, where the next append lands) always
/// exists, even when it is empty. This lets the size be recovered on reopen and guarantees the
/// pruning boundary is always defined.
pub struct Journal<E: Storage + Metrics, A: CodecFixedShared> {
    /// Underlying segmented journal; position `p` maps to section `p / items_per_blob`,
    /// index `p % items_per_blob`.
    inner: SegmentedJournal<E, A>,

    /// Number of items per section, copied from [Config::items_per_blob].
    items_per_blob: u64,

    /// Number of positions assigned so far (pruned positions included); the next append
    /// receives this position.
    size: u64,
}
117
118impl<E: Storage + Metrics, A: CodecFixedShared> Journal<E, A> {
119 pub const CHUNK_SIZE: usize = SegmentedJournal::<E, A>::CHUNK_SIZE;
121
122 pub const CHUNK_SIZE_U64: u64 = Self::CHUNK_SIZE as u64;
124
125 pub async fn init(context: E, cfg: Config) -> Result<Self, Error> {
130 let items_per_blob = cfg.items_per_blob.get();
131
132 let segmented_cfg = SegmentedConfig {
133 partition: cfg.partition,
134 buffer_pool: cfg.buffer_pool,
135 write_buffer: cfg.write_buffer,
136 };
137
138 let mut inner = SegmentedJournal::init(context, segmented_cfg).await?;
139
140 let oldest_section = inner.oldest_section();
142 let newest_section = inner.newest_section();
143
144 let size = match (oldest_section, newest_section) {
145 (Some(_), Some(newest)) => {
146 let tail_len = inner.section_len(newest).await?;
148 newest * items_per_blob + tail_len
149 }
150 _ => 0,
151 };
152
153 let tail_section = size / items_per_blob;
157 inner.ensure_section_exists(tail_section).await?;
158
159 Ok(Self {
160 inner,
161 items_per_blob,
162 size,
163 })
164 }
165
166 pub async fn init_at_size(context: E, cfg: Config, size: u64) -> Result<Self, Error> {
186 let items_per_blob = cfg.items_per_blob.get();
187
188 let tail_section = size / items_per_blob;
190 let tail_items = size % items_per_blob;
191
192 let segmented_cfg = SegmentedConfig {
194 partition: cfg.partition,
195 buffer_pool: cfg.buffer_pool,
196 write_buffer: cfg.write_buffer,
197 };
198
199 let mut inner = SegmentedJournal::init(context, segmented_cfg).await?;
200
201 inner.init_section_at_size(tail_section, tail_items).await?;
204 inner.sync_all().await?;
205
206 Ok(Self {
207 inner,
208 items_per_blob,
209 size,
210 })
211 }
212
213 pub(crate) async fn init_sync(
222 context: E,
223 cfg: Config,
224 range: Range<u64>,
225 ) -> Result<Self, crate::qmdb::Error> {
226 assert!(!range.is_empty(), "range must not be empty");
227
228 debug!(
229 range.start,
230 range.end,
231 items_per_blob = cfg.items_per_blob.get(),
232 "initializing contiguous fixed journal for sync"
233 );
234
235 let mut journal = Self::init(context.with_label("journal"), cfg.clone()).await?;
236 let size = journal.size();
237
238 if size == 0 {
240 if range.start == 0 {
241 debug!("no existing journal data, returning empty journal");
242 return Ok(journal);
243 } else {
244 debug!(
245 range.start,
246 "no existing journal data, initializing at sync range start"
247 );
248 journal.destroy().await?;
249 return Ok(Self::init_at_size(context, cfg, range.start).await?);
250 }
251 }
252
253 if size > range.end {
255 return Err(crate::qmdb::Error::UnexpectedData(Location::new_unchecked(
256 size,
257 )));
258 }
259
260 if size <= range.start {
262 debug!(
263 size,
264 range.start, "existing journal data is stale, re-initializing at start position"
265 );
266 journal.destroy().await?;
267 return Ok(Self::init_at_size(context, cfg, range.start).await?);
268 }
269
270 let oldest = journal.oldest_retained_pos();
272 if let Some(oldest_pos) = oldest {
273 if oldest_pos < range.start {
274 debug!(
275 oldest_pos,
276 range.start, "pruning journal to sync range start"
277 );
278 journal.prune(range.start).await?;
279 }
280 }
281
282 Ok(journal)
283 }
284
285 #[inline]
287 const fn position_to_section(&self, position: u64) -> (u64, u64) {
288 let section = position / self.items_per_blob;
289 let pos_in_section = position % self.items_per_blob;
290 (section, pos_in_section)
291 }
292
293 pub async fn sync(&mut self) -> Result<(), Error> {
298 let tail_section = self.size / self.items_per_blob;
299 self.inner.sync(tail_section).await
300 }
301
302 pub const fn size(&self) -> u64 {
305 self.size
306 }
307
308 pub async fn append(&mut self, item: A) -> Result<u64, Error> {
311 let position = self.size;
312 let (section, _pos_in_section) = self.position_to_section(position);
313
314 self.inner.append(section, item).await?;
315 self.size += 1;
316
317 if self.size.is_multiple_of(self.items_per_blob) {
320 self.inner.sync(section).await?;
321 self.inner.ensure_section_exists(section + 1).await?;
323 }
324
325 Ok(position)
326 }
327
328 pub async fn rewind(&mut self, size: u64) -> Result<(), Error> {
337 match size.cmp(&self.size) {
338 std::cmp::Ordering::Greater => return Err(Error::InvalidRewind(size)),
339 std::cmp::Ordering::Equal => return Ok(()),
340 std::cmp::Ordering::Less => {}
341 }
342
343 if size < self.pruning_boundary() {
344 return Err(Error::InvalidRewind(size));
345 }
346
347 let (section, pos_in_section) = self.position_to_section(size);
348 let byte_offset = pos_in_section * SegmentedJournal::<E, A>::CHUNK_SIZE as u64;
349
350 self.inner.rewind(section, byte_offset).await?;
351 self.size = size;
352
353 Ok(())
354 }
355
356 pub fn oldest_retained_pos(&self) -> Option<u64> {
360 if self.pruning_boundary() == self.size {
361 return None;
362 }
363 Some(self.pruning_boundary())
364 }
365
366 pub fn pruning_boundary(&self) -> u64 {
368 self.inner
369 .oldest_section()
370 .expect("journal should have at least one section")
371 * self.items_per_blob
372 }
373
374 pub async fn read(&self, pos: u64) -> Result<A, Error> {
381 if pos >= self.size {
382 return Err(Error::ItemOutOfRange(pos));
383 }
384 if pos < self.pruning_boundary() {
385 return Err(Error::ItemPruned(pos));
386 }
387
388 let (section, pos_in_section) = self.position_to_section(pos);
390
391 self.inner.get(section, pos_in_section).await.map_err(|e| {
392 match e {
394 Error::SectionOutOfRange(e)
395 | Error::AlreadyPrunedToSection(e)
396 | Error::ItemOutOfRange(e) => {
397 Error::Corruption(format!("section/item should be found, but got: {e}"))
398 }
399 other => other,
400 }
401 })
402 }
403
404 pub async fn replay(
411 &self,
412 buffer: NonZeroUsize,
413 start_pos: u64,
414 ) -> Result<impl Stream<Item = Result<(u64, A), Error>> + '_, Error> {
415 if start_pos > self.size {
416 return Err(Error::ItemOutOfRange(start_pos));
417 }
418
419 let (start_section, start_pos_in_section) = self.position_to_section(start_pos);
420 let items_per_blob = self.items_per_blob;
421
422 let oldest = self.inner.oldest_section().unwrap_or(start_section);
424 if let Some(newest) = self.inner.newest_section() {
425 for section in start_section.max(oldest)..newest {
426 let len = self.inner.section_len(section).await?;
427 if len < items_per_blob {
428 return Err(Error::Corruption(format!(
429 "section {section} incomplete: expected {items_per_blob} items, got {len}"
430 )));
431 }
432 }
433 }
434
435 let inner_stream = self
436 .inner
437 .replay(start_section, start_pos_in_section, buffer)
438 .await?;
439
440 let stream = inner_stream.map(move |result| {
442 result.map(|(section, pos_in_section, item)| {
443 let global_pos = section * items_per_blob + pos_in_section;
444 (global_pos, item)
445 })
446 });
447
448 Ok(stream)
449 }
450
451 pub async fn prune(&mut self, min_item_pos: u64) -> Result<bool, Error> {
458 let target_section = min_item_pos / self.items_per_blob;
460
461 let tail_section = self.size / self.items_per_blob;
463
464 let min_section = std::cmp::min(target_section, tail_section);
466
467 self.inner.prune(min_section).await
468 }
469
470 pub async fn destroy(self) -> Result<(), Error> {
472 self.inner.destroy().await
473 }
474}
475
476impl<E: Storage + Metrics, A: CodecFixedShared> super::Contiguous for Journal<E, A> {
478 type Item = A;
479
480 fn size(&self) -> u64 {
481 Self::size(self)
482 }
483
484 fn oldest_retained_pos(&self) -> Option<u64> {
485 Self::oldest_retained_pos(self)
486 }
487
488 fn pruning_boundary(&self) -> u64 {
489 Self::pruning_boundary(self)
490 }
491
492 async fn replay(
493 &self,
494 start_pos: u64,
495 buffer: NonZeroUsize,
496 ) -> Result<impl Stream<Item = Result<(u64, Self::Item), Error>> + '_, Error> {
497 Self::replay(self, buffer, start_pos).await
498 }
499
500 async fn read(&self, position: u64) -> Result<Self::Item, Error> {
501 Self::read(self, position).await
502 }
503}
504
505impl<E: Storage + Metrics, A: CodecFixedShared> MutableContiguous for Journal<E, A> {
506 async fn append(&mut self, item: Self::Item) -> Result<u64, Error> {
507 Self::append(self, item).await
508 }
509
510 async fn prune(&mut self, min_position: u64) -> Result<bool, Error> {
511 Self::prune(self, min_position).await
512 }
513
514 async fn rewind(&mut self, size: u64) -> Result<(), Error> {
515 Self::rewind(self, size).await
516 }
517}
518
519impl<E: Storage + Metrics, A: CodecFixedShared> Persistable for Journal<E, A> {
520 type Error = Error;
521
522 async fn commit(&mut self) -> Result<(), Error> {
523 Self::sync(self).await
524 }
525
526 async fn sync(&mut self) -> Result<(), Error> {
527 Self::sync(self).await
528 }
529
530 async fn destroy(self) -> Result<(), Error> {
531 Self::destroy(self).await
532 }
533}
534
535#[cfg(test)]
536mod tests {
537 use super::*;
538 use commonware_cryptography::{sha256::Digest, Hasher as _, Sha256};
539 use commonware_macros::test_traced;
540 use commonware_runtime::{
541 deterministic::{self, Context},
542 Blob, Runner, Storage,
543 };
544 use commonware_utils::{NZUsize, NZU16, NZU64};
545 use futures::{pin_mut, StreamExt};
546 use std::num::NonZeroU16;
547
// Page size and page-cache capacity passed to `PoolRef::new` in `test_cfg`. At 44 bytes, a
// page is not a multiple of the 32-byte SHA-256 digests stored by these tests, so items
// straddle page boundaries.
const PAGE_SIZE: NonZeroU16 = NZU16!(44);
const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(3);
550
551 fn test_digest(value: u64) -> Digest {
553 Sha256::hash(&value.to_be_bytes())
554 }
555
556 fn test_cfg(items_per_blob: NonZeroU64) -> Config {
557 Config {
558 partition: "test_partition".into(),
559 items_per_blob,
560 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
561 write_buffer: NZUsize!(2048),
562 }
563 }
564
/// Appends, reopens, reads, and prunes with two items per blob, checking section bookkeeping
/// and that pruning never changes the size.
#[test_traced]
fn test_fixed_journal_append_and_prune() {
    let executor = deterministic::Runner::default();

    executor.start(|context| async move {
        // Two items per blob: positions 2k and 2k+1 share section k.
        let cfg = test_cfg(NZU64!(2));
        let mut journal = Journal::init(context.clone(), cfg.clone())
            .await
            .expect("failed to initialize journal");

        // Append a single item and make it durable.
        let mut pos = journal
            .append(test_digest(0))
            .await
            .expect("failed to append data 0");
        assert_eq!(pos, 0);

        journal.sync().await.expect("Failed to sync journal");
        drop(journal);

        // Reopen: the size is recovered from the tail section.
        let cfg = test_cfg(NZU64!(2));
        let mut journal = Journal::init(context.clone(), cfg.clone())
            .await
            .expect("failed to re-initialize journal");
        assert_eq!(journal.size(), 1);

        // Keep appending across the section 0 / section 1 boundary.
        pos = journal
            .append(test_digest(1))
            .await
            .expect("failed to append data 1");
        assert_eq!(pos, 1);
        pos = journal
            .append(test_digest(2))
            .await
            .expect("failed to append data 2");
        assert_eq!(pos, 2);

        // Everything below `size` is readable; reading at `size` is out of range.
        let item0 = journal.read(0).await.expect("failed to read data 0");
        assert_eq!(item0, test_digest(0));
        let item1 = journal.read(1).await.expect("failed to read data 1");
        assert_eq!(item1, test_digest(1));
        let item2 = journal.read(2).await.expect("failed to read data 2");
        assert_eq!(item2, test_digest(2));
        let err = journal.read(3).await.expect_err("expected read to fail");
        assert!(matches!(err, Error::ItemOutOfRange(3)));

        journal.sync().await.expect("failed to sync journal");

        // prune(1) targets section 0, which still holds position 1, so nothing is removed.
        journal.prune(1).await.expect("failed to prune journal 1");

        // prune(2) removes section 0 (positions 0 and 1).
        journal.prune(2).await.expect("failed to prune journal 2");
        assert_eq!(journal.oldest_retained_pos(), Some(2));

        let result0 = journal.read(0).await;
        assert!(matches!(result0, Err(Error::ItemPruned(0))));
        let result1 = journal.read(1).await;
        assert!(matches!(result1, Err(Error::ItemPruned(1))));

        let result2 = journal.read(2).await.unwrap();
        assert_eq!(result2, test_digest(2));

        // Grow to 10 items; section 5 becomes the (empty) tail.
        for i in 3..10 {
            let pos = journal
                .append(test_digest(i))
                .await
                .expect("failed to append data");
            assert_eq!(pos, i);
        }

        // Pruning below the current boundary is a no-op.
        journal.prune(0).await.expect("no-op pruning failed");
        assert_eq!(journal.inner.oldest_section(), Some(1));
        assert_eq!(journal.inner.newest_section(), Some(5));
        assert_eq!(journal.oldest_retained_pos(), Some(2));

        // Prune up to the start of section 3.
        journal
            .prune(3 * cfg.items_per_blob.get())
            .await
            .expect("failed to prune journal 2");
        assert_eq!(journal.inner.oldest_section(), Some(3));
        assert_eq!(journal.inner.newest_section(), Some(5));
        assert_eq!(journal.oldest_retained_pos(), Some(6));

        // Pruning far past the end is clamped to the tail section: nothing is retained, but
        // the size is unchanged and equals the pruning boundary.
        journal
            .prune(10000)
            .await
            .expect("failed to max-prune journal");
        let size = journal.size();
        assert_eq!(size, 10);
        assert_eq!(journal.inner.oldest_section(), Some(5));
        assert_eq!(journal.inner.newest_section(), Some(5));
        assert_eq!(journal.oldest_retained_pos(), None);
        assert_eq!(journal.pruning_boundary(), size);

        // Replaying a fully pruned journal from position 0 yields nothing.
        {
            let stream = journal
                .replay(NZUsize!(1024), 0)
                .await
                .expect("failed to replay journal");
            pin_mut!(stream);
            let mut items = Vec::new();
            while let Some(result) = stream.next().await {
                match result {
                    Ok((pos, item)) => {
                        assert_eq!(test_digest(pos), item);
                        items.push(pos);
                    }
                    Err(err) => panic!("Failed to read item: {err}"),
                }
            }
            assert_eq!(items, Vec::<u64>::new());
        }

        journal.destroy().await.unwrap();
    });
}
698
/// Appends almost two full large blobs, reopens, and verifies the first blob's contents.
#[test_traced]
fn test_fixed_journal_append_a_lot_of_data() {
    let executor = deterministic::Runner::default();
    const ITEMS_PER_BLOB: NonZeroU64 = NZU64!(10000);
    executor.start(|context| async move {
        let cfg = test_cfg(ITEMS_PER_BLOB);
        let mut journal = Journal::init(context.clone(), cfg.clone())
            .await
            .expect("failed to initialize journal");
        // Fill blob 0 completely and blob 1 up to all but its last slot.
        for i in 0u64..ITEMS_PER_BLOB.get() * 2 - 1 {
            journal
                .append(test_digest(i))
                .await
                .expect("failed to append data");
        }
        journal.sync().await.expect("failed to sync journal");
        drop(journal);
        let journal = Journal::init(context.clone(), cfg.clone())
            .await
            .expect("failed to re-initialize journal");
        // Verify the items in blob 0 after reopening.
        for i in 0u64..10000 {
            let item: Digest = journal.read(i).await.expect("failed to read data");
            assert_eq!(item, test_digest(i));
        }
        journal.destroy().await.expect("failed to destroy journal");
    });
}
730
/// Replays a multi-blob journal in full, then corrupts bytes inside a historical blob and
/// checks that both `read` and `replay` surface a runtime error.
#[test_traced]
fn test_fixed_journal_replay() {
    const ITEMS_PER_BLOB: NonZeroU64 = NZU64!(7);
    let executor = deterministic::Runner::default();

    executor.start(|context| async move {
        let cfg = test_cfg(ITEMS_PER_BLOB);
        let mut journal = Journal::init(context.clone(), cfg.clone())
            .await
            .expect("failed to initialize journal");

        // 100 full blobs plus a partially filled tail (7 / 2 = 3 items).
        for i in 0u64..(ITEMS_PER_BLOB.get() * 100 + ITEMS_PER_BLOB.get() / 2) {
            let pos = journal
                .append(test_digest(i))
                .await
                .expect("failed to append data");
            assert_eq!(pos, i);
        }

        for i in 0u64..(ITEMS_PER_BLOB.get() * 100 + ITEMS_PER_BLOB.get() / 2) {
            let item: Digest = journal.read(i).await.expect("failed to read data");
            assert_eq!(item, test_digest(i), "i={i}");
        }

        // A full replay yields every position exactly once.
        {
            let stream = journal
                .replay(NZUsize!(1024), 0)
                .await
                .expect("failed to replay journal");
            let mut items = Vec::new();
            pin_mut!(stream);
            while let Some(result) = stream.next().await {
                match result {
                    Ok((pos, item)) => {
                        assert_eq!(test_digest(pos), item, "pos={pos}, item={item:?}");
                        items.push(pos);
                    }
                    Err(err) => panic!("Failed to read item: {err}"),
                }
            }

            assert_eq!(
                items.len(),
                ITEMS_PER_BLOB.get() as usize * 100 + ITEMS_PER_BLOB.get() as usize / 2
            );
            items.sort();
            for (i, pos) in items.iter().enumerate() {
                assert_eq!(i as u64, *pos);
            }
        }

        journal.sync().await.expect("Failed to sync journal");
        drop(journal);

        // Overwrite 4 bytes at offset 1 of blob 40 directly on disk.
        let (blob, _) = context
            .open(&cfg.partition, &40u64.to_be_bytes())
            .await
            .expect("Failed to open blob");
        let bad_bytes = 123456789u32;
        blob.write_at(bad_bytes.to_be_bytes().to_vec(), 1)
            .await
            .expect("Failed to write bad bytes");
        blob.sync().await.expect("Failed to sync blob");

        let journal = Journal::init(context.clone(), cfg.clone())
            .await
            .expect("Failed to re-initialize journal");

        // Reading an item from the damaged blob surfaces a runtime error.
        let err = journal
            .read(40 * ITEMS_PER_BLOB.get() + 1)
            .await
            .unwrap_err();
        assert!(matches!(err, Error::Runtime(_)));

        // Replay reaches the damaged blob and reports the same kind of error.
        {
            let mut error_found = false;
            let stream = journal
                .replay(NZUsize!(1024), 0)
                .await
                .expect("failed to replay journal");
            let mut items = Vec::new();
            pin_mut!(stream);
            while let Some(result) = stream.next().await {
                match result {
                    Ok((pos, item)) => {
                        assert_eq!(test_digest(pos), item);
                        items.push(pos);
                    }
                    Err(err) => {
                        error_found = true;
                        assert!(matches!(err, Error::Runtime(_)));
                        break;
                    }
                }
            }
            assert!(error_found);
        }
    });
}
842
/// Shortens a historical (non-tail) blob. `init` still succeeds because it only reads the
/// tail, while `replay` reports corruption naming the short section.
#[test_traced]
fn test_fixed_journal_init_with_corrupted_historical_blobs() {
    let executor = deterministic::Runner::default();
    const ITEMS_PER_BLOB: NonZeroU64 = NZU64!(7);
    executor.start(|context| async move {
        let cfg = test_cfg(ITEMS_PER_BLOB);
        let mut journal = Journal::init(context.clone(), cfg.clone())
            .await
            .expect("failed to initialize journal");

        for i in 0u64..(ITEMS_PER_BLOB.get() * 100 + ITEMS_PER_BLOB.get() / 2) {
            let pos = journal
                .append(test_digest(i))
                .await
                .expect("failed to append data");
            assert_eq!(pos, i);
        }
        journal.sync().await.expect("Failed to sync journal");
        drop(journal);

        // Remove the last byte of blob 40, which is not the tail.
        let (blob, size) = context
            .open(&cfg.partition, &40u64.to_be_bytes())
            .await
            .expect("Failed to open blob");
        blob.resize(size - 1).await.expect("Failed to corrupt blob");
        blob.sync().await.expect("Failed to sync blob");

        // The size is derived from the tail section, so it is unaffected.
        let journal = Journal::<_, Digest>::init(context.clone(), cfg.clone())
            .await
            .expect("failed to initialize journal");

        let expected_size = ITEMS_PER_BLOB.get() * 100 + ITEMS_PER_BLOB.get() / 2;
        assert_eq!(journal.size(), expected_size);

        // Replay validates historical sections and reports the short one.
        match journal.replay(NZUsize!(1024), 0).await {
            Err(Error::Corruption(msg)) => {
                assert!(
                    msg.contains("section 40"),
                    "Error should mention section 40, got: {msg}"
                );
            }
            Err(e) => panic!("Expected Corruption error for section 40, got: {:?}", e),
            Ok(_) => panic!("Expected replay to fail with corruption"),
        };
    });
}
902
/// Deletes a middle blob. `init` succeeds, but `replay` and `read` of an item in the missing
/// section both report corruption.
#[test_traced]
fn test_fixed_journal_replay_with_missing_historical_blob() {
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        let cfg = test_cfg(NZU64!(2));
        let mut journal = Journal::init(context.clone(), cfg.clone())
            .await
            .expect("failed to initialize journal");

        // 5 items: sections 0 and 1 are full, section 2 holds one item.
        for i in 0u64..5 {
            journal
                .append(test_digest(i))
                .await
                .expect("failed to append data");
        }
        journal.sync().await.expect("failed to sync journal");
        drop(journal);

        // Delete the blob for section 1.
        context
            .remove(&cfg.partition, Some(&1u64.to_be_bytes()))
            .await
            .expect("failed to remove blob");

        // Init only inspects the tail, so it still succeeds.
        let result = Journal::<_, Digest>::init(context.clone(), cfg.clone())
            .await
            .expect("init shouldn't fail");

        // Replay detects the gap.
        match result.replay(NZUsize!(1024), 0).await {
            Err(Error::Corruption(_)) => {}
            Err(err) => panic!("expected Corruption, got: {err}"),
            Ok(_) => panic!("expected Corruption, got ok"),
        };

        // So does reading position 2, which lives in the missing section.
        match result.read(2).await {
            Err(Error::Corruption(_)) => {}
            Err(err) => panic!("expected Corruption, got: {err}"),
            Ok(_) => panic!("expected Corruption, got ok"),
        };
    });
}
946
/// Truncates the tail blob by one byte and checks that only the torn last item is lost on
/// reopen.
#[test_traced]
fn test_fixed_journal_test_trim_blob() {
    let executor = deterministic::Runner::default();
    const ITEMS_PER_BLOB: NonZeroU64 = NZU64!(7);
    executor.start(|context| async move {
        let cfg = test_cfg(ITEMS_PER_BLOB);
        let mut journal = Journal::init(context.clone(), cfg.clone())
            .await
            .expect("failed to initialize journal");

        // One full blob plus 3 items in blob 1.
        let item_count = ITEMS_PER_BLOB.get() + 3;
        for i in 0u64..item_count {
            journal
                .append(test_digest(i))
                .await
                .expect("failed to append data");
        }
        assert_eq!(journal.size(), item_count);
        journal.sync().await.expect("Failed to sync journal");
        drop(journal);

        // Remove the last byte of the tail blob, tearing its final item.
        let (blob, size) = context
            .open(&cfg.partition, &1u64.to_be_bytes())
            .await
            .expect("Failed to open blob");
        blob.resize(size - 1).await.expect("Failed to corrupt blob");
        blob.sync().await.expect("Failed to sync blob");

        let journal = Journal::<_, Digest>::init(context.clone(), cfg.clone())
            .await
            .unwrap();

        assert_eq!(journal.size(), item_count - 1);

        journal.destroy().await.expect("Failed to destroy journal");
    });
}
993
/// Replays from a position in the middle of a section and checks that exactly the positions
/// from there to the end are yielded.
#[test_traced]
fn test_fixed_journal_partial_replay() {
    const ITEMS_PER_BLOB: NonZeroU64 = NZU64!(7);
    // 53 = 7 * 7 + 4, so replay starts partway into section 7.
    const START_POS: u64 = 53;

    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        let cfg = test_cfg(ITEMS_PER_BLOB);
        let mut journal = Journal::init(context.clone(), cfg.clone())
            .await
            .expect("failed to initialize journal");

        for i in 0u64..(ITEMS_PER_BLOB.get() * 100 + ITEMS_PER_BLOB.get() / 2) {
            let pos = journal
                .append(test_digest(i))
                .await
                .expect("failed to append data");
            assert_eq!(pos, i);
        }

        {
            let stream = journal
                .replay(NZUsize!(1024), START_POS)
                .await
                .expect("failed to replay journal");
            let mut items = Vec::new();
            pin_mut!(stream);
            while let Some(result) = stream.next().await {
                match result {
                    Ok((pos, item)) => {
                        assert!(pos >= START_POS, "pos={pos}, expected >= {START_POS}");
                        assert_eq!(
                            test_digest(pos),
                            item,
                            "Item at position {pos} did not match expected digest"
                        );
                        items.push(pos);
                    }
                    Err(err) => panic!("Failed to read item: {err}"),
                }
            }

            // Every position in START_POS..size appears exactly once.
            assert_eq!(
                items.len(),
                ITEMS_PER_BLOB.get() as usize * 100 + ITEMS_PER_BLOB.get() as usize / 2
                    - START_POS as usize
            );
            items.sort();
            for (i, pos) in items.iter().enumerate() {
                assert_eq!(i as u64, *pos - START_POS);
            }
        }

        journal.destroy().await.unwrap();
    });
}
1058
/// Recovery from a torn tail write, and from a tail blob that is missing entirely.
#[test_traced]
fn test_fixed_journal_recover_from_partial_write() {
    let executor = deterministic::Runner::default();

    executor.start(|context| async move {
        // 3 items per blob: 5 items fill blob 0 and put 2 items in blob 1.
        let cfg = test_cfg(NZU64!(3));
        let mut journal = Journal::init(context.clone(), cfg.clone())
            .await
            .expect("failed to initialize journal");
        for i in 0..5 {
            journal
                .append(test_digest(i))
                .await
                .expect("failed to append data");
        }
        assert_eq!(journal.size(), 5);
        journal.sync().await.expect("Failed to sync journal");
        drop(journal);

        // Remove the last byte of the tail blob to simulate a torn write of its last item.
        let (blob, size) = context
            .open(&cfg.partition, &1u64.to_be_bytes())
            .await
            .expect("Failed to open blob");
        blob.resize(size - 1).await.expect("Failed to corrupt blob");
        blob.sync().await.expect("Failed to sync blob");

        // The partial item is discarded on reopen.
        let journal = Journal::<_, Digest>::init(context.clone(), cfg.clone())
            .await
            .expect("Failed to re-initialize journal");
        assert_eq!(journal.size(), 4);
        drop(journal);

        // With the tail blob gone, the size falls back to the end of the full blob 0.
        context
            .remove(&cfg.partition, Some(&1u64.to_be_bytes()))
            .await
            .expect("Failed to remove blob");
        let journal = Journal::<_, Digest>::init(context.clone(), cfg.clone())
            .await
            .expect("Failed to re-initialize journal");
        assert_eq!(journal.size(), 3);

        journal.destroy().await.unwrap();
    });
}
1112
/// Tearing the only item leaves an empty journal that is still usable for appends.
#[test_traced]
fn test_fixed_journal_recover_to_empty_from_partial_write() {
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        let cfg = test_cfg(NZU64!(10));
        let mut journal = Journal::init(context.clone(), cfg.clone())
            .await
            .expect("failed to initialize journal");
        journal
            .append(test_digest(0))
            .await
            .expect("failed to append data");
        assert_eq!(journal.size(), 1);
        journal.sync().await.expect("Failed to sync journal");
        drop(journal);

        // Remove the last byte of blob 0, tearing its only item.
        let (blob, size) = context
            .open(&cfg.partition, &0u64.to_be_bytes())
            .await
            .expect("Failed to open blob");
        blob.resize(size - 1).await.expect("Failed to corrupt blob");
        blob.sync().await.expect("Failed to sync blob");

        let mut journal = Journal::<_, Digest>::init(context.clone(), cfg.clone())
            .await
            .expect("Failed to re-initialize journal");

        // Back to empty, and appends work again.
        assert_eq!(journal.size(), 0);
        assert_eq!(journal.oldest_retained_pos(), None);
        journal
            .append(test_digest(0))
            .await
            .expect("failed to append data");
        assert_eq!(journal.size(), 1);

        journal.destroy().await.unwrap();
    });
}
1159
/// Zero bytes appended past the valid data of a blob are not counted as items on reopen.
#[test_traced("DEBUG")]
fn test_fixed_journal_recover_from_unwritten_data() {
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        let cfg = test_cfg(NZU64!(10));
        let mut journal = Journal::init(context.clone(), cfg.clone())
            .await
            .expect("failed to initialize journal");

        journal
            .append(test_digest(0))
            .await
            .expect("failed to append data");
        assert_eq!(journal.size(), 1);
        journal.sync().await.expect("Failed to sync journal");
        drop(journal);

        // Extend blob 0 with three pages' worth of zero bytes.
        let (blob, size) = context
            .open(&cfg.partition, &0u64.to_be_bytes())
            .await
            .expect("Failed to open blob");
        blob.write_at(vec![0u8; PAGE_SIZE.get() as usize * 3], size)
            .await
            .expect("Failed to extend blob");
        blob.sync().await.expect("Failed to sync blob");

        let mut journal = Journal::<_, Digest>::init(context.clone(), cfg.clone())
            .await
            .expect("Failed to re-initialize journal");

        // The zeroed region is not treated as data.
        assert_eq!(journal.size(), 1);

        journal
            .append(test_digest(1))
            .await
            .expect("failed to append data");

        journal.destroy().await.unwrap();
    });
}
1208
/// Rewind edge cases: no-op, forward-rewind rejection, section boundaries, repeated
/// append/rewind cycles, persistence across restart, and interaction with pruning.
#[test_traced]
fn test_fixed_journal_rewinding() {
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        let cfg = test_cfg(NZU64!(2));
        let mut journal = Journal::init(context.clone(), cfg.clone())
            .await
            .expect("failed to initialize journal");
        // On an empty journal, rewinding to 0 is a no-op and rewinding forward fails.
        assert!(matches!(journal.rewind(0).await, Ok(())));
        assert!(matches!(
            journal.rewind(1).await,
            Err(Error::InvalidRewind(1))
        ));

        journal
            .append(test_digest(0))
            .await
            .expect("failed to append data 0");
        assert_eq!(journal.size(), 1);
        assert!(matches!(journal.rewind(1).await, Ok(())));
        assert!(matches!(journal.rewind(0).await, Ok(())));
        assert_eq!(journal.size(), 0);

        for i in 0..7 {
            let pos = journal
                .append(test_digest(i))
                .await
                .expect("failed to append data");
            assert_eq!(pos, i);
        }
        assert_eq!(journal.size(), 7);

        // Rewind to a section boundary, then all the way back to empty.
        assert!(matches!(journal.rewind(4).await, Ok(())));
        assert_eq!(journal.size(), 4);

        assert!(matches!(journal.rewind(0).await, Ok(())));
        assert_eq!(journal.size(), 0);

        // Repeatedly append 100 items and rewind the last 49 of them.
        for _ in 0..10 {
            for i in 0..100 {
                journal
                    .append(test_digest(i))
                    .await
                    .expect("failed to append data");
            }
            journal.rewind(journal.size() - 49).await.unwrap();
        }
        const ITEMS_REMAINING: u64 = 10 * (100 - 49);
        assert_eq!(journal.size(), ITEMS_REMAINING);

        journal.sync().await.expect("Failed to sync journal");
        drop(journal);

        // Repeat with a different blob size in a fresh partition.
        let mut cfg = test_cfg(NZU64!(3));
        cfg.partition = "test_partition_2".into();
        let mut journal = Journal::init(context.clone(), cfg.clone())
            .await
            .expect("failed to initialize journal");
        for _ in 0..10 {
            for i in 0..100 {
                journal
                    .append(test_digest(i))
                    .await
                    .expect("failed to append data");
            }
            journal.rewind(journal.size() - 49).await.unwrap();
        }
        assert_eq!(journal.size(), ITEMS_REMAINING);

        journal.sync().await.expect("Failed to sync journal");
        drop(journal);

        // The rewound size survives a restart.
        let mut journal: Journal<_, Digest> = Journal::init(context.clone(), cfg.clone())
            .await
            .expect("failed to re-initialize journal");
        assert_eq!(journal.size(), 10 * (100 - 49));

        // Prune to position 300 (the start of section 100); the size is unchanged.
        journal.prune(300).await.expect("pruning failed");
        assert_eq!(journal.size(), ITEMS_REMAINING);
        // Rewinding below the pruning boundary is rejected; rewinding exactly to it works.
        assert!(matches!(
            journal.rewind(299).await,
            Err(Error::InvalidRewind(299))
        ));
        assert!(matches!(journal.rewind(300).await, Ok(())));
        assert_eq!(journal.size(), 300);
        assert_eq!(journal.oldest_retained_pos(), None);

        journal.destroy().await.unwrap();
    });
}
1311
/// Truncating a blob exactly at a physical page boundary recovers only the items wholly
/// contained in the surviving pages.
#[test_traced]
fn test_fixed_journal_recover_from_page_boundary_truncation() {
    let executor = deterministic::Runner::default();
    executor.start(|context: Context| async move {
        let cfg = test_cfg(NZU64!(100));
        let mut journal = Journal::init(context.clone(), cfg.clone())
            .await
            .expect("failed to initialize journal");

        // 10 items of 32 bytes span several 44-byte logical pages of blob 0.
        for i in 0u64..10 {
            journal
                .append(test_digest(i))
                .await
                .expect("failed to append data");
        }
        assert_eq!(journal.size(), 10);
        journal.sync().await.expect("Failed to sync journal");
        drop(journal);

        // NOTE(review): assumes each physical page is the logical page plus a 12-byte trailer
        // (presumably integrity data); confirm against the blob format.
        let physical_page_size = PAGE_SIZE.get() as u64 + 12;
        let (blob, size) = context
            .open(&cfg.partition, &0u64.to_be_bytes())
            .await
            .expect("Failed to open blob");

        // Cut the blob back by one full physical page (dropping any partial page too).
        let full_pages = size / physical_page_size;
        assert!(full_pages >= 2, "need at least 2 pages for this test");
        let truncate_to = (full_pages - 1) * physical_page_size;

        blob.resize(truncate_to)
            .await
            .expect("Failed to truncate blob");
        blob.sync().await.expect("Failed to sync blob");

        let journal = Journal::<_, Digest>::init(context.clone(), cfg.clone())
            .await
            .expect("Failed to re-initialize journal after page truncation");

        // Only items wholly inside the surviving logical bytes remain (32 bytes per digest).
        let remaining_logical_bytes = (full_pages - 1) * PAGE_SIZE.get() as u64;
        let expected_items = remaining_logical_bytes / 32;
        assert_eq!(
            journal.size(),
            expected_items,
            "Journal should recover to {} items after truncation",
            expected_items
        );

        for i in 0..expected_items {
            let item = journal
                .read(i)
                .await
                .expect("failed to read recovered item");
            assert_eq!(item, test_digest(i), "item {} mismatch after recovery", i);
        }

        journal.destroy().await.expect("Failed to destroy journal");
    });
}
1394
1395 #[test_traced]
1401 fn test_single_item_per_blob() {
1402 let executor = deterministic::Runner::default();
1403 executor.start(|context| async move {
1404 let cfg = Config {
1405 partition: "single_item_per_blob".into(),
1406 items_per_blob: NZU64!(1),
1407 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
1408 write_buffer: NZUsize!(2048),
1409 };
1410
1411 let mut journal = Journal::init(context.clone(), cfg.clone())
1413 .await
1414 .expect("failed to initialize journal");
1415
1416 assert_eq!(journal.size(), 0);
1418 assert_eq!(journal.oldest_retained_pos(), None);
1419
1420 let pos = journal
1422 .append(test_digest(0))
1423 .await
1424 .expect("failed to append");
1425 assert_eq!(pos, 0);
1426 assert_eq!(journal.size(), 1);
1427
1428 journal.sync().await.expect("failed to sync");
1430
1431 let value = journal
1433 .read(journal.size() - 1)
1434 .await
1435 .expect("failed to read");
1436 assert_eq!(value, test_digest(0));
1437
1438 for i in 1..10u64 {
1440 let pos = journal
1441 .append(test_digest(i))
1442 .await
1443 .expect("failed to append");
1444 assert_eq!(pos, i);
1445 assert_eq!(journal.size(), i + 1);
1446
1447 let value = journal
1449 .read(journal.size() - 1)
1450 .await
1451 .expect("failed to read");
1452 assert_eq!(value, test_digest(i));
1453 }
1454
1455 for i in 0..10u64 {
1457 assert_eq!(journal.read(i).await.unwrap(), test_digest(i));
1458 }
1459
1460 journal.sync().await.expect("failed to sync");
1461
1462 journal.prune(5).await.expect("failed to prune");
1465
1466 assert_eq!(journal.size(), 10);
1468
1469 assert_eq!(journal.oldest_retained_pos(), Some(5));
1471
1472 let value = journal
1474 .read(journal.size() - 1)
1475 .await
1476 .expect("failed to read");
1477 assert_eq!(value, test_digest(9));
1478
1479 for i in 0..5 {
1481 assert!(matches!(journal.read(i).await, Err(Error::ItemPruned(_))));
1482 }
1483
1484 for i in 5..10u64 {
1486 assert_eq!(journal.read(i).await.unwrap(), test_digest(i));
1487 }
1488
1489 for i in 10..15u64 {
1491 let pos = journal
1492 .append(test_digest(i))
1493 .await
1494 .expect("failed to append");
1495 assert_eq!(pos, i);
1496
1497 let value = journal
1499 .read(journal.size() - 1)
1500 .await
1501 .expect("failed to read");
1502 assert_eq!(value, test_digest(i));
1503 }
1504
1505 journal.sync().await.expect("failed to sync");
1506 drop(journal);
1507
1508 let journal = Journal::<_, Digest>::init(context.clone(), cfg.clone())
1510 .await
1511 .expect("failed to re-initialize journal");
1512
1513 assert_eq!(journal.size(), 15);
1515
1516 assert_eq!(journal.oldest_retained_pos(), Some(5));
1518
1519 let value = journal
1521 .read(journal.size() - 1)
1522 .await
1523 .expect("failed to read");
1524 assert_eq!(value, test_digest(14));
1525
1526 for i in 5..15u64 {
1528 assert_eq!(journal.read(i).await.unwrap(), test_digest(i));
1529 }
1530
1531 journal.destroy().await.expect("failed to destroy journal");
1532
1533 let mut journal = Journal::init(context.clone(), cfg.clone())
1536 .await
1537 .expect("failed to initialize journal");
1538
1539 for i in 0..10u64 {
1541 journal.append(test_digest(i + 100)).await.unwrap();
1542 }
1543
1544 journal.prune(5).await.unwrap();
1546 assert_eq!(journal.size(), 10);
1547 assert_eq!(journal.oldest_retained_pos(), Some(5));
1548
1549 journal.sync().await.unwrap();
1551 drop(journal);
1552
1553 let journal = Journal::<_, Digest>::init(context.clone(), cfg.clone())
1555 .await
1556 .expect("failed to re-initialize journal");
1557
1558 assert_eq!(journal.size(), 10);
1560 assert_eq!(journal.oldest_retained_pos(), Some(5));
1561
1562 let value = journal.read(journal.size() - 1).await.unwrap();
1564 assert_eq!(value, test_digest(109));
1565
1566 for i in 5..10u64 {
1568 assert_eq!(journal.read(i).await.unwrap(), test_digest(i + 100));
1569 }
1570
1571 journal.destroy().await.expect("failed to destroy journal");
1572
1573 let mut journal = Journal::init(context.clone(), cfg.clone())
1575 .await
1576 .expect("failed to initialize journal");
1577
1578 for i in 0..5u64 {
1579 journal.append(test_digest(i + 200)).await.unwrap();
1580 }
1581 journal.sync().await.unwrap();
1582
1583 journal.prune(5).await.unwrap();
1585 assert_eq!(journal.size(), 5); assert_eq!(journal.oldest_retained_pos(), None); let result = journal.read(journal.size() - 1).await;
1590 assert!(matches!(result, Err(Error::ItemPruned(4))));
1591
1592 journal.append(test_digest(205)).await.unwrap();
1594 assert_eq!(journal.oldest_retained_pos(), Some(5));
1595 assert_eq!(
1596 journal.read(journal.size() - 1).await.unwrap(),
1597 test_digest(205)
1598 );
1599
1600 journal.destroy().await.expect("failed to destroy journal");
1601 });
1602 }
1603}