1use super::manager::{AppendFactory, Config as ManagerConfig, Manager};
24use crate::journal::Error;
25use commonware_codec::{CodecFixed, CodecFixedShared, DecodeExt as _, ReadExt as _};
26use commonware_runtime::{
27 buffer::paged::{CacheRef, Replay},
28 Blob, Buf, Metrics, Storage,
29};
30use futures::{
31 stream::{self, Stream},
32 StreamExt,
33};
34use std::{marker::PhantomData, num::NonZeroUsize};
35use tracing::{trace, warn};
36
/// Per-section cursor state threaded through `stream::unfold` by [`Journal::replay`].
struct ReplayState<B: Blob> {
    /// Section number attached to every item yielded from this cursor.
    section: u64,
    /// Buffered reader over the section's blob.
    replay: Replay<B>,
    /// Position (item index within the section) assigned to the next item read.
    position: u64,
    /// Set once the section is exhausted or an error has been emitted; the next poll of the
    /// section's sub-stream then ends it.
    done: bool,
}
44
/// Configuration for a [`Journal`].
#[derive(Clone)]
pub struct Config {
    /// Storage partition name; passed through to the underlying manager.
    pub partition: String,

    /// Page cache reference handed to the factory that opens section blobs.
    pub page_cache: CacheRef,

    /// Write buffer size handed to the factory that opens section blobs
    /// (presumably in bytes — TODO confirm against `AppendFactory`).
    pub write_buffer: NonZeroUsize,
}
57
/// A journal of fixed-size items of type `A`, organized into numbered sections.
///
/// Every item occupies exactly `A::SIZE` bytes, so an item's byte offset within its section is
/// its position multiplied by that size.
pub struct Journal<E: Storage + Metrics, A: CodecFixed> {
    /// Owns the per-section blobs; creation, pruning and rewinding are delegated to it.
    manager: Manager<E, AppendFactory>,
    /// Ties the journal to its item type without storing an item.
    _array: PhantomData<A>,
}
75
76impl<E: Storage + Metrics, A: CodecFixedShared> Journal<E, A> {
    /// Encoded size of a single item (`A::SIZE`); every read and offset is a multiple of it.
    pub const CHUNK_SIZE: usize = A::SIZE;
    /// [`Self::CHUNK_SIZE`] widened to `u64` for arithmetic on blob sizes and offsets.
    const CHUNK_SIZE_U64: u64 = Self::CHUNK_SIZE as u64;
80
81 pub async fn init(context: E, cfg: Config) -> Result<Self, Error> {
86 let manager_cfg = ManagerConfig {
87 partition: cfg.partition,
88 factory: AppendFactory {
89 write_buffer: cfg.write_buffer,
90 page_cache_ref: cfg.page_cache,
91 },
92 };
93 let mut manager = Manager::init(context, manager_cfg).await?;
94
95 let sections: Vec<_> = manager.sections().collect();
97 for section in sections {
98 let size = manager.size(section).await?;
99 if !size.is_multiple_of(Self::CHUNK_SIZE_U64) {
100 let valid_size = size - (size % Self::CHUNK_SIZE_U64);
101 warn!(
102 section,
103 invalid_size = size,
104 new_size = valid_size,
105 "trailing bytes detected: truncating"
106 );
107 manager.rewind_section(section, valid_size).await?;
108 }
109 }
110
111 Ok(Self {
112 manager,
113 _array: PhantomData,
114 })
115 }
116
117 pub async fn append(&mut self, section: u64, item: &A) -> Result<u64, Error> {
121 let blob = self.manager.get_or_create(section).await?;
122
123 let size = blob.size().await;
124 if !size.is_multiple_of(Self::CHUNK_SIZE_U64) {
125 return Err(Error::InvalidBlobSize(section, size));
126 }
127 let position = size / Self::CHUNK_SIZE_U64;
128
129 let buf = item.encode_mut();
131 blob.append(&buf).await?;
132 trace!(section, position, "appended item");
133
134 Ok(position)
135 }
136
137 pub async fn get(&self, section: u64, position: u64) -> Result<A, Error> {
145 let blob = self
146 .manager
147 .get(section)?
148 .ok_or(Error::SectionOutOfRange(section))?;
149
150 let offset = position
151 .checked_mul(Self::CHUNK_SIZE_U64)
152 .ok_or(Error::ItemOutOfRange(position))?;
153 let end = offset
154 .checked_add(Self::CHUNK_SIZE_U64)
155 .ok_or(Error::ItemOutOfRange(position))?;
156 if end > blob.size().await {
157 return Err(Error::ItemOutOfRange(position));
158 }
159
160 let buf = blob.read_at(offset, Self::CHUNK_SIZE).await?;
161 A::decode(buf.coalesce()).map_err(Error::Codec)
162 }
163
164 pub async fn last(&self, section: u64) -> Result<Option<A>, Error> {
173 let blob = self
174 .manager
175 .get(section)?
176 .ok_or(Error::SectionOutOfRange(section))?;
177
178 let size = blob.size().await;
179 if size < Self::CHUNK_SIZE_U64 {
180 return Ok(None);
181 }
182
183 let last_position = (size / Self::CHUNK_SIZE_U64) - 1;
184 let offset = last_position * Self::CHUNK_SIZE_U64;
185 let buf = blob.read_at(offset, Self::CHUNK_SIZE).await?;
186 A::decode(buf.coalesce()).map_err(Error::Codec).map(Some)
187 }
188
189 pub async fn replay(
193 &self,
194 start_section: u64,
195 start_position: u64,
196 buffer: NonZeroUsize,
197 ) -> Result<impl Stream<Item = Result<(u64, u64, A), Error>> + Send + '_, Error> {
198 let mut blob_info = Vec::new();
200 for (§ion, blob) in self.manager.sections_from(start_section) {
201 let blob_size = blob.size().await;
202 let mut replay = blob.replay(buffer).await?;
203 let initial_position = if section == start_section {
205 let start = start_position * Self::CHUNK_SIZE_U64;
206 if start > blob_size {
207 return Err(Error::ItemOutOfRange(start_position));
208 }
209 replay.seek_to(start)?;
210 start_position
211 } else {
212 0
213 };
214 blob_info.push((section, replay, initial_position));
215 }
216
217 Ok(
221 stream::iter(blob_info).flat_map(move |(section, replay, initial_position)| {
222 stream::unfold(
223 ReplayState {
224 section,
225 replay,
226 position: initial_position,
227 done: false,
228 },
229 move |mut state| async move {
230 if state.done {
231 return None;
232 }
233
234 let mut batch: Vec<Result<(u64, u64, A), Error>> = Vec::new();
235 loop {
236 match state.replay.ensure(Self::CHUNK_SIZE).await {
238 Ok(true) => {}
239 Ok(false) => {
240 state.done = true;
242 return if batch.is_empty() {
243 None
244 } else {
245 Some((batch, state))
246 };
247 }
248 Err(err) => {
249 batch.push(Err(Error::Runtime(err)));
250 state.done = true;
251 return Some((batch, state));
252 }
253 }
254
255 while state.replay.remaining() >= Self::CHUNK_SIZE {
257 match A::read(&mut state.replay) {
258 Ok(item) => {
259 batch.push(Ok((state.section, state.position, item)));
260 state.position += 1;
261 }
262 Err(err) => {
263 batch.push(Err(Error::Codec(err)));
264 state.done = true;
265 return Some((batch, state));
266 }
267 }
268 }
269
270 if !batch.is_empty() {
272 return Some((batch, state));
273 }
274 }
275 },
276 )
277 .flat_map(stream::iter)
278 }),
279 )
280 }
281
    /// Syncs `section` by delegating to the manager's `sync`.
    pub async fn sync(&self, section: u64) -> Result<(), Error> {
        self.manager.sync(section).await
    }
