use super::manager::{AppendFactory, Config as ManagerConfig, Manager};
use crate::journal::Error;
use bytes::Buf;
use commonware_codec::{CodecFixed, CodecFixedShared, DecodeExt as _, ReadExt as _};
use commonware_runtime::{
    buffer::pool::{PoolRef, Replay},
    Blob, Metrics, Storage,
};
use futures::{
    stream::{self, Stream},
    StreamExt,
};
use std::{marker::PhantomData, num::NonZeroUsize};
use tracing::{trace, warn};

struct ReplayState<B: Blob> {
    section: u64,
    replay: Replay<B>,
    position: u64,
    done: bool,
}

#[derive(Clone)]
pub struct Config {
    pub partition: String,

    pub buffer_pool: PoolRef,

    pub write_buffer: NonZeroUsize,
}
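
// A minimal construction sketch for `Config`, mirroring the `test_cfg` helper in the
// tests below. The partition name, page size, cache capacity, and write buffer size are
// illustrative values, not recommendations:
//
//     use commonware_runtime::buffer::PoolRef;
//     use commonware_utils::{NZUsize, NZU16};
//
//     let cfg = Config {
//         partition: "my_partition".into(),
//         buffer_pool: PoolRef::new(NZU16!(44), NZUsize!(3)),
//         write_buffer: NZUsize!(2048),
//     };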

pub struct Journal<E: Storage + Metrics, A: CodecFixed> {
    manager: Manager<E, AppendFactory>,
    _array: PhantomData<A>,
}

impl<E: Storage + Metrics, A: CodecFixedShared> Journal<E, A> {
    pub const CHUNK_SIZE: usize = A::SIZE;
    const CHUNK_SIZE_U64: u64 = Self::CHUNK_SIZE as u64;

    pub async fn init(context: E, cfg: Config) -> Result<Self, Error> {
        let manager_cfg = ManagerConfig {
            partition: cfg.partition,
            factory: AppendFactory {
                write_buffer: cfg.write_buffer,
                pool_ref: cfg.buffer_pool,
            },
        };
        let mut manager = Manager::init(context, manager_cfg).await?;

        let sections: Vec<_> = manager.sections().collect();
        for section in sections {
            let size = manager.size(section).await?;
            if !size.is_multiple_of(Self::CHUNK_SIZE_U64) {
                let valid_size = size - (size % Self::CHUNK_SIZE_U64);
                warn!(
                    section,
                    invalid_size = size,
                    new_size = valid_size,
                    "trailing bytes detected: truncating"
                );
                manager.rewind_section(section, valid_size).await?;
            }
        }

        Ok(Self {
            manager,
            _array: PhantomData,
        })
    }
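
    // Usage sketch (assumes a runtime `context` such as the deterministic runtime used
    // in the tests below; `Digest` stands in for any fixed-size codec type):
    //
    //     let mut journal = Journal::<_, Digest>::init(context, cfg).await?;
    //
    // On init, any section whose blob size is not a multiple of `CHUNK_SIZE` is rewound
    // to the last whole item, so a partially written trailing item is discarded rather
    // than surfaced later as a decode error.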

    pub async fn append(&mut self, section: u64, item: A) -> Result<u64, Error> {
        let blob = self.manager.get_or_create(section).await?;

        let size = blob.size().await;
        if !size.is_multiple_of(Self::CHUNK_SIZE_U64) {
            return Err(Error::InvalidBlobSize(section, size));
        }
        let position = size / Self::CHUNK_SIZE_U64;

        let buf = item.encode_mut();
        blob.append(&buf).await?;
        trace!(section, position, "appended item");

        Ok(position)
    }
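
    // Append sketch: positions are assigned per section, starting at 0 (see the
    // append-and-get test below). The section numbers and items here are illustrative:
    //
    //     let first = journal.append(1, item_a).await?;  // 0: first item in section 1
    //     let second = journal.append(1, item_b).await?; // 1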

    pub async fn get(&self, section: u64, position: u64) -> Result<A, Error> {
        let blob = self
            .manager
            .get(section)?
            .ok_or(Error::SectionOutOfRange(section))?;

        let offset = position
            .checked_mul(Self::CHUNK_SIZE_U64)
            .ok_or(Error::ItemOutOfRange(position))?;
        let end = offset
            .checked_add(Self::CHUNK_SIZE_U64)
            .ok_or(Error::ItemOutOfRange(position))?;
        if end > blob.size().await {
            return Err(Error::ItemOutOfRange(position));
        }

        let buf = blob.read_at(vec![0u8; Self::CHUNK_SIZE], offset).await?;
        A::decode(buf.as_ref()).map_err(Error::Codec)
    }
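
    // Read-back sketch: `get` addresses items by (section, position) and reads from byte
    // offset `position * CHUNK_SIZE`. Reading past the end of a section (or with a
    // position whose offset overflows) yields `Error::ItemOutOfRange`:
    //
    //     let item = journal.get(1, 0).await?;
    //     assert!(matches!(
    //         journal.get(1, u64::MAX).await,
    //         Err(Error::ItemOutOfRange(_))
    //     ));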

    pub async fn last(&self, section: u64) -> Result<Option<A>, Error> {
        let blob = self
            .manager
            .get(section)?
            .ok_or(Error::SectionOutOfRange(section))?;

        let size = blob.size().await;
        if size < Self::CHUNK_SIZE_U64 {
            return Ok(None);
        }

        let last_position = (size / Self::CHUNK_SIZE_U64) - 1;
        let offset = last_position * Self::CHUNK_SIZE_U64;
        let buf = blob.read_at(vec![0u8; Self::CHUNK_SIZE], offset).await?;
        A::decode(buf.as_ref()).map_err(Error::Codec).map(Some)
    }
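
    // `last` sketch: an existing but empty section returns `Ok(None)`; otherwise the item
    // at position `size / CHUNK_SIZE - 1` is decoded:
    //
    //     if let Some(item) = journal.last(1).await? {
    //         // `item` is the most recently appended entry in section 1
    //     }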

    pub async fn replay(
        &self,
        start_section: u64,
        start_position: u64,
        buffer: NonZeroUsize,
    ) -> Result<impl Stream<Item = Result<(u64, u64, A), Error>> + Send + '_, Error> {
        let mut blob_info = Vec::new();
        for (&section, blob) in self.manager.sections_from(start_section) {
            let blob_size = blob.size().await;
            let mut replay = blob.replay(buffer).await?;
            let initial_position = if section == start_section {
                let start = start_position * Self::CHUNK_SIZE_U64;
                if start > blob_size {
                    return Err(Error::ItemOutOfRange(start_position));
                }
                replay.seek_to(start).await?;
                start_position
            } else {
                0
            };
            blob_info.push((section, replay, initial_position));
        }

        Ok(
            stream::iter(blob_info).flat_map(move |(section, replay, initial_position)| {
                stream::unfold(
                    ReplayState {
                        section,
                        replay,
                        position: initial_position,
                        done: false,
                    },
                    move |mut state| async move {
                        if state.done {
                            return None;
                        }

                        let mut batch: Vec<Result<(u64, u64, A), Error>> = Vec::new();
                        loop {
                            match state.replay.ensure(Self::CHUNK_SIZE).await {
                                Ok(true) => {}
                                Ok(false) => {
                                    state.done = true;
                                    return if batch.is_empty() {
                                        None
                                    } else {
                                        Some((batch, state))
                                    };
                                }
                                Err(err) => {
                                    batch.push(Err(Error::Runtime(err)));
                                    state.done = true;
                                    return Some((batch, state));
                                }
                            }

                            while state.replay.remaining() >= Self::CHUNK_SIZE {
                                match A::read(&mut state.replay) {
                                    Ok(item) => {
                                        batch.push(Ok((state.section, state.position, item)));
                                        state.position += 1;
                                    }
                                    Err(err) => {
                                        batch.push(Err(Error::Codec(err)));
                                        state.done = true;
                                        return Some((batch, state));
                                    }
                                }
                            }

                            if !batch.is_empty() {
                                return Some((batch, state));
                            }
                        }
                    },
                )
                .flat_map(stream::iter)
            }),
        )
    }
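
    // Replay consumption sketch, following the pattern used in the tests below
    // (`pin_mut!` comes from `futures`, `NZUsize!` from `commonware_utils`): the stream
    // yields `(section, position, item)` tuples in section order, starting at
    // (`start_section`, `start_position`):
    //
    //     let stream = journal.replay(0, 0, NZUsize!(1024)).await?;
    //     pin_mut!(stream);
    //     while let Some(result) = stream.next().await {
    //         let (section, position, item) = result?;
    //         // process item...
    //     }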

    pub async fn sync(&self, section: u64) -> Result<(), Error> {
        self.manager.sync(section).await
    }

    pub async fn sync_all(&self) -> Result<(), Error> {
        self.manager.sync_all().await
    }

    pub async fn prune(&mut self, min: u64) -> Result<bool, Error> {
        self.manager.prune(min).await
    }
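
    // Prune sketch: pruning drops whole sections below `min`. Afterwards, reads against a
    // pruned section fail (the prune test below expects `Error::AlreadyPrunedToSection`):
    //
    //     let _pruned = journal.prune(3).await?;     // sections 1 and 2 are gone
    //     assert!(journal.get(1, 0).await.is_err()); // AlreadyPrunedToSection(3)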

    pub fn oldest_section(&self) -> Option<u64> {
        self.manager.oldest_section()
    }

    pub fn newest_section(&self) -> Option<u64> {
        self.manager.newest_section()
    }

    pub fn sections(&self) -> impl Iterator<Item = u64> + '_ {
        self.manager.sections_from(0).map(|(section, _)| *section)
    }

    pub async fn section_len(&self, section: u64) -> Result<u64, Error> {
        let size = self.manager.size(section).await?;
        Ok(size / Self::CHUNK_SIZE_U64)
    }
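
    // `section_len` is derived from the blob size: `size / CHUNK_SIZE`. For a 32-byte
    // item type (e.g. a SHA-256 digest, as in the tests below), a section holding 5 items
    // reports a size of 160 bytes and a length of 5; a section with no data reports 0.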

    pub async fn size(&self, section: u64) -> Result<u64, Error> {
        self.manager.size(section).await
    }

    pub async fn rewind(&mut self, section: u64, offset: u64) -> Result<(), Error> {
        self.manager.rewind(section, offset).await
    }

    pub async fn rewind_section(&mut self, section: u64, size: u64) -> Result<(), Error> {
        self.manager.rewind_section(section, size).await
    }
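
    // Rewind sketch: in the tests below, `rewind(section, size)` truncates `section` and
    // removes all newer sections, while `rewind_section(section, size)` only truncates the
    // one section (it is also what `init` uses to drop trailing bytes):
    //
    //     journal.rewind_section(2, 0).await?; // empty section 2 without touching others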

    pub async fn destroy(self) -> Result<(), Error> {
        self.manager.destroy().await
    }

    pub(crate) async fn init_section_at_size(
        &mut self,
        section: u64,
        item_count: u64,
    ) -> Result<(), Error> {
        let blob = self.manager.get_or_create(section).await?;

        let target_size = item_count * Self::CHUNK_SIZE_U64;

        blob.resize(target_size).await?;

        Ok(())
    }
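
    // Sketch for the crate-internal helper above: resizing to `item_count` items
    // zero-fills the section, so positions 0..item_count decode as all-zero items (the
    // init-section test below checks this against `Sha256::fill(0)`):
    //
    //     journal.init_section_at_size(1, 5).await?;
    //     assert_eq!(journal.section_len(1).await?, 5);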

    pub(crate) async fn ensure_section_exists(&mut self, section: u64) -> Result<(), Error> {
        self.manager.get_or_create(section).await?;
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use commonware_cryptography::{sha256::Digest, Hasher as _, Sha256};
    use commonware_macros::test_traced;
    use commonware_runtime::{buffer::PoolRef, deterministic, Runner};
    use commonware_utils::{NZUsize, NZU16};
    use core::num::NonZeroU16;
    use futures::{pin_mut, StreamExt};

    const PAGE_SIZE: NonZeroU16 = NZU16!(44);
    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(3);

    fn test_digest(value: u64) -> Digest {
        Sha256::hash(&value.to_be_bytes())
    }

    fn test_cfg() -> Config {
        Config {
            partition: "test_partition".into(),
            buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
            write_buffer: NZUsize!(2048),
        }
    }
    #[test_traced]
    fn test_segmented_fixed_append_and_get() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg();
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to init");

            let pos0 = journal
                .append(1, test_digest(0))
                .await
                .expect("failed to append");
            assert_eq!(pos0, 0);

            let pos1 = journal
                .append(1, test_digest(1))
                .await
                .expect("failed to append");
            assert_eq!(pos1, 1);

            let pos2 = journal
                .append(2, test_digest(2))
                .await
                .expect("failed to append");
            assert_eq!(pos2, 0);

            let item0 = journal.get(1, 0).await.expect("failed to get");
            assert_eq!(item0, test_digest(0));

            let item1 = journal.get(1, 1).await.expect("failed to get");
            assert_eq!(item1, test_digest(1));

            let item2 = journal.get(2, 0).await.expect("failed to get");
            assert_eq!(item2, test_digest(2));

            let err = journal.get(1, 2).await;
            assert!(matches!(err, Err(Error::ItemOutOfRange(2))));

            let err = journal.get(3, 0).await;
            assert!(matches!(err, Err(Error::SectionOutOfRange(3))));

            journal.destroy().await.expect("failed to destroy");
        });
    }

    #[test_traced]
    fn test_segmented_fixed_replay() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg();
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to init");

            for i in 0u64..10 {
                journal
                    .append(1, test_digest(i))
                    .await
                    .expect("failed to append");
            }
            for i in 10u64..20 {
                journal
                    .append(2, test_digest(i))
                    .await
                    .expect("failed to append");
            }

            journal.sync_all().await.expect("failed to sync");
            drop(journal);

            let journal = Journal::<_, Digest>::init(context.clone(), cfg.clone())
                .await
                .expect("failed to re-init");

            let items = {
                let stream = journal
                    .replay(0, 0, NZUsize!(1024))
                    .await
                    .expect("failed to replay");
                pin_mut!(stream);

                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((section, pos, item)) => items.push((section, pos, item)),
                        Err(err) => panic!("replay error: {err}"),
                    }
                }
                items
            };

            assert_eq!(items.len(), 20);
            for (i, item) in items.iter().enumerate().take(10) {
                assert_eq!(item.0, 1);
                assert_eq!(item.1, i as u64);
                assert_eq!(item.2, test_digest(i as u64));
            }
            for (i, item) in items.iter().enumerate().skip(10).take(10) {
                assert_eq!(item.0, 2);
                assert_eq!(item.1, (i - 10) as u64);
                assert_eq!(item.2, test_digest(i as u64));
            }

            journal.destroy().await.expect("failed to destroy");
        });
    }

    #[test_traced]
    fn test_segmented_fixed_replay_with_start_offset() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg();
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to init");

            for i in 0u64..10 {
                journal
                    .append(1, test_digest(i))
                    .await
                    .expect("failed to append");
            }
            for i in 10u64..15 {
                journal
                    .append(2, test_digest(i))
                    .await
                    .expect("failed to append");
            }
            journal.sync_all().await.expect("failed to sync");
            drop(journal);

            let journal = Journal::<_, Digest>::init(context.clone(), cfg.clone())
                .await
                .expect("failed to re-init");

            {
                let stream = journal
                    .replay(1, 5, NZUsize!(1024))
                    .await
                    .expect("failed to replay");
                pin_mut!(stream);

                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    let (section, pos, item) = result.expect("replay error");
                    items.push((section, pos, item));
                }

                assert_eq!(
                    items.len(),
                    10,
                    "Should have 5 items from section 1 + 5 from section 2"
                );

                for (i, (section, pos, item)) in items.iter().enumerate().take(5) {
                    assert_eq!(*section, 1);
                    assert_eq!(*pos, (i + 5) as u64);
                    assert_eq!(*item, test_digest((i + 5) as u64));
                }

                for (i, (section, pos, item)) in items.iter().enumerate().skip(5) {
                    assert_eq!(*section, 2);
                    assert_eq!(*pos, (i - 5) as u64);
                    assert_eq!(*item, test_digest((i + 5) as u64));
                }
            }

            {
                let stream = journal
                    .replay(1, 9, NZUsize!(1024))
                    .await
                    .expect("failed to replay");
                pin_mut!(stream);

                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    let (section, pos, item) = result.expect("replay error");
                    items.push((section, pos, item));
                }

                assert_eq!(
                    items.len(),
                    6,
                    "Should have 1 item from section 1 + 5 from section 2"
                );
                assert_eq!(items[0], (1, 9, test_digest(9)));
                for (i, (section, pos, item)) in items.iter().enumerate().skip(1) {
                    assert_eq!(*section, 2);
                    assert_eq!(*pos, (i - 1) as u64);
                    assert_eq!(*item, test_digest((i + 9) as u64));
                }
            }

            {
                let stream = journal
                    .replay(2, 3, NZUsize!(1024))
                    .await
                    .expect("failed to replay");
                pin_mut!(stream);

                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    let (section, pos, item) = result.expect("replay error");
                    items.push((section, pos, item));
                }

                assert_eq!(items.len(), 2, "Should have 2 items from section 2");
                assert_eq!(items[0], (2, 3, test_digest(13)));
                assert_eq!(items[1], (2, 4, test_digest(14)));
            }

            let result = journal.replay(1, 100, NZUsize!(1024)).await;
            assert!(matches!(result, Err(Error::ItemOutOfRange(100))));
            drop(result);

            journal.destroy().await.expect("failed to destroy");
        });
    }

    #[test_traced]
    fn test_segmented_fixed_prune() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg();
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to init");

            for section in 1u64..=5 {
                journal
                    .append(section, test_digest(section))
                    .await
                    .expect("failed to append");
            }
            journal.sync_all().await.expect("failed to sync");

            journal.prune(3).await.expect("failed to prune");

            let err = journal.get(1, 0).await;
            assert!(matches!(err, Err(Error::AlreadyPrunedToSection(3))));

            let err = journal.get(2, 0).await;
            assert!(matches!(err, Err(Error::AlreadyPrunedToSection(3))));

            let item = journal.get(3, 0).await.expect("should exist");
            assert_eq!(item, test_digest(3));

            journal.destroy().await.expect("failed to destroy");
        });
    }

    #[test_traced]
    fn test_segmented_fixed_rewind() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg();
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to init");

            for section in 1u64..=3 {
                journal
                    .append(section, test_digest(section))
                    .await
                    .expect("failed to append");
            }
            journal.sync_all().await.expect("failed to sync");

            for section in 1u64..=3 {
                let size = journal.size(section).await.expect("failed to get size");
                assert!(size > 0, "section {section} should have data");
            }

            let size = journal.size(1).await.expect("failed to get size");
            journal.rewind(1, size).await.expect("failed to rewind");

            let size = journal.size(1).await.expect("failed to get size");
            assert!(size > 0, "section 1 should still have data");

            for section in 2u64..=3 {
                let size = journal.size(section).await.expect("failed to get size");
                assert_eq!(size, 0, "section {section} should be removed");
            }

            let item = journal.get(1, 0).await.expect("failed to get");
            assert_eq!(item, test_digest(1));

            journal.destroy().await.expect("failed to destroy");
        });
    }

    #[test_traced]
    fn test_segmented_fixed_rewind_many_sections() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg();
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to init");

            for section in 1u64..=10 {
                journal
                    .append(section, test_digest(section))
                    .await
                    .expect("failed to append");
            }
            journal.sync_all().await.expect("failed to sync");

            let size = journal.size(5).await.expect("failed to get size");
            journal.rewind(5, size).await.expect("failed to rewind");

            for section in 1u64..=5 {
                let size = journal.size(section).await.expect("failed to get size");
                assert!(size > 0, "section {section} should still have data");
            }

            for section in 6u64..=10 {
                let size = journal.size(section).await.expect("failed to get size");
                assert_eq!(size, 0, "section {section} should be removed");
            }

            {
                let stream = journal
                    .replay(0, 0, NZUsize!(1024))
                    .await
                    .expect("failed to replay");
                pin_mut!(stream);
                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    let (section, _, item) = result.expect("failed to read");
                    items.push((section, item));
                }
                assert_eq!(items.len(), 5);
                for (i, (section, item)) in items.iter().enumerate() {
                    assert_eq!(*section, (i + 1) as u64);
                    assert_eq!(*item, test_digest((i + 1) as u64));
                }
            }

            journal.destroy().await.expect("failed to destroy");
        });
    }

    #[test_traced]
    fn test_segmented_fixed_rewind_persistence() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg();

            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to init");
            for section in 1u64..=5 {
                journal
                    .append(section, test_digest(section))
                    .await
                    .expect("failed to append");
            }
            journal.sync_all().await.expect("failed to sync");

            let size = journal.size(2).await.expect("failed to get size");
            journal.rewind(2, size).await.expect("failed to rewind");
            journal.sync_all().await.expect("failed to sync");
            drop(journal);

            let journal = Journal::<_, Digest>::init(context.clone(), cfg.clone())
                .await
                .expect("failed to re-init");

            for section in 1u64..=2 {
                let size = journal.size(section).await.expect("failed to get size");
                assert!(size > 0, "section {section} should have data after restart");
            }

            for section in 3u64..=5 {
                let size = journal.size(section).await.expect("failed to get size");
                assert_eq!(size, 0, "section {section} should be gone after restart");
            }

            let item1 = journal.get(1, 0).await.expect("failed to get");
            assert_eq!(item1, test_digest(1));
            let item2 = journal.get(2, 0).await.expect("failed to get");
            assert_eq!(item2, test_digest(2));

            journal.destroy().await.expect("failed to destroy");
        });
    }

    #[test_traced]
    fn test_segmented_fixed_corruption_recovery() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg();
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to init");

            for i in 0u64..5 {
                journal
                    .append(1, test_digest(i))
                    .await
                    .expect("failed to append");
            }
            journal.sync_all().await.expect("failed to sync");
            drop(journal);

            let (blob, size) = context
                .open(&cfg.partition, &1u64.to_be_bytes())
                .await
                .expect("failed to open blob");
            blob.resize(size - 1).await.expect("failed to truncate");
            blob.sync().await.expect("failed to sync");

            let journal = Journal::<_, Digest>::init(context.clone(), cfg.clone())
                .await
                .expect("failed to re-init");

            let count = {
                let stream = journal
                    .replay(0, 0, NZUsize!(1024))
                    .await
                    .expect("failed to replay");
                pin_mut!(stream);

                let mut count = 0;
                while let Some(result) = stream.next().await {
                    result.expect("should be ok");
                    count += 1;
                }
                count
            };
            assert_eq!(count, 4);

            journal.destroy().await.expect("failed to destroy");
        });
    }

    #[test_traced]
    fn test_segmented_fixed_persistence() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg();

            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to init");

            for i in 0u64..5 {
                journal
                    .append(1, test_digest(i))
                    .await
                    .expect("failed to append");
            }
            journal.sync_all().await.expect("failed to sync");
            drop(journal);

            let journal = Journal::<_, Digest>::init(context.clone(), cfg)
                .await
                .expect("failed to re-init");

            for i in 0u64..5 {
                let item = journal.get(1, i).await.expect("failed to get");
                assert_eq!(item, test_digest(i));
            }

            journal.destroy().await.expect("failed to destroy");
        });
    }

    #[test_traced]
    fn test_segmented_fixed_section_len() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg();
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to init");

            assert_eq!(journal.section_len(1).await.unwrap(), 0);

            for i in 0u64..5 {
                journal
                    .append(1, test_digest(i))
                    .await
                    .expect("failed to append");
            }

            assert_eq!(journal.section_len(1).await.unwrap(), 5);
            assert_eq!(journal.section_len(2).await.unwrap(), 0);

            journal.destroy().await.expect("failed to destroy");
        });
    }

    #[test_traced]
    fn test_segmented_fixed_non_contiguous_sections() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg();
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to init");

            journal
                .append(1, test_digest(100))
                .await
                .expect("failed to append");
            journal
                .append(5, test_digest(500))
                .await
                .expect("failed to append");
            journal
                .append(10, test_digest(1000))
                .await
                .expect("failed to append");
            journal.sync_all().await.expect("failed to sync");

            assert_eq!(journal.get(1, 0).await.unwrap(), test_digest(100));
            assert_eq!(journal.get(5, 0).await.unwrap(), test_digest(500));
            assert_eq!(journal.get(10, 0).await.unwrap(), test_digest(1000));

            for missing_section in [0u64, 2, 3, 4, 6, 7, 8, 9, 11] {
                let result = journal.get(missing_section, 0).await;
                assert!(
                    matches!(result, Err(Error::SectionOutOfRange(_))),
                    "Expected SectionOutOfRange for section {}, got {:?}",
                    missing_section,
                    result
                );
            }

            drop(journal);
            let journal = Journal::<_, Digest>::init(context.clone(), cfg.clone())
                .await
                .expect("failed to re-init");

            {
                let stream = journal
                    .replay(0, 0, NZUsize!(1024))
                    .await
                    .expect("failed to replay");
                pin_mut!(stream);

                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    let (section, _, item) = result.expect("replay error");
                    items.push((section, item));
                }

                assert_eq!(items.len(), 3, "Should have 3 items");
                assert_eq!(items[0], (1, test_digest(100)));
                assert_eq!(items[1], (5, test_digest(500)));
                assert_eq!(items[2], (10, test_digest(1000)));
            }

            {
                let stream = journal
                    .replay(5, 0, NZUsize!(1024))
                    .await
                    .expect("failed to replay from section 5");
                pin_mut!(stream);

                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    let (section, _, item) = result.expect("replay error");
                    items.push((section, item));
                }

                assert_eq!(items.len(), 2, "Should have 2 items from section 5 onwards");
                assert_eq!(items[0], (5, test_digest(500)));
                assert_eq!(items[1], (10, test_digest(1000)));
            }

            journal.destroy().await.expect("failed to destroy");
        });
    }

    #[test_traced]
    fn test_segmented_fixed_empty_section_in_middle() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg();
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to init");

            journal
                .append(1, test_digest(100))
                .await
                .expect("failed to append");

            journal
                .append(2, test_digest(200))
                .await
                .expect("failed to append");
            journal.sync(2).await.expect("failed to sync");
            journal
                .rewind_section(2, 0)
                .await
                .expect("failed to rewind");

            journal
                .append(3, test_digest(300))
                .await
                .expect("failed to append");

            journal.sync_all().await.expect("failed to sync");

            assert_eq!(journal.section_len(1).await.unwrap(), 1);
            assert_eq!(journal.section_len(2).await.unwrap(), 0);
            assert_eq!(journal.section_len(3).await.unwrap(), 1);

            drop(journal);
            let journal = Journal::<_, Digest>::init(context.clone(), cfg.clone())
                .await
                .expect("failed to re-init");

            {
                let stream = journal
                    .replay(0, 0, NZUsize!(1024))
                    .await
                    .expect("failed to replay");
                pin_mut!(stream);

                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    let (section, _, item) = result.expect("replay error");
                    items.push((section, item));
                }

                assert_eq!(
                    items.len(),
                    2,
                    "Should have 2 items (skipping empty section)"
                );
                assert_eq!(items[0], (1, test_digest(100)));
                assert_eq!(items[1], (3, test_digest(300)));
            }

            {
                let stream = journal
                    .replay(2, 0, NZUsize!(1024))
                    .await
                    .expect("failed to replay from section 2");
                pin_mut!(stream);

                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    let (section, _, item) = result.expect("replay error");
                    items.push((section, item));
                }

                assert_eq!(items.len(), 1, "Should have 1 item from section 3");
                assert_eq!(items[0], (3, test_digest(300)));
            }

            journal.destroy().await.expect("failed to destroy");
        });
    }

    #[test_traced]
    fn test_segmented_fixed_truncation_recovery_across_page_boundary() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg();
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to init");

            for i in 0u64..3 {
                journal
                    .append(1, test_digest(i))
                    .await
                    .expect("failed to append");
            }
            journal.sync_all().await.expect("failed to sync");

            for i in 0u64..3 {
                let item = journal.get(1, i).await.expect("failed to get");
                assert_eq!(item, test_digest(i));
            }
            drop(journal);

            let (blob, size) = context
                .open(&cfg.partition, &1u64.to_be_bytes())
                .await
                .expect("failed to open blob");
            blob.resize(size - 1).await.expect("failed to truncate");
            blob.sync().await.expect("failed to sync");
            drop(blob);

            let journal = Journal::<_, Digest>::init(context.clone(), cfg.clone())
                .await
                .expect("failed to re-init");

            assert_eq!(journal.section_len(1).await.unwrap(), 2);

            assert_eq!(journal.size(1).await.unwrap(), 64);

            let item0 = journal.get(1, 0).await.expect("failed to get item 0");
            assert_eq!(item0, test_digest(0));
            let item1 = journal.get(1, 1).await.expect("failed to get item 1");
            assert_eq!(item1, test_digest(1));

            let err = journal.get(1, 2).await;
            assert!(
                matches!(err, Err(Error::ItemOutOfRange(2))),
                "expected ItemOutOfRange(2), got {:?}",
                err
            );

            journal.destroy().await.expect("failed to destroy");
        });
    }

    #[test_traced]
    fn test_segmented_fixed_init_section_at_size() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg();
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to init");

            journal
                .init_section_at_size(1, 5)
                .await
                .expect("failed to init section at size");

            assert_eq!(journal.section_len(1).await.unwrap(), 5);

            assert_eq!(journal.size(1).await.unwrap(), 5 * 32);

            let zero_digest = Sha256::fill(0);
            for i in 0u64..5 {
                let item = journal.get(1, i).await.expect("failed to get");
                assert_eq!(item, zero_digest, "item {i} should be zero-filled");
            }

            let err = journal.get(1, 5).await;
            assert!(matches!(err, Err(Error::ItemOutOfRange(5))));

            let pos = journal
                .append(1, test_digest(100))
                .await
                .expect("failed to append");
            assert_eq!(pos, 5, "append should return position 5");

            assert_eq!(journal.section_len(1).await.unwrap(), 6);

            let item = journal.get(1, 5).await.expect("failed to get");
            assert_eq!(item, test_digest(100));

            journal.sync_all().await.expect("failed to sync");
            drop(journal);

            let journal = Journal::<_, Digest>::init(context.clone(), cfg.clone())
                .await
                .expect("failed to re-init");

            assert_eq!(journal.section_len(1).await.unwrap(), 6);

            for i in 0u64..5 {
                let item = journal.get(1, i).await.expect("failed to get");
                assert_eq!(
                    item, zero_digest,
                    "item {i} should still be zero-filled after restart"
                );
            }

            let item = journal.get(1, 5).await.expect("failed to get");
            assert_eq!(item, test_digest(100));

            {
                let stream = journal
                    .replay(1, 0, NZUsize!(1024))
                    .await
                    .expect("failed to replay");
                pin_mut!(stream);

                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    let (section, pos, item) = result.expect("replay error");
                    items.push((section, pos, item));
                }

                assert_eq!(items.len(), 6);
                for (i, item) in items.iter().enumerate().take(5) {
                    assert_eq!(*item, (1, i as u64, zero_digest));
                }
                assert_eq!(items[5], (1, 5, test_digest(100)));
            }

            {
                let stream = journal
                    .replay(1, 3, NZUsize!(1024))
                    .await
                    .expect("failed to replay");
                pin_mut!(stream);

                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    let (section, pos, item) = result.expect("replay error");
                    items.push((section, pos, item));
                }

                assert_eq!(items.len(), 3);
                assert_eq!(items[0], (1, 3, zero_digest));
                assert_eq!(items[1], (1, 4, zero_digest));
                assert_eq!(items[2], (1, 5, test_digest(100)));
            }

            journal.destroy().await.expect("failed to destroy");
        });
    }
}