1use super::manager::{AppendFactory, Config as ManagerConfig, Manager};
24use crate::journal::Error;
25use commonware_codec::{CodecFixed, CodecFixedShared, DecodeExt as _, ReadExt as _};
26use commonware_runtime::{
27 buffer::paged::{CacheRef, Replay},
28 Blob, Buf, IoBufMut, Metrics, Storage,
29};
30use futures::{
31 stream::{self, Stream},
32 StreamExt,
33};
34use std::{marker::PhantomData, num::NonZeroUsize};
35use tracing::{trace, warn};
36
/// Per-section state threaded through the `stream::unfold` driver in
/// [`Journal::replay`].
struct ReplayState<B: Blob> {
    // Section this state is replaying.
    section: u64,
    // Buffered reader over the section's blob.
    replay: Replay<B>,
    // Position (item index, not byte offset) of the next item to yield.
    position: u64,
    // Set once the section is exhausted or an error was emitted; the
    // unfold closure returns `None` on the next poll.
    done: bool,
}
44
/// Configuration for a fixed-item-size [`Journal`].
#[derive(Clone)]
pub struct Config {
    /// Storage partition that holds the journal's section blobs.
    pub partition: String,

    /// Shared page cache used when reading from section blobs.
    pub page_cache: CacheRef,