286
    /// Syncs every section by delegating to the manager's `sync_all`.
    pub async fn sync_all(&self) -> Result<(), Error> {
        self.manager.sync_all().await
    }
291
    /// Prunes sections below `min` by delegating to the manager's `prune`.
    ///
    /// As exercised by this module's tests, reading a pruned section afterwards fails with
    /// `Error::AlreadyPrunedToSection(min)`. The returned `bool` is the manager's result
    /// (presumably whether anything was pruned — TODO confirm against `Manager::prune`).
    pub async fn prune(&mut self, min: u64) -> Result<bool, Error> {
        self.manager.prune(min).await
    }
296
    /// Returns the manager's oldest section number, if it reports one.
    pub fn oldest_section(&self) -> Option<u64> {
        self.manager.oldest_section()
    }
301
    /// Returns the manager's newest section number, if it reports one.
    pub fn newest_section(&self) -> Option<u64> {
        self.manager.newest_section()
    }
306
307 pub fn sections(&self) -> impl Iterator<Item = u64> + '_ {
309 self.manager.sections_from(0).map(|(section, _)| *section)
310 }
311
312 pub async fn section_len(&self, section: u64) -> Result<u64, Error> {
314 let size = self.manager.size(section).await?;
315 Ok(size / Self::CHUNK_SIZE_U64)
316 }
317
    /// Returns the size of `section` as reported by the manager.
    ///
    /// As exercised by this module's tests, a section that holds no data reports 0.
    pub async fn size(&self, section: u64) -> Result<u64, Error> {
        self.manager.size(section).await
    }
322
    /// Rewinds the journal to `offset` within `section` by delegating to the manager's
    /// `rewind`.
    ///
    /// As exercised by this module's tests, sections after `section` report a size of 0
    /// afterwards. `offset` is passed through unvalidated (presumably a byte offset that the
    /// caller keeps item-aligned — TODO confirm).
    pub async fn rewind(&mut self, section: u64, offset: u64) -> Result<(), Error> {
        self.manager.rewind(section, offset).await
    }
330
    /// Rewinds `section` to `size` by delegating to the manager's `rewind_section`.
    ///
    /// NOTE(review): unlike `rewind`, this appears to leave other sections untouched (the
    /// tests append to a later section after calling it) — confirm against `Manager`.
    pub async fn rewind_section(&mut self, section: u64, size: u64) -> Result<(), Error> {
        self.manager.rewind_section(section, size).await
    }
337
    /// Consumes the journal and delegates to the manager's `destroy`
    /// (presumably removing all underlying storage — TODO confirm).
    pub async fn destroy(self) -> Result<(), Error> {
        self.manager.destroy().await
    }
342
    /// Clears the journal by delegating to the manager's `clear`.
    ///
    /// As exercised by this module's tests, previously written sections are reported as
    /// `Error::SectionOutOfRange` afterwards and the journal can be appended to again.
    pub async fn clear(&mut self) -> Result<(), Error> {
        self.manager.clear().await
    }
349
    /// Makes sure the manager has a blob for `section`, creating it if necessary, without
    /// writing any item. The blob handle itself is discarded.
    pub(crate) async fn ensure_section_exists(&mut self, section: u64) -> Result<(), Error> {
        self.manager.get_or_create(section).await?;
        Ok(())
    }
358}
359
360#[cfg(test)]
361mod tests {
362 use super::*;
363 use commonware_cryptography::{sha256::Digest, Hasher as _, Sha256};
364 use commonware_macros::test_traced;
365 use commonware_runtime::{
366 buffer::paged::CacheRef, deterministic, BufferPooler, Metrics, Runner,
367 };
368 use commonware_utils::{NZUsize, NZU16};
369 use core::num::NonZeroU16;
370 use futures::{pin_mut, StreamExt};
371
    // Page size for the test page cache. It is small relative to the test items, so only a
    // few appends are needed before data spans more than one page.
    const PAGE_SIZE: NonZeroU16 = NZU16!(44);
    // Capacity passed to the test page cache (presumably a page count — TODO confirm).
    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(3);
374
375 fn test_digest(value: u64) -> Digest {
376 Sha256::hash(&value.to_be_bytes())
377 }
378
379 fn test_cfg(pooler: &impl BufferPooler) -> Config {
380 Config {
381 partition: "test-partition".into(),
382 page_cache: CacheRef::from_pooler(pooler, PAGE_SIZE, PAGE_CACHE_SIZE),
383 write_buffer: NZUsize!(2048),
384 }
385 }
386
387 #[test_traced]
388 fn test_segmented_fixed_append_and_get() {
389 let executor = deterministic::Runner::default();
390 executor.start(|context| async move {
391 let cfg = test_cfg(&context);
392 let mut journal = Journal::init(context.clone(), cfg.clone())
393 .await
394 .expect("failed to init");
395
396 let pos0 = journal
397 .append(1, &test_digest(0))
398 .await
399 .expect("failed to append");
400 assert_eq!(pos0, 0);
401
402 let pos1 = journal
403 .append(1, &test_digest(1))
404 .await
405 .expect("failed to append");
406 assert_eq!(pos1, 1);
407
408 let pos2 = journal
409 .append(2, &test_digest(2))
410 .await
411 .expect("failed to append");
412 assert_eq!(pos2, 0);
413
414 let item0 = journal.get(1, 0).await.expect("failed to get");
415 assert_eq!(item0, test_digest(0));
416
417 let item1 = journal.get(1, 1).await.expect("failed to get");
418 assert_eq!(item1, test_digest(1));
419
420 let item2 = journal.get(2, 0).await.expect("failed to get");
421 assert_eq!(item2, test_digest(2));
422
423 let err = journal.get(1, 2).await;
424 assert!(matches!(err, Err(Error::ItemOutOfRange(2))));
425
426 let err = journal.get(3, 0).await;
427 assert!(matches!(err, Err(Error::SectionOutOfRange(3))));
428
429 journal.destroy().await.expect("failed to destroy");
430 });
431 }
432
433 #[test_traced]
434 fn test_segmented_fixed_replay() {
435 let executor = deterministic::Runner::default();
436 executor.start(|context| async move {
437 let cfg = test_cfg(&context);
438 let mut journal = Journal::init(context.with_label("first"), cfg.clone())
439 .await
440 .expect("failed to init");
441
442 for i in 0u64..10 {
443 journal
444 .append(1, &test_digest(i))
445 .await
446 .expect("failed to append");
447 }
448 for i in 10u64..20 {
449 journal
450 .append(2, &test_digest(i))
451 .await
452 .expect("failed to append");
453 }
454
455 journal.sync_all().await.expect("failed to sync");
456 drop(journal);
457
458 let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
459 .await
460 .expect("failed to re-init");
461
462 let items = {
463 let stream = journal
464 .replay(0, 0, NZUsize!(1024))
465 .await
466 .expect("failed to replay");
467 pin_mut!(stream);
468
469 let mut items = Vec::new();
470 while let Some(result) = stream.next().await {
471 match result {
472 Ok((section, pos, item)) => items.push((section, pos, item)),
473 Err(err) => panic!("replay error: {err}"),
474 }
475 }
476 items
477 };
478
479 assert_eq!(items.len(), 20);
480 for (i, item) in items.iter().enumerate().take(10) {
481 assert_eq!(item.0, 1);
482 assert_eq!(item.1, i as u64);
483 assert_eq!(item.2, test_digest(i as u64));
484 }
485 for (i, item) in items.iter().enumerate().skip(10).take(10) {
486 assert_eq!(item.0, 2);
487 assert_eq!(item.1, (i - 10) as u64);
488 assert_eq!(item.2, test_digest(i as u64));
489 }
490
491 journal.destroy().await.expect("failed to destroy");
492 });
493 }
494
495 #[test_traced]
496 fn test_segmented_fixed_replay_with_start_offset() {
497 let executor = deterministic::Runner::default();
499 executor.start(|context| async move {
500 let cfg = test_cfg(&context);
501 let mut journal = Journal::init(context.with_label("first"), cfg.clone())
502 .await
503 .expect("failed to init");
504
505 for i in 0u64..10 {
507 journal
508 .append(1, &test_digest(i))
509 .await
510 .expect("failed to append");
511 }
512 for i in 10u64..15 {
514 journal
515 .append(2, &test_digest(i))
516 .await
517 .expect("failed to append");
518 }
519 journal.sync_all().await.expect("failed to sync");
520 drop(journal);
521
522 let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
523 .await
524 .expect("failed to re-init");
525
526 {
528 let stream = journal
529 .replay(1, 5, NZUsize!(1024))
530 .await
531 .expect("failed to replay");
532 pin_mut!(stream);
533
534 let mut items = Vec::new();
535 while let Some(result) = stream.next().await {
536 let (section, pos, item) = result.expect("replay error");
537 items.push((section, pos, item));
538 }
539
540 assert_eq!(
541 items.len(),
542 10,
543 "Should have 5 items from section 1 + 5 from section 2"
544 );
545
546 for (i, (section, pos, item)) in items.iter().enumerate().take(5) {
548 assert_eq!(*section, 1);
549 assert_eq!(*pos, (i + 5) as u64);
550 assert_eq!(*item, test_digest((i + 5) as u64));
551 }
552
553 for (i, (section, pos, item)) in items.iter().enumerate().skip(5) {
555 assert_eq!(*section, 2);
556 assert_eq!(*pos, (i - 5) as u64);
557 assert_eq!(*item, test_digest((i + 5) as u64));
558 }
559 }
560
561 {
563 let stream = journal
564 .replay(1, 9, NZUsize!(1024))
565 .await
566 .expect("failed to replay");
567 pin_mut!(stream);
568
569 let mut items = Vec::new();
570 while let Some(result) = stream.next().await {
571 let (section, pos, item) = result.expect("replay error");
572 items.push((section, pos, item));
573 }
574
575 assert_eq!(
576 items.len(),
577 6,
578 "Should have 1 item from section 1 + 5 from section 2"
579 );
580 assert_eq!(items[0], (1, 9, test_digest(9)));
581 for (i, (section, pos, item)) in items.iter().enumerate().skip(1) {
582 assert_eq!(*section, 2);
583 assert_eq!(*pos, (i - 1) as u64);
584 assert_eq!(*item, test_digest((i + 9) as u64));
585 }
586 }
587
588 {
590 let stream = journal
591 .replay(2, 3, NZUsize!(1024))
592 .await
593 .expect("failed to replay");
594 pin_mut!(stream);
595
596 let mut items = Vec::new();
597 while let Some(result) = stream.next().await {
598 let (section, pos, item) = result.expect("replay error");
599 items.push((section, pos, item));
600 }
601
602 assert_eq!(items.len(), 2, "Should have 2 items from section 2");
603 assert_eq!(items[0], (2, 3, test_digest(13)));
604 assert_eq!(items[1], (2, 4, test_digest(14)));
605 }
606
607 let result = journal.replay(1, 100, NZUsize!(1024)).await;
609 assert!(matches!(result, Err(Error::ItemOutOfRange(100))));
610 drop(result);
611
612 journal.destroy().await.expect("failed to destroy");
613 });
614 }
615
616 #[test_traced]
617 fn test_segmented_fixed_prune() {
618 let executor = deterministic::Runner::default();
619 executor.start(|context| async move {
620 let cfg = test_cfg(&context);
621 let mut journal = Journal::init(context.clone(), cfg.clone())
622 .await
623 .expect("failed to init");
624
625 for section in 1u64..=5 {
626 journal
627 .append(section, &test_digest(section))
628 .await
629 .expect("failed to append");
630 }
631 journal.sync_all().await.expect("failed to sync");
632
633 journal.prune(3).await.expect("failed to prune");
634
635 let err = journal.get(1, 0).await;
636 assert!(matches!(err, Err(Error::AlreadyPrunedToSection(3))));
637
638 let err = journal.get(2, 0).await;
639 assert!(matches!(err, Err(Error::AlreadyPrunedToSection(3))));
640
641 let item = journal.get(3, 0).await.expect("should exist");
642 assert_eq!(item, test_digest(3));
643
644 journal.destroy().await.expect("failed to destroy");
645 });
646 }
647
648 #[test_traced]
649 fn test_segmented_fixed_rewind() {
650 let executor = deterministic::Runner::default();
651 executor.start(|context| async move {
652 let cfg = test_cfg(&context);
653 let mut journal = Journal::init(context.clone(), cfg.clone())
654 .await
655 .expect("failed to init");
656
657 for section in 1u64..=3 {
659 journal
660 .append(section, &test_digest(section))
661 .await
662 .expect("failed to append");
663 }
664 journal.sync_all().await.expect("failed to sync");
665
666 for section in 1u64..=3 {
668 let size = journal.size(section).await.expect("failed to get size");
669 assert!(size > 0, "section {section} should have data");
670 }
671
672 let size = journal.size(1).await.expect("failed to get size");
674 journal.rewind(1, size).await.expect("failed to rewind");
675
676 let size = journal.size(1).await.expect("failed to get size");
678 assert!(size > 0, "section 1 should still have data");
679
680 for section in 2u64..=3 {
682 let size = journal.size(section).await.expect("failed to get size");
683 assert_eq!(size, 0, "section {section} should be removed");
684 }
685
686 let item = journal.get(1, 0).await.expect("failed to get");
688 assert_eq!(item, test_digest(1));
689
690 journal.destroy().await.expect("failed to destroy");
691 });
692 }
693
694 #[test_traced]
695 fn test_segmented_fixed_rewind_many_sections() {
696 let executor = deterministic::Runner::default();
697 executor.start(|context| async move {
698 let cfg = test_cfg(&context);
699 let mut journal = Journal::init(context.clone(), cfg.clone())
700 .await
701 .expect("failed to init");
702
703 for section in 1u64..=10 {
705 journal
706 .append(section, &test_digest(section))
707 .await
708 .expect("failed to append");
709 }
710 journal.sync_all().await.expect("failed to sync");
711
712 let size = journal.size(5).await.expect("failed to get size");
714 journal.rewind(5, size).await.expect("failed to rewind");
715
716 for section in 1u64..=5 {
718 let size = journal.size(section).await.expect("failed to get size");
719 assert!(size > 0, "section {section} should still have data");
720 }
721
722 for section in 6u64..=10 {
724 let size = journal.size(section).await.expect("failed to get size");
725 assert_eq!(size, 0, "section {section} should be removed");
726 }
727
728 {
730 let stream = journal
731 .replay(0, 0, NZUsize!(1024))
732 .await
733 .expect("failed to replay");
734 pin_mut!(stream);
735 let mut items = Vec::new();
736 while let Some(result) = stream.next().await {
737 let (section, _, item) = result.expect("failed to read");
738 items.push((section, item));
739 }
740 assert_eq!(items.len(), 5);
741 for (i, (section, item)) in items.iter().enumerate() {
742 assert_eq!(*section, (i + 1) as u64);
743 assert_eq!(*item, test_digest((i + 1) as u64));
744 }
745 }
746
747 journal.destroy().await.expect("failed to destroy");
748 });
749 }
750
751 #[test_traced]
752 fn test_segmented_fixed_rewind_persistence() {
753 let executor = deterministic::Runner::default();
754 executor.start(|context| async move {
755 let cfg = test_cfg(&context);
756
757 let mut journal = Journal::init(context.with_label("first"), cfg.clone())
759 .await
760 .expect("failed to init");
761 for section in 1u64..=5 {
762 journal
763 .append(section, &test_digest(section))
764 .await
765 .expect("failed to append");
766 }
767 journal.sync_all().await.expect("failed to sync");
768
769 let size = journal.size(2).await.expect("failed to get size");
771 journal.rewind(2, size).await.expect("failed to rewind");
772 journal.sync_all().await.expect("failed to sync");
773 drop(journal);
774
775 let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
777 .await
778 .expect("failed to re-init");
779
780 for section in 1u64..=2 {
782 let size = journal.size(section).await.expect("failed to get size");
783 assert!(size > 0, "section {section} should have data after restart");
784 }
785
786 for section in 3u64..=5 {
788 let size = journal.size(section).await.expect("failed to get size");
789 assert_eq!(size, 0, "section {section} should be gone after restart");
790 }
791
792 let item1 = journal.get(1, 0).await.expect("failed to get");
794 assert_eq!(item1, test_digest(1));
795 let item2 = journal.get(2, 0).await.expect("failed to get");
796 assert_eq!(item2, test_digest(2));
797
798 journal.destroy().await.expect("failed to destroy");
799 });
800 }
801
802 #[test_traced]
803 fn test_segmented_fixed_corruption_recovery() {
804 let executor = deterministic::Runner::default();
805 executor.start(|context| async move {
806 let cfg = test_cfg(&context);
807 let mut journal = Journal::init(context.with_label("first"), cfg.clone())
808 .await
809 .expect("failed to init");
810
811 for i in 0u64..5 {
812 journal
813 .append(1, &test_digest(i))
814 .await
815 .expect("failed to append");
816 }
817 journal.sync_all().await.expect("failed to sync");
818 drop(journal);
819
820 let (blob, size) = context
821 .open(&cfg.partition, &1u64.to_be_bytes())
822 .await
823 .expect("failed to open blob");
824 blob.resize(size - 1).await.expect("failed to truncate");
825 blob.sync().await.expect("failed to sync");
826
827 let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
828 .await
829 .expect("failed to re-init");
830
831 let count = {
832 let stream = journal
833 .replay(0, 0, NZUsize!(1024))
834 .await
835 .expect("failed to replay");
836 pin_mut!(stream);
837
838 let mut count = 0;
839 while let Some(result) = stream.next().await {
840 result.expect("should be ok");
841 count += 1;
842 }
843 count
844 };
845 assert_eq!(count, 4);
846
847 journal.destroy().await.expect("failed to destroy");
848 });
849 }
850
851 #[test_traced]
852 fn test_segmented_fixed_persistence() {
853 let executor = deterministic::Runner::default();
854 executor.start(|context| async move {
855 let cfg = test_cfg(&context);
856
857 let mut journal = Journal::init(context.with_label("first"), cfg.clone())
859 .await
860 .expect("failed to init");
861
862 for i in 0u64..5 {
863 journal
864 .append(1, &test_digest(i))
865 .await
866 .expect("failed to append");
867 }
868 journal.sync_all().await.expect("failed to sync");
869 drop(journal);
870
871 let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg)
873 .await
874 .expect("failed to re-init");
875
876 for i in 0u64..5 {
877 let item = journal.get(1, i).await.expect("failed to get");
878 assert_eq!(item, test_digest(i));
879 }
880
881 journal.destroy().await.expect("failed to destroy");
882 });
883 }
884
885 #[test_traced]
886 fn test_segmented_fixed_section_len() {
887 let executor = deterministic::Runner::default();
888 executor.start(|context| async move {
889 let cfg = test_cfg(&context);
890 let mut journal = Journal::init(context.clone(), cfg.clone())
891 .await
892 .expect("failed to init");
893
894 assert_eq!(journal.section_len(1).await.unwrap(), 0);
895
896 for i in 0u64..5 {
897 journal
898 .append(1, &test_digest(i))
899 .await
900 .expect("failed to append");
901 }
902
903 assert_eq!(journal.section_len(1).await.unwrap(), 5);
904 assert_eq!(journal.section_len(2).await.unwrap(), 0);
905
906 journal.destroy().await.expect("failed to destroy");
907 });
908 }
909
910 #[test_traced]
911 fn test_segmented_fixed_non_contiguous_sections() {
912 let executor = deterministic::Runner::default();
915 executor.start(|context| async move {
916 let cfg = test_cfg(&context);
917 let mut journal = Journal::init(context.with_label("first"), cfg.clone())
918 .await
919 .expect("failed to init");
920
921 journal
923 .append(1, &test_digest(100))
924 .await
925 .expect("failed to append");
926 journal
927 .append(5, &test_digest(500))
928 .await
929 .expect("failed to append");
930 journal
931 .append(10, &test_digest(1000))
932 .await
933 .expect("failed to append");
934 journal.sync_all().await.expect("failed to sync");
935
936 assert_eq!(journal.get(1, 0).await.unwrap(), test_digest(100));
938 assert_eq!(journal.get(5, 0).await.unwrap(), test_digest(500));
939 assert_eq!(journal.get(10, 0).await.unwrap(), test_digest(1000));
940
941 for missing_section in [0u64, 2, 3, 4, 6, 7, 8, 9, 11] {
943 let result = journal.get(missing_section, 0).await;
944 assert!(
945 matches!(result, Err(Error::SectionOutOfRange(_))),
946 "Expected SectionOutOfRange for section {}, got {:?}",
947 missing_section,
948 result
949 );
950 }
951
952 drop(journal);
954 let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
955 .await
956 .expect("failed to re-init");
957
958 {
960 let stream = journal
961 .replay(0, 0, NZUsize!(1024))
962 .await
963 .expect("failed to replay");
964 pin_mut!(stream);
965
966 let mut items = Vec::new();
967 while let Some(result) = stream.next().await {
968 let (section, _, item) = result.expect("replay error");
969 items.push((section, item));
970 }
971
972 assert_eq!(items.len(), 3, "Should have 3 items");
973 assert_eq!(items[0], (1, test_digest(100)));
974 assert_eq!(items[1], (5, test_digest(500)));
975 assert_eq!(items[2], (10, test_digest(1000)));
976 }
977
978 {
980 let stream = journal
981 .replay(5, 0, NZUsize!(1024))
982 .await
983 .expect("failed to replay from section 5");
984 pin_mut!(stream);
985
986 let mut items = Vec::new();
987 while let Some(result) = stream.next().await {
988 let (section, _, item) = result.expect("replay error");
989 items.push((section, item));
990 }
991
992 assert_eq!(items.len(), 2, "Should have 2 items from section 5 onwards");
993 assert_eq!(items[0], (5, test_digest(500)));
994 assert_eq!(items[1], (10, test_digest(1000)));
995 }
996
997 journal.destroy().await.expect("failed to destroy");
998 });
999 }
1000
1001 #[test_traced]
1002 fn test_segmented_fixed_empty_section_in_middle() {
1003 let executor = deterministic::Runner::default();
1006 executor.start(|context| async move {
1007 let cfg = test_cfg(&context);
1008 let mut journal = Journal::init(context.with_label("first"), cfg.clone())
1009 .await
1010 .expect("failed to init");
1011
1012 journal
1014 .append(1, &test_digest(100))
1015 .await
1016 .expect("failed to append");
1017
1018 journal
1020 .append(2, &test_digest(200))
1021 .await
1022 .expect("failed to append");
1023 journal.sync(2).await.expect("failed to sync");
1024 journal
1025 .rewind_section(2, 0)
1026 .await
1027 .expect("failed to rewind");
1028
1029 journal
1031 .append(3, &test_digest(300))
1032 .await
1033 .expect("failed to append");
1034
1035 journal.sync_all().await.expect("failed to sync");
1036
1037 assert_eq!(journal.section_len(1).await.unwrap(), 1);
1039 assert_eq!(journal.section_len(2).await.unwrap(), 0);
1040 assert_eq!(journal.section_len(3).await.unwrap(), 1);
1041
1042 drop(journal);
1044 let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
1045 .await
1046 .expect("failed to re-init");
1047
1048 {
1050 let stream = journal
1051 .replay(0, 0, NZUsize!(1024))
1052 .await
1053 .expect("failed to replay");
1054 pin_mut!(stream);
1055
1056 let mut items = Vec::new();
1057 while let Some(result) = stream.next().await {
1058 let (section, _, item) = result.expect("replay error");
1059 items.push((section, item));
1060 }
1061
1062 assert_eq!(
1063 items.len(),
1064 2,
1065 "Should have 2 items (skipping empty section)"
1066 );
1067 assert_eq!(items[0], (1, test_digest(100)));
1068 assert_eq!(items[1], (3, test_digest(300)));
1069 }
1070
1071 {
1073 let stream = journal
1074 .replay(2, 0, NZUsize!(1024))
1075 .await
1076 .expect("failed to replay from section 2");
1077 pin_mut!(stream);
1078
1079 let mut items = Vec::new();
1080 while let Some(result) = stream.next().await {
1081 let (section, _, item) = result.expect("replay error");
1082 items.push((section, item));
1083 }
1084
1085 assert_eq!(items.len(), 1, "Should have 1 item from section 3");
1086 assert_eq!(items[0], (3, test_digest(300)));
1087 }
1088
1089 journal.destroy().await.expect("failed to destroy");
1090 });
1091 }
1092
    // Writes three items to one section, removes a single byte from the end of the raw blob,
    // and checks that a re-initialized journal exposes exactly the first two items.
    //
    // NOTE(review): with the 44-unit test page size and items that the size assertion below
    // shows to be 32 bytes each, the stored items presumably straddle page boundaries, which
    // is what the test name refers to — confirm against the page cache's on-disk layout.
    #[test_traced]
    fn test_segmented_fixed_truncation_recovery_across_page_boundary() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(&context);
            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
                .await
                .expect("failed to init");

            // Three items in section 1.
            for i in 0u64..3 {
                journal
                    .append(1, &test_digest(i))
                    .await
                    .expect("failed to append");
            }
            journal.sync_all().await.expect("failed to sync");

            // All three are readable before the blob is damaged.
            for i in 0u64..3 {
                let item = journal.get(1, i).await.expect("failed to get");
                assert_eq!(item, test_digest(i));
            }
            drop(journal);

            // Shrink the raw blob for section 1 by one byte, bypassing the journal.
            let (blob, size) = context
                .open(&cfg.partition, &1u64.to_be_bytes())
                .await
                .expect("failed to open blob");
            blob.resize(size - 1).await.expect("failed to truncate");
            blob.sync().await.expect("failed to sync");
            drop(blob);

            // Re-open the journal on top of the damaged blob.
            let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
                .await
                .expect("failed to re-init");

            // Only two complete items remain.
            assert_eq!(journal.section_len(1).await.unwrap(), 2);

            // Two items' worth of data: 64 in total.
            assert_eq!(journal.size(1).await.unwrap(), 64);

            // The surviving items are intact.
            let item0 = journal.get(1, 0).await.expect("failed to get item 0");
            assert_eq!(item0, test_digest(0));
            let item1 = journal.get(1, 1).await.expect("failed to get item 1");
            assert_eq!(item1, test_digest(1));

            // The lost item is reported as out of range rather than as corrupt data.
            let err = journal.get(1, 2).await;
            assert!(
                matches!(err, Err(Error::ItemOutOfRange(2))),
                "expected ItemOutOfRange(2), got {:?}",
                err
            );

            journal.destroy().await.expect("failed to destroy");
        });
    }