    /// Size of the write buffer used when appending to section blobs.
    pub write_buffer: NonZeroUsize,
}
57
/// A segmented journal of fixed-size items.
///
/// Each section is a separate blob managed by [`Manager`]; items of exactly
/// `A::SIZE` bytes are appended back-to-back, so an item's byte offset is
/// always `position * A::SIZE`.
pub struct Journal<E: Storage + Metrics, A: CodecFixed> {
    // Underlying section/blob manager (append-oriented blob factory).
    manager: Manager<E, AppendFactory>,
    // The journal stores no `A` values directly; this pins the item type.
    _array: PhantomData<A>,
}
75
76impl<E: Storage + Metrics, A: CodecFixedShared> Journal<E, A> {
77 pub const CHUNK_SIZE: usize = A::SIZE;
79 const CHUNK_SIZE_U64: u64 = Self::CHUNK_SIZE as u64;
80
81 pub async fn init(context: E, cfg: Config) -> Result<Self, Error> {
86 let manager_cfg = ManagerConfig {
87 partition: cfg.partition,
88 factory: AppendFactory {
89 write_buffer: cfg.write_buffer,
90 page_cache_ref: cfg.page_cache,
91 },
92 };
93 let mut manager = Manager::init(context, manager_cfg).await?;
94
95 let sections: Vec<_> = manager.sections().collect();
97 for section in sections {
98 let size = manager.size(section).await?;
99 if !size.is_multiple_of(Self::CHUNK_SIZE_U64) {
100 let valid_size = size - (size % Self::CHUNK_SIZE_U64);
101 warn!(
102 section,
103 invalid_size = size,
104 new_size = valid_size,
105 "trailing bytes detected: truncating"
106 );
107 manager.rewind_section(section, valid_size).await?;
108 }
109 }
110
111 Ok(Self {
112 manager,
113 _array: PhantomData,
114 })
115 }
116
117 pub async fn append(&mut self, section: u64, item: A) -> Result<u64, Error> {
121 let blob = self.manager.get_or_create(section).await?;
122
123 let size = blob.size().await;
124 if !size.is_multiple_of(Self::CHUNK_SIZE_U64) {
125 return Err(Error::InvalidBlobSize(section, size));
126 }
127 let position = size / Self::CHUNK_SIZE_U64;
128
129 let buf = item.encode_mut();
131 blob.append(&buf).await?;
132 trace!(section, position, "appended item");
133
134 Ok(position)
135 }
136
137 pub async fn get(&self, section: u64, position: u64) -> Result<A, Error> {
145 let blob = self
146 .manager
147 .get(section)?
148 .ok_or(Error::SectionOutOfRange(section))?;
149
150 let offset = position
151 .checked_mul(Self::CHUNK_SIZE_U64)
152 .ok_or(Error::ItemOutOfRange(position))?;
153 let end = offset
154 .checked_add(Self::CHUNK_SIZE_U64)
155 .ok_or(Error::ItemOutOfRange(position))?;
156 if end > blob.size().await {
157 return Err(Error::ItemOutOfRange(position));
158 }
159
160 let buf = blob
161 .read_at(offset, IoBufMut::zeroed(Self::CHUNK_SIZE))
162 .await?;
163 A::decode(buf.coalesce()).map_err(Error::Codec)
164 }
165
166 pub async fn last(&self, section: u64) -> Result<Option<A>, Error> {
168 let blob = self
169 .manager
170 .get(section)?
171 .ok_or(Error::SectionOutOfRange(section))?;
172
173 let size = blob.size().await;
174 if size < Self::CHUNK_SIZE_U64 {
175 return Ok(None);
176 }
177
178 let last_position = (size / Self::CHUNK_SIZE_U64) - 1;
179 let offset = last_position * Self::CHUNK_SIZE_U64;
180 let buf = blob
181 .read_at(offset, IoBufMut::zeroed(Self::CHUNK_SIZE))
182 .await?;
183 A::decode(buf.coalesce()).map_err(Error::Codec).map(Some)
184 }
185
186 pub async fn replay(
190 &self,
191 start_section: u64,
192 start_position: u64,
193 buffer: NonZeroUsize,
194 ) -> Result<impl Stream<Item = Result<(u64, u64, A), Error>> + Send + '_, Error> {
195 let mut blob_info = Vec::new();
197 for (§ion, blob) in self.manager.sections_from(start_section) {
198 let blob_size = blob.size().await;
199 let mut replay = blob.replay(buffer).await?;
200 let initial_position = if section == start_section {
202 let start = start_position * Self::CHUNK_SIZE_U64;
203 if start > blob_size {
204 return Err(Error::ItemOutOfRange(start_position));
205 }
206 replay.seek_to(start).await?;
207 start_position
208 } else {
209 0
210 };
211 blob_info.push((section, replay, initial_position));
212 }
213
214 Ok(
218 stream::iter(blob_info).flat_map(move |(section, replay, initial_position)| {
219 stream::unfold(
220 ReplayState {
221 section,
222 replay,
223 position: initial_position,
224 done: false,
225 },
226 move |mut state| async move {
227 if state.done {
228 return None;
229 }
230
231 let mut batch: Vec<Result<(u64, u64, A), Error>> = Vec::new();
232 loop {
233 match state.replay.ensure(Self::CHUNK_SIZE).await {
235 Ok(true) => {}
236 Ok(false) => {
237 state.done = true;
239 return if batch.is_empty() {
240 None
241 } else {
242 Some((batch, state))
243 };
244 }
245 Err(err) => {
246 batch.push(Err(Error::Runtime(err)));
247 state.done = true;
248 return Some((batch, state));
249 }
250 }
251
252 while state.replay.remaining() >= Self::CHUNK_SIZE {
254 match A::read(&mut state.replay) {
255 Ok(item) => {
256 batch.push(Ok((state.section, state.position, item)));
257 state.position += 1;
258 }
259 Err(err) => {
260 batch.push(Err(Error::Codec(err)));
261 state.done = true;
262 return Some((batch, state));
263 }
264 }
265 }
266
267 if !batch.is_empty() {
269 return Some((batch, state));
270 }
271 }
272 },
273 )
274 .flat_map(stream::iter)
275 }),
276 )
277 }
278
279 pub async fn sync(&self, section: u64) -> Result<(), Error> {
281 self.manager.sync(section).await
282 }
283
284 pub async fn sync_all(&self) -> Result<(), Error> {
286 self.manager.sync_all().await
287 }
288
289 pub async fn prune(&mut self, min: u64) -> Result<bool, Error> {
291 self.manager.prune(min).await
292 }
293
294 pub fn oldest_section(&self) -> Option<u64> {
296 self.manager.oldest_section()
297 }
298
299 pub fn newest_section(&self) -> Option<u64> {
301 self.manager.newest_section()
302 }
303
304 pub fn sections(&self) -> impl Iterator<Item = u64> + '_ {
306 self.manager.sections_from(0).map(|(section, _)| *section)
307 }
308
309 pub async fn section_len(&self, section: u64) -> Result<u64, Error> {
311 let size = self.manager.size(section).await?;
312 Ok(size / Self::CHUNK_SIZE_U64)
313 }
314
315 pub async fn size(&self, section: u64) -> Result<u64, Error> {
317 self.manager.size(section).await
318 }
319
320 pub async fn rewind(&mut self, section: u64, offset: u64) -> Result<(), Error> {
325 self.manager.rewind(section, offset).await
326 }
327
328 pub async fn rewind_section(&mut self, section: u64, size: u64) -> Result<(), Error> {
332 self.manager.rewind_section(section, size).await
333 }
334
335 pub async fn destroy(self) -> Result<(), Error> {
337 self.manager.destroy().await
338 }
339
340 pub async fn clear(&mut self) -> Result<(), Error> {
344 self.manager.clear().await
345 }
346
347 pub(crate) async fn ensure_section_exists(&mut self, section: u64) -> Result<(), Error> {
352 self.manager.get_or_create(section).await?;
353 Ok(())
354 }
355}
356
#[cfg(test)]
mod tests {
    //! Unit tests for the fixed-size segmented journal, executed on the
    //! deterministic runtime so every run is reproducible.
    use super::*;
    use commonware_cryptography::{sha256::Digest, Hasher as _, Sha256};
    use commonware_macros::test_traced;
    use commonware_runtime::{buffer::paged::CacheRef, deterministic, Metrics, Runner};
    use commonware_utils::{NZUsize, NZU16};
    use core::num::NonZeroU16;
    use futures::{pin_mut, StreamExt};