1168
1169 #[test_traced]
1170 fn test_journal_clear() {
1171 let executor = deterministic::Runner::default();
1172 executor.start(|context| async move {
1173 let cfg = Config {
1174 partition: "clear-test".into(),
1175 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1176 write_buffer: NZUsize!(1024),
1177 };
1178
1179 let mut journal: Journal<_, Digest> =
1180 Journal::init(context.with_label("journal"), cfg.clone())
1181 .await
1182 .expect("Failed to initialize journal");
1183
1184 for section in 0..5u64 {
1186 for i in 0..10u64 {
1187 journal
1188 .append(section, &test_digest(section * 1000 + i))
1189 .await
1190 .expect("Failed to append");
1191 }
1192 journal.sync(section).await.expect("Failed to sync");
1193 }
1194
1195 assert_eq!(journal.get(0, 0).await.unwrap(), test_digest(0));
1197 assert_eq!(journal.get(4, 0).await.unwrap(), test_digest(4000));
1198
1199 journal.clear().await.expect("Failed to clear");
1201
1202 for section in 0..5u64 {
1204 assert!(matches!(
1205 journal.get(section, 0).await,
1206 Err(Error::SectionOutOfRange(s)) if s == section
1207 ));
1208 }
1209
1210 for i in 0..5u64 {
1212 journal
1213 .append(10, &test_digest(i * 100))
1214 .await
1215 .expect("Failed to append after clear");
1216 }
1217 journal.sync(10).await.expect("Failed to sync after clear");
1218
1219 assert_eq!(journal.get(10, 0).await.unwrap(), test_digest(0));
1221
1222 assert!(matches!(
1224 journal.get(0, 0).await,
1225 Err(Error::SectionOutOfRange(0))
1226 ));
1227
1228 journal.destroy().await.unwrap();
1229 });
1230 }
1231
1232 #[test_traced]
1233 fn test_last_missing_section_returns_error() {
1234 let executor = deterministic::Runner::default();
1235 executor.start(|context| async move {
1236 let cfg = test_cfg(&context);
1237 let journal = Journal::<_, Digest>::init(context.clone(), cfg.clone())
1238 .await
1239 .expect("failed to init");
1240
1241 assert!(matches!(
1242 journal.last(0).await,
1243 Err(Error::SectionOutOfRange(0))
1244 ));
1245 assert!(matches!(
1246 journal.last(99).await,
1247 Err(Error::SectionOutOfRange(99))
1248 ));
1249
1250 journal.destroy().await.unwrap();
1251 });
1252 }
1253
1254 #[test_traced]
1255 fn test_last_after_rewind_to_zero() {
1256 let executor = deterministic::Runner::default();
1257 executor.start(|context| async move {
1258 let cfg = test_cfg(&context);
1259 let mut journal = Journal::init(context.clone(), cfg.clone())
1260 .await
1261 .expect("failed to init");
1262
1263 journal.append(0, &test_digest(0)).await.unwrap();
1264 journal.append(0, &test_digest(1)).await.unwrap();
1265 journal.sync(0).await.unwrap();
1266
1267 assert!(journal.last(0).await.unwrap().is_some());
1268
1269 journal.rewind(0, 0).await.unwrap();
1270 assert_eq!(journal.last(0).await.unwrap(), None);
1271
1272 journal.destroy().await.unwrap();
1273 });
1274 }
1275
1276 #[test_traced]
1277 fn test_last_pruned_section_returns_error() {
1278 let executor = deterministic::Runner::default();
1279 executor.start(|context| async move {
1280 let cfg = test_cfg(&context);
1281 let mut journal = Journal::<_, Digest>::init(context.clone(), cfg.clone())
1282 .await
1283 .expect("failed to init");
1284
1285 journal.append(0, &test_digest(0)).await.unwrap();
1286 journal.append(1, &test_digest(1)).await.unwrap();
1287 journal.sync_all().await.unwrap();
1288
1289 journal.prune(1).await.unwrap();
1290
1291 assert!(matches!(
1292 journal.last(0).await,
1293 Err(Error::AlreadyPrunedToSection(1))
1294 ));
1295 assert!(journal.last(1).await.unwrap().is_some());
1296
1297 journal.destroy().await.unwrap();
1298 });
1299 }
1300}