    // 44-byte pages are not a multiple of the 32-byte digest item, so items
    // regularly straddle page boundaries (exercised explicitly by
    // `test_segmented_fixed_truncation_recovery_across_page_boundary`).
    const PAGE_SIZE: NonZeroU16 = NZU16!(44);
    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(3);

    /// Deterministically derive a distinct test item from a `u64` seed.
    fn test_digest(value: u64) -> Digest {
        Sha256::hash(&value.to_be_bytes())
    }

    /// Journal configuration shared by most tests.
    fn test_cfg() -> Config {
        Config {
            partition: "test_partition".into(),
            page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
            write_buffer: NZUsize!(2048),
        }
    }

    // Basic append/get round-trip across two sections, plus out-of-range
    // item and section errors.
    #[test_traced]
    fn test_segmented_fixed_append_and_get() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg();
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to init");

            let pos0 = journal
                .append(1, test_digest(0))
                .await
                .expect("failed to append");
            assert_eq!(pos0, 0);

            let pos1 = journal
                .append(1, test_digest(1))
                .await
                .expect("failed to append");
            assert_eq!(pos1, 1);

            // Positions are per-section: a new section starts back at 0.
            let pos2 = journal
                .append(2, test_digest(2))
                .await
                .expect("failed to append");
            assert_eq!(pos2, 0);

            let item0 = journal.get(1, 0).await.expect("failed to get");
            assert_eq!(item0, test_digest(0));

            let item1 = journal.get(1, 1).await.expect("failed to get");
            assert_eq!(item1, test_digest(1));

            let item2 = journal.get(2, 0).await.expect("failed to get");
            assert_eq!(item2, test_digest(2));

            let err = journal.get(1, 2).await;
            assert!(matches!(err, Err(Error::ItemOutOfRange(2))));

            let err = journal.get(3, 0).await;
            assert!(matches!(err, Err(Error::SectionOutOfRange(3))));

            journal.destroy().await.expect("failed to destroy");
        });
    }

    // Replay from the beginning after a restart yields every item, in
    // section order then position order.
    #[test_traced]
    fn test_segmented_fixed_replay() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg();
            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
                .await
                .expect("failed to init");

            for i in 0u64..10 {
                journal
                    .append(1, test_digest(i))
                    .await
                    .expect("failed to append");
            }
            for i in 10u64..20 {
                journal
                    .append(2, test_digest(i))
                    .await
                    .expect("failed to append");
            }

            journal.sync_all().await.expect("failed to sync");
            drop(journal);

            // Re-open to replay from durable state.
            let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
                .await
                .expect("failed to re-init");

            let items = {
                let stream = journal
                    .replay(0, 0, NZUsize!(1024))
                    .await
                    .expect("failed to replay");
                pin_mut!(stream);

                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((section, pos, item)) => items.push((section, pos, item)),
                        Err(err) => panic!("replay error: {err}"),
                    }
                }
                items
            };

            assert_eq!(items.len(), 20);
            for (i, item) in items.iter().enumerate().take(10) {
                assert_eq!(item.0, 1);
                assert_eq!(item.1, i as u64);
                assert_eq!(item.2, test_digest(i as u64));
            }
            for (i, item) in items.iter().enumerate().skip(10).take(10) {
                assert_eq!(item.0, 2);
                assert_eq!(item.1, (i - 10) as u64);
                assert_eq!(item.2, test_digest(i as u64));
            }

            journal.destroy().await.expect("failed to destroy");
        });
    }

    // Replay honors a mid-section start position: items before it are
    // skipped in the start section, later sections are replayed in full.
    #[test_traced]
    fn test_segmented_fixed_replay_with_start_offset() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg();
            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
                .await
                .expect("failed to init");

            for i in 0u64..10 {
                journal
                    .append(1, test_digest(i))
                    .await
                    .expect("failed to append");
            }
            for i in 10u64..15 {
                journal
                    .append(2, test_digest(i))
                    .await
                    .expect("failed to append");
            }
            journal.sync_all().await.expect("failed to sync");
            drop(journal);

            let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
                .await
                .expect("failed to re-init");

            // Start halfway through section 1.
            {
                let stream = journal
                    .replay(1, 5, NZUsize!(1024))
                    .await
                    .expect("failed to replay");
                pin_mut!(stream);

                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    let (section, pos, item) = result.expect("replay error");
                    items.push((section, pos, item));
                }

                assert_eq!(
                    items.len(),
                    10,
                    "Should have 5 items from section 1 + 5 from section 2"
                );

                for (i, (section, pos, item)) in items.iter().enumerate().take(5) {
                    assert_eq!(*section, 1);
                    assert_eq!(*pos, (i + 5) as u64);
                    assert_eq!(*item, test_digest((i + 5) as u64));
                }

                for (i, (section, pos, item)) in items.iter().enumerate().skip(5) {
                    assert_eq!(*section, 2);
                    assert_eq!(*pos, (i - 5) as u64);
                    assert_eq!(*item, test_digest((i + 5) as u64));
                }
            }

            // Start at the last item of section 1.
            {
                let stream = journal
                    .replay(1, 9, NZUsize!(1024))
                    .await
                    .expect("failed to replay");
                pin_mut!(stream);

                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    let (section, pos, item) = result.expect("replay error");
                    items.push((section, pos, item));
                }

                assert_eq!(
                    items.len(),
                    6,
                    "Should have 1 item from section 1 + 5 from section 2"
                );
                assert_eq!(items[0], (1, 9, test_digest(9)));
                for (i, (section, pos, item)) in items.iter().enumerate().skip(1) {
                    assert_eq!(*section, 2);
                    assert_eq!(*pos, (i - 1) as u64);
                    assert_eq!(*item, test_digest((i + 9) as u64));
                }
            }

            // Start inside a later section.
            {
                let stream = journal
                    .replay(2, 3, NZUsize!(1024))
                    .await
                    .expect("failed to replay");
                pin_mut!(stream);

                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    let (section, pos, item) = result.expect("replay error");
                    items.push((section, pos, item));
                }

                assert_eq!(items.len(), 2, "Should have 2 items from section 2");
                assert_eq!(items[0], (2, 3, test_digest(13)));
                assert_eq!(items[1], (2, 4, test_digest(14)));
            }

            // A start position past the end of the section fails eagerly.
            let result = journal.replay(1, 100, NZUsize!(1024)).await;
            assert!(matches!(result, Err(Error::ItemOutOfRange(100))));
            drop(result);

            journal.destroy().await.expect("failed to destroy");
        });
    }

    // Pruning to a minimum section removes earlier sections and reports
    // them as already pruned.
    #[test_traced]
    fn test_segmented_fixed_prune() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg();
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to init");

            for section in 1u64..=5 {
                journal
                    .append(section, test_digest(section))
                    .await
                    .expect("failed to append");
            }
            journal.sync_all().await.expect("failed to sync");

            journal.prune(3).await.expect("failed to prune");

            let err = journal.get(1, 0).await;
            assert!(matches!(err, Err(Error::AlreadyPrunedToSection(3))));

            let err = journal.get(2, 0).await;
            assert!(matches!(err, Err(Error::AlreadyPrunedToSection(3))));

            // The prune boundary itself is retained.
            let item = journal.get(3, 0).await.expect("should exist");
            assert_eq!(item, test_digest(3));

            journal.destroy().await.expect("failed to destroy");
        });
    }

    // Rewinding to the end of a section keeps it and removes all later
    // sections.
    #[test_traced]
    fn test_segmented_fixed_rewind() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg();
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to init");

            for section in 1u64..=3 {
                journal
                    .append(section, test_digest(section))
                    .await
                    .expect("failed to append");
            }
            journal.sync_all().await.expect("failed to sync");

            for section in 1u64..=3 {
                let size = journal.size(section).await.expect("failed to get size");
                assert!(size > 0, "section {section} should have data");
            }

            // Rewind to the current end of section 1: section 1 is untouched,
            // sections 2 and 3 are discarded.
            let size = journal.size(1).await.expect("failed to get size");
            journal.rewind(1, size).await.expect("failed to rewind");

            let size = journal.size(1).await.expect("failed to get size");
            assert!(size > 0, "section 1 should still have data");

            for section in 2u64..=3 {
                let size = journal.size(section).await.expect("failed to get size");
                assert_eq!(size, 0, "section {section} should be removed");
            }

            let item = journal.get(1, 0).await.expect("failed to get");
            assert_eq!(item, test_digest(1));

            journal.destroy().await.expect("failed to destroy");
        });
    }

    // Rewind in the middle of many sections: everything up to the rewind
    // point survives and replays correctly; everything after is gone.
    #[test_traced]
    fn test_segmented_fixed_rewind_many_sections() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg();
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to init");

            for section in 1u64..=10 {
                journal
                    .append(section, test_digest(section))
                    .await
                    .expect("failed to append");
            }
            journal.sync_all().await.expect("failed to sync");

            let size = journal.size(5).await.expect("failed to get size");
            journal.rewind(5, size).await.expect("failed to rewind");

            for section in 1u64..=5 {
                let size = journal.size(section).await.expect("failed to get size");
                assert!(size > 0, "section {section} should still have data");
            }

            for section in 6u64..=10 {
                let size = journal.size(section).await.expect("failed to get size");
                assert_eq!(size, 0, "section {section} should be removed");
            }

            // Full replay sees only the surviving sections 1..=5.
            {
                let stream = journal
                    .replay(0, 0, NZUsize!(1024))
                    .await
                    .expect("failed to replay");
                pin_mut!(stream);
                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    let (section, _, item) = result.expect("failed to read");
                    items.push((section, item));
                }
                assert_eq!(items.len(), 5);
                for (i, (section, item)) in items.iter().enumerate() {
                    assert_eq!(*section, (i + 1) as u64);
                    assert_eq!(*item, test_digest((i + 1) as u64));
                }
            }

            journal.destroy().await.expect("failed to destroy");
        });
    }

    // The effects of a synced rewind survive a restart.
    #[test_traced]
    fn test_segmented_fixed_rewind_persistence() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg();

            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
                .await
                .expect("failed to init");
            for section in 1u64..=5 {
                journal
                    .append(section, test_digest(section))
                    .await
                    .expect("failed to append");
            }
            journal.sync_all().await.expect("failed to sync");

            let size = journal.size(2).await.expect("failed to get size");
            journal.rewind(2, size).await.expect("failed to rewind");
            journal.sync_all().await.expect("failed to sync");
            drop(journal);

            let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
                .await
                .expect("failed to re-init");

            for section in 1u64..=2 {
                let size = journal.size(section).await.expect("failed to get size");
                assert!(size > 0, "section {section} should have data after restart");
            }

            for section in 3u64..=5 {
                let size = journal.size(section).await.expect("failed to get size");
                assert_eq!(size, 0, "section {section} should be gone after restart");
            }

            let item1 = journal.get(1, 0).await.expect("failed to get");
            assert_eq!(item1, test_digest(1));
            let item2 = journal.get(2, 0).await.expect("failed to get");
            assert_eq!(item2, test_digest(2));

            journal.destroy().await.expect("failed to destroy");
        });
    }

    // A blob truncated out-of-band (simulated crash mid-append) is trimmed
    // back to the last whole item on init; only complete items replay.
    #[test_traced]
    fn test_segmented_fixed_corruption_recovery() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg();
            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
                .await
                .expect("failed to init");

            for i in 0u64..5 {
                journal
                    .append(1, test_digest(i))
                    .await
                    .expect("failed to append");
            }
            journal.sync_all().await.expect("failed to sync");
            drop(journal);

            // Chop one byte off the section blob so its size is no longer a
            // multiple of the item size.
            let (blob, size) = context
                .open(&cfg.partition, &1u64.to_be_bytes())
                .await
                .expect("failed to open blob");
            blob.resize(size - 1).await.expect("failed to truncate");
            blob.sync().await.expect("failed to sync");

            let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
                .await
                .expect("failed to re-init");

            let count = {
                let stream = journal
                    .replay(0, 0, NZUsize!(1024))
                    .await
                    .expect("failed to replay");
                pin_mut!(stream);

                let mut count = 0;
                while let Some(result) = stream.next().await {
                    result.expect("should be ok");
                    count += 1;
                }
                count
            };
            // The partial fifth item was discarded.
            assert_eq!(count, 4);

            journal.destroy().await.expect("failed to destroy");
        });
    }

    // Synced items can be read back after a clean restart.
    #[test_traced]
    fn test_segmented_fixed_persistence() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg();

            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
                .await
                .expect("failed to init");

            for i in 0u64..5 {
                journal
                    .append(1, test_digest(i))
                    .await
                    .expect("failed to append");
            }
            journal.sync_all().await.expect("failed to sync");
            drop(journal);

            let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg)
                .await
                .expect("failed to re-init");

            for i in 0u64..5 {
                let item = journal.get(1, i).await.expect("failed to get");
                assert_eq!(item, test_digest(i));
            }

            journal.destroy().await.expect("failed to destroy");
        });
    }

    // `section_len` counts whole items; absent sections report length 0.
    #[test_traced]
    fn test_segmented_fixed_section_len() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg();
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to init");

            assert_eq!(journal.section_len(1).await.unwrap(), 0);

            for i in 0u64..5 {
                journal
                    .append(1, test_digest(i))
                    .await
                    .expect("failed to append");
            }

            assert_eq!(journal.section_len(1).await.unwrap(), 5);
            assert_eq!(journal.section_len(2).await.unwrap(), 0);

            journal.destroy().await.expect("failed to destroy");
        });
    }

    // Sections need not be contiguous: gets on missing sections error, and
    // replay (full or from a mid gap) skips over the holes.
    #[test_traced]
    fn test_segmented_fixed_non_contiguous_sections() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg();
            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
                .await
                .expect("failed to init");

            journal
                .append(1, test_digest(100))
                .await
                .expect("failed to append");
            journal
                .append(5, test_digest(500))
                .await
                .expect("failed to append");
            journal
                .append(10, test_digest(1000))
                .await
                .expect("failed to append");
            journal.sync_all().await.expect("failed to sync");

            assert_eq!(journal.get(1, 0).await.unwrap(), test_digest(100));
            assert_eq!(journal.get(5, 0).await.unwrap(), test_digest(500));
            assert_eq!(journal.get(10, 0).await.unwrap(), test_digest(1000));

            for missing_section in [0u64, 2, 3, 4, 6, 7, 8, 9, 11] {
                let result = journal.get(missing_section, 0).await;
                assert!(
                    matches!(result, Err(Error::SectionOutOfRange(_))),
                    "Expected SectionOutOfRange for section {}, got {:?}",
                    missing_section,
                    result
                );
            }

            drop(journal);
            let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
                .await
                .expect("failed to re-init");

            {
                let stream = journal
                    .replay(0, 0, NZUsize!(1024))
                    .await
                    .expect("failed to replay");
                pin_mut!(stream);

                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    let (section, _, item) = result.expect("replay error");
                    items.push((section, item));
                }

                assert_eq!(items.len(), 3, "Should have 3 items");
                assert_eq!(items[0], (1, test_digest(100)));
                assert_eq!(items[1], (5, test_digest(500)));
                assert_eq!(items[2], (10, test_digest(1000)));
            }

            {
                let stream = journal
                    .replay(5, 0, NZUsize!(1024))
                    .await
                    .expect("failed to replay from section 5");
                pin_mut!(stream);

                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    let (section, _, item) = result.expect("replay error");
                    items.push((section, item));
                }

                assert_eq!(items.len(), 2, "Should have 2 items from section 5 onwards");
                assert_eq!(items[0], (5, test_digest(500)));
                assert_eq!(items[1], (10, test_digest(1000)));
            }

            journal.destroy().await.expect("failed to destroy");
        });
    }

    // A section that exists but is empty (appended then rewound to 0) is
    // skipped by replay.
    #[test_traced]
    fn test_segmented_fixed_empty_section_in_middle() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg();
            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
                .await
                .expect("failed to init");

            journal
                .append(1, test_digest(100))
                .await
                .expect("failed to append");

            // Create section 2, then rewind it back to empty.
            journal
                .append(2, test_digest(200))
                .await
                .expect("failed to append");
            journal.sync(2).await.expect("failed to sync");
            journal
                .rewind_section(2, 0)
                .await
                .expect("failed to rewind");

            journal
                .append(3, test_digest(300))
                .await
                .expect("failed to append");

            journal.sync_all().await.expect("failed to sync");

            assert_eq!(journal.section_len(1).await.unwrap(), 1);
            assert_eq!(journal.section_len(2).await.unwrap(), 0);
            assert_eq!(journal.section_len(3).await.unwrap(), 1);

            drop(journal);
            let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
                .await
                .expect("failed to re-init");

            {
                let stream = journal
                    .replay(0, 0, NZUsize!(1024))
                    .await
                    .expect("failed to replay");
                pin_mut!(stream);

                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    let (section, _, item) = result.expect("replay error");
                    items.push((section, item));
                }

                assert_eq!(
                    items.len(),
                    2,
                    "Should have 2 items (skipping empty section)"
                );
                assert_eq!(items[0], (1, test_digest(100)));
                assert_eq!(items[1], (3, test_digest(300)));
            }

            // Starting at the empty section still works.
            {
                let stream = journal
                    .replay(2, 0, NZUsize!(1024))
                    .await
                    .expect("failed to replay from section 2");
                pin_mut!(stream);

                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    let (section, _, item) = result.expect("replay error");
                    items.push((section, item));
                }

                assert_eq!(items.len(), 1, "Should have 1 item from section 3");
                assert_eq!(items[0], (3, test_digest(300)));
            }

            journal.destroy().await.expect("failed to destroy");
        });
    }

    // Truncation recovery when the torn item spans a page boundary:
    // 3 items * 32 bytes = 96 bytes across 44-byte pages; losing one byte
    // must roll back to 2 whole items (64 bytes).
    #[test_traced]
    fn test_segmented_fixed_truncation_recovery_across_page_boundary() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg();
            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
                .await
                .expect("failed to init");

            for i in 0u64..3 {
                journal
                    .append(1, test_digest(i))
                    .await
                    .expect("failed to append");
            }
            journal.sync_all().await.expect("failed to sync");

            for i in 0u64..3 {
                let item = journal.get(1, i).await.expect("failed to get");
                assert_eq!(item, test_digest(i));
            }
            drop(journal);

            // Corrupt the blob out-of-band by dropping its final byte.
            let (blob, size) = context
                .open(&cfg.partition, &1u64.to_be_bytes())
                .await
                .expect("failed to open blob");
            blob.resize(size - 1).await.expect("failed to truncate");
            blob.sync().await.expect("failed to sync");
            drop(blob);

            let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
                .await
                .expect("failed to re-init");

            assert_eq!(journal.section_len(1).await.unwrap(), 2);

            // 2 remaining items * 32-byte digests = 64 bytes.
            assert_eq!(journal.size(1).await.unwrap(), 64);

            let item0 = journal.get(1, 0).await.expect("failed to get item 0");
            assert_eq!(item0, test_digest(0));
            let item1 = journal.get(1, 1).await.expect("failed to get item 1");
            assert_eq!(item1, test_digest(1));

            let err = journal.get(1, 2).await;
            assert!(
                matches!(err, Err(Error::ItemOutOfRange(2))),
                "expected ItemOutOfRange(2), got {:?}",
                err
            );

            journal.destroy().await.expect("failed to destroy");
        });
    }

    // `clear` removes every section but leaves the journal usable for new
    // appends; cleared sections stay out of range afterwards.
    #[test_traced]
    fn test_journal_clear() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "clear_test".into(),
                page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            let mut journal: Journal<_, Digest> =
                Journal::init(context.with_label("journal"), cfg.clone())
                    .await
                    .expect("Failed to initialize journal");

            for section in 0..5u64 {
                for i in 0..10u64 {
                    journal
                        .append(section, test_digest(section * 1000 + i))
                        .await
                        .expect("Failed to append");
                }
                journal.sync(section).await.expect("Failed to sync");
            }

            assert_eq!(journal.get(0, 0).await.unwrap(), test_digest(0));
            assert_eq!(journal.get(4, 0).await.unwrap(), test_digest(4000));

            journal.clear().await.expect("Failed to clear");

            for section in 0..5u64 {
                assert!(matches!(
                    journal.get(section, 0).await,
                    Err(Error::SectionOutOfRange(s)) if s == section
                ));
            }

            // The journal remains writable after clearing.
            for i in 0..5u64 {
                journal
                    .append(10, test_digest(i * 100))
                    .await
                    .expect("Failed to append after clear");
            }
            journal.sync(10).await.expect("Failed to sync after clear");

            assert_eq!(journal.get(10, 0).await.unwrap(), test_digest(0));

            assert!(matches!(
                journal.get(0, 0).await,
                Err(Error::SectionOutOfRange(0))
            ));

            journal.destroy().await.unwrap();
        });
    }
}