1use super::manager::{AppendFactory, Config as ManagerConfig, Manager};
24use crate::journal::Error;
25use commonware_codec::{CodecFixed, CodecFixedShared, DecodeExt as _, ReadExt as _};
26use commonware_runtime::{
27 buffer::paged::{CacheRef, Replay},
28 Blob, Buf, Metrics, Storage,
29};
30use futures::{
31 stream::{self, Stream},
32 StreamExt,
33};
34use std::{marker::PhantomData, num::NonZeroUsize};
35use tracing::{trace, warn};
36
/// Per-section state threaded through the `stream::unfold` in [`Journal::replay`].
struct ReplayState<B: Blob> {
    /// Section this replay stream is reading.
    section: u64,
    /// Buffered replay reader over the section's blob.
    replay: Replay<B>,
    /// Item index (not byte offset) of the next item to yield.
    position: u64,
    /// Set once the section is exhausted or an error was emitted; the unfold
    /// terminates on the next poll.
    done: bool,
}
44
/// Configuration for initializing a [Journal].
#[derive(Clone)]
pub struct Config {
    /// Storage partition that holds one blob per section.
    pub partition: String,

    /// Shared page cache used for buffered blob reads.
    pub page_cache: CacheRef,

    /// Size of the write buffer used by each section's append blob.
    pub write_buffer: NonZeroUsize,
}
57
/// An append-only journal of fixed-size items, segmented into `u64`-keyed
/// sections (one blob per section).
///
/// Item positions map directly to byte offsets: item `i` of a section lives at
/// byte offset `i * A::SIZE` in that section's blob.
pub struct Journal<E: Storage + Metrics, A: CodecFixed> {
    /// Underlying section/blob manager configured with append-capable blobs.
    manager: Manager<E, AppendFactory>,
    /// Marker for the item type; no `A` values are stored in the struct itself.
    _array: PhantomData<A>,
}
75
impl<E: Storage + Metrics, A: CodecFixedShared> Journal<E, A> {
    /// Number of bytes each serialized item occupies.
    pub const CHUNK_SIZE: usize = A::SIZE;
    /// [`Self::CHUNK_SIZE`] as a `u64`, for blob offset arithmetic.
    const CHUNK_SIZE_U64: u64 = Self::CHUNK_SIZE as u64;

    /// Initialize the journal over the partition named in `cfg`.
    ///
    /// Any section whose size is not a multiple of [`Self::CHUNK_SIZE`]
    /// (e.g. after a crash mid-append) is truncated down to its last complete
    /// item before the journal is returned.
    pub async fn init(context: E, cfg: Config) -> Result<Self, Error> {
        let manager_cfg = ManagerConfig {
            partition: cfg.partition,
            factory: AppendFactory {
                write_buffer: cfg.write_buffer,
                page_cache_ref: cfg.page_cache,
            },
        };
        let mut manager = Manager::init(context, manager_cfg).await?;

        // Collect first: rewinding mutates the manager, so we cannot hold its
        // section iterator across the loop body.
        let sections: Vec<_> = manager.sections().collect();
        for section in sections {
            let size = manager.size(section).await?;
            if !size.is_multiple_of(Self::CHUNK_SIZE_U64) {
                // Drop the trailing partial item.
                let valid_size = size - (size % Self::CHUNK_SIZE_U64);
                warn!(
                    section,
                    invalid_size = size,
                    new_size = valid_size,
                    "trailing bytes detected: truncating"
                );
                manager.rewind_section(section, valid_size).await?;
            }
        }

        Ok(Self {
            manager,
            _array: PhantomData,
        })
    }

    /// Append `item` to `section` (creating the section if needed), returning
    /// the item's position (index) within that section.
    ///
    /// # Errors
    ///
    /// Returns [`Error::InvalidBlobSize`] if the section's blob size is not a
    /// multiple of the item size (a corrupted/partial state that should have
    /// been repaired by [`Self::init`]).
    pub async fn append(&mut self, section: u64, item: &A) -> Result<u64, Error> {
        let blob = self.manager.get_or_create(section).await?;

        let size = blob.size().await;
        if !size.is_multiple_of(Self::CHUNK_SIZE_U64) {
            return Err(Error::InvalidBlobSize(section, size));
        }
        // The next item lands at the end of the blob, so its position is the
        // current item count.
        let position = size / Self::CHUNK_SIZE_U64;

        let buf = item.encode_mut();
        blob.append(&buf).await?;
        trace!(section, position, "appended item");

        Ok(position)
    }

    /// Append pre-encoded items to `section` in one write.
    ///
    /// # Panics
    ///
    /// Panics if `buf` is empty or its length is not a multiple of
    /// [`Self::CHUNK_SIZE`] (callers are internal and must pre-validate).
    pub(crate) async fn append_raw(&mut self, section: u64, buf: &[u8]) -> Result<(), Error> {
        assert!(!buf.is_empty());
        assert!(buf.len().is_multiple_of(Self::CHUNK_SIZE));
        let blob = self.manager.get_or_create(section).await?;
        blob.append(buf).await?;
        trace!(
            section,
            count = buf.len() / Self::CHUNK_SIZE,
            "appended items"
        );
        Ok(())
    }

    /// Read the item at `position` within `section`.
    ///
    /// # Errors
    ///
    /// - [`Error::SectionOutOfRange`] if the section does not exist.
    /// - [`Error::ItemOutOfRange`] if `position` is past the end of the
    ///   section (or the offset computation overflows).
    pub async fn get(&self, section: u64, position: u64) -> Result<A, Error> {
        let blob = self
            .manager
            .get(section)?
            .ok_or(Error::SectionOutOfRange(section))?;

        // Checked arithmetic: a huge `position` must map to ItemOutOfRange,
        // not a wrapped offset.
        let offset = position
            .checked_mul(Self::CHUNK_SIZE_U64)
            .ok_or(Error::ItemOutOfRange(position))?;
        let end = offset
            .checked_add(Self::CHUNK_SIZE_U64)
            .ok_or(Error::ItemOutOfRange(position))?;
        if end > blob.size().await {
            return Err(Error::ItemOutOfRange(position));
        }

        let buf = blob.read_at(offset, Self::CHUNK_SIZE).await?;
        A::decode(buf.coalesce()).map_err(Error::Codec)
    }

    /// Best-effort synchronous read of the item at `position` within
    /// `section`.
    ///
    /// Returns `None` whenever the item cannot be served without awaiting
    /// (missing section, out-of-range position, size unknown, data not
    /// resident) or fails to decode — callers should fall back to
    /// [`Self::get`].
    pub fn try_get_sync(&self, section: u64, position: u64) -> Option<A> {
        let blob = self.manager.get(section).ok()??;
        let offset = position.checked_mul(Self::CHUNK_SIZE_U64)?;
        let remaining = blob.try_size()?.checked_sub(offset)?;
        if remaining < Self::CHUNK_SIZE_U64 {
            return None;
        }
        let mut buf = vec![0u8; Self::CHUNK_SIZE];
        if !blob.try_read_sync(offset, &mut buf) {
            return None;
        }
        A::decode(&buf[..]).ok()
    }

    /// Return the last item in `section`, or `None` if the section holds no
    /// complete item.
    ///
    /// # Errors
    ///
    /// Returns [`Error::SectionOutOfRange`] if the section does not exist.
    pub async fn last(&self, section: u64) -> Result<Option<A>, Error> {
        let blob = self
            .manager
            .get(section)?
            .ok_or(Error::SectionOutOfRange(section))?;

        let size = blob.size().await;
        if size < Self::CHUNK_SIZE_U64 {
            return Ok(None);
        }

        // Read the last *complete* item (trailing partial bytes, if any, are
        // ignored here; init truncates them on restart).
        let last_position = (size / Self::CHUNK_SIZE_U64) - 1;
        let offset = last_position * Self::CHUNK_SIZE_U64;
        let buf = blob.read_at(offset, Self::CHUNK_SIZE).await?;
        A::decode(buf.coalesce()).map_err(Error::Codec).map(Some)
    }

    /// Stream all items from `(start_section, start_position)` onward as
    /// `(section, position, item)` tuples, in section order.
    ///
    /// `buffer` sizes each section's replay read buffer. Sections other than
    /// `start_section` are replayed from their beginning. A runtime or codec
    /// error is yielded as the final `Err` item of the stream.
    ///
    /// # Errors
    ///
    /// Returns [`Error::ItemOutOfRange`] up front if `start_position` is past
    /// the end of `start_section`.
    pub async fn replay(
        &self,
        start_section: u64,
        start_position: u64,
        buffer: NonZeroUsize,
    ) -> Result<impl Stream<Item = Result<(u64, u64, A), Error>> + Send + '_, Error> {
        // Eagerly set up one replay reader per section so seek errors surface
        // before the caller starts consuming the stream.
        let mut blob_info = Vec::new();
        for (&section, blob) in self.manager.sections_from(start_section) {
            let blob_size = blob.size().await;
            let mut replay = blob.replay(buffer).await?;
            let initial_position = if section == start_section {
                let start = start_position * Self::CHUNK_SIZE_U64;
                if start > blob_size {
                    return Err(Error::ItemOutOfRange(start_position));
                }
                replay.seek_to(start)?;
                start_position
            } else {
                0
            };
            blob_info.push((section, replay, initial_position));
        }

        // Each section becomes an unfold that yields *batches* of decoded
        // items (everything buffered so far), flattened into single items.
        Ok(
            stream::iter(blob_info).flat_map(move |(section, replay, initial_position)| {
                stream::unfold(
                    ReplayState {
                        section,
                        replay,
                        position: initial_position,
                        done: false,
                    },
                    move |mut state| async move {
                        if state.done {
                            return None;
                        }

                        let mut batch: Vec<Result<(u64, u64, A), Error>> = Vec::new();
                        loop {
                            // Fill the buffer until at least one item is
                            // available, the blob ends, or reading fails.
                            match state.replay.ensure(Self::CHUNK_SIZE).await {
                                Ok(true) => {}
                                Ok(false) => {
                                    // End of section: flush any buffered batch,
                                    // then terminate.
                                    state.done = true;
                                    return if batch.is_empty() {
                                        None
                                    } else {
                                        Some((batch, state))
                                    };
                                }
                                Err(err) => {
                                    // Surface the runtime error as the final
                                    // item of this section's stream.
                                    batch.push(Err(Error::Runtime(err)));
                                    state.done = true;
                                    return Some((batch, state));
                                }
                            }

                            // Decode every complete item currently buffered.
                            while state.replay.remaining() >= Self::CHUNK_SIZE {
                                match A::read(&mut state.replay) {
                                    Ok(item) => {
                                        batch.push(Ok((state.section, state.position, item)));
                                        state.position += 1;
                                    }
                                    Err(err) => {
                                        batch.push(Err(Error::Codec(err)));
                                        state.done = true;
                                        return Some((batch, state));
                                    }
                                }
                            }

                            if !batch.is_empty() {
                                return Some((batch, state));
                            }
                        }
                    },
                )
                .flat_map(stream::iter)
            }),
        )
    }

    /// Flush pending writes for `section` to durable storage.
    pub async fn sync(&self, section: u64) -> Result<(), Error> {
        self.manager.sync(section).await
    }

    /// Flush pending writes for all sections to durable storage.
    pub async fn sync_all(&self) -> Result<(), Error> {
        self.manager.sync_all().await
    }

    /// Prune all sections below `min` (delegates to the manager; see its
    /// documentation for the meaning of the returned bool).
    pub async fn prune(&mut self, min: u64) -> Result<bool, Error> {
        self.manager.prune(min).await
    }

    /// The oldest (lowest-numbered) retained section, if any.
    pub fn oldest_section(&self) -> Option<u64> {
        self.manager.oldest_section()
    }

    /// The newest (highest-numbered) section, if any.
    pub fn newest_section(&self) -> Option<u64> {
        self.manager.newest_section()
    }

    /// Iterate over all section numbers in ascending order.
    pub fn sections(&self) -> impl Iterator<Item = u64> + '_ {
        self.manager.sections_from(0).map(|(section, _)| *section)
    }

    /// Number of complete items stored in `section`.
    pub async fn section_len(&self, section: u64) -> Result<u64, Error> {
        let size = self.manager.size(section).await?;
        Ok(size / Self::CHUNK_SIZE_U64)
    }

    /// Size of `section` in bytes.
    pub async fn size(&self, section: u64) -> Result<u64, Error> {
        self.manager.size(section).await
    }

    /// Rewind the journal to `offset` bytes within `section`; sections newer
    /// than `section` are removed (delegates to the manager).
    pub async fn rewind(&mut self, section: u64, offset: u64) -> Result<(), Error> {
        self.manager.rewind(section, offset).await
    }

    /// Truncate a single `section` to `size` bytes, leaving other sections
    /// untouched.
    pub async fn rewind_section(&mut self, section: u64, size: u64) -> Result<(), Error> {
        self.manager.rewind_section(section, size).await
    }

    /// Destroy the journal, removing its underlying storage.
    pub async fn destroy(self) -> Result<(), Error> {
        self.manager.destroy().await
    }

    /// Remove all sections, leaving the journal empty but usable.
    pub async fn clear(&mut self) -> Result<(), Error> {
        self.manager.clear().await
    }

    /// Create `section` (empty) if it does not already exist.
    pub(crate) async fn ensure_section_exists(&mut self, section: u64) -> Result<(), Error> {
        self.manager.get_or_create(section).await?;
        Ok(())
    }
}
394
#[cfg(test)]
mod tests {
    use super::*;
    use commonware_cryptography::{sha256::Digest, Hasher as _, Sha256};
    use commonware_macros::test_traced;
    use commonware_runtime::{
        buffer::paged::CacheRef, deterministic, BufferPooler, Metrics, Runner,
    };
    use commonware_utils::{NZUsize, NZU16};
    use core::num::NonZeroU16;
    use futures::{pin_mut, StreamExt};

    // 44 is not a multiple of the 32-byte digest size, so items straddle page
    // boundaries — presumably deliberate to exercise the paged buffer.
    const PAGE_SIZE: NonZeroU16 = NZU16!(44);
    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(3);

    /// Deterministic test item: SHA-256 of `value`'s big-endian bytes.
    fn test_digest(value: u64) -> Digest {
        Sha256::hash(&value.to_be_bytes())
    }

    /// Default test config backed by the runtime's buffer pooler.
    fn test_cfg(pooler: &impl BufferPooler) -> Config {
        Config {
            partition: "test-partition".into(),
            page_cache: CacheRef::from_pooler(pooler, PAGE_SIZE, PAGE_CACHE_SIZE),
            write_buffer: NZUsize!(2048),
        }
    }

    // Basic append/get: positions are per-section, and out-of-range positions
    // and missing sections produce the expected errors.
    #[test_traced]
    fn test_segmented_fixed_append_and_get() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(&context);
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to init");

            let pos0 = journal
                .append(1, &test_digest(0))
                .await
                .expect("failed to append");
            assert_eq!(pos0, 0);

            let pos1 = journal
                .append(1, &test_digest(1))
                .await
                .expect("failed to append");
            assert_eq!(pos1, 1);

            // Positions restart at 0 in a new section.
            let pos2 = journal
                .append(2, &test_digest(2))
                .await
                .expect("failed to append");
            assert_eq!(pos2, 0);

            let item0 = journal.get(1, 0).await.expect("failed to get");
            assert_eq!(item0, test_digest(0));

            let item1 = journal.get(1, 1).await.expect("failed to get");
            assert_eq!(item1, test_digest(1));

            let item2 = journal.get(2, 0).await.expect("failed to get");
            assert_eq!(item2, test_digest(2));

            let err = journal.get(1, 2).await;
            assert!(matches!(err, Err(Error::ItemOutOfRange(2))));

            let err = journal.get(3, 0).await;
            assert!(matches!(err, Err(Error::SectionOutOfRange(3))));

            journal.destroy().await.expect("failed to destroy");
        });
    }

    // Full replay after restart yields every item in section order with
    // correct (section, position) attribution.
    #[test_traced]
    fn test_segmented_fixed_replay() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(&context);
            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
                .await
                .expect("failed to init");

            for i in 0u64..10 {
                journal
                    .append(1, &test_digest(i))
                    .await
                    .expect("failed to append");
            }
            for i in 10u64..20 {
                journal
                    .append(2, &test_digest(i))
                    .await
                    .expect("failed to append");
            }

            journal.sync_all().await.expect("failed to sync");
            drop(journal);

            let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
                .await
                .expect("failed to re-init");

            let items = {
                let stream = journal
                    .replay(0, 0, NZUsize!(1024))
                    .await
                    .expect("failed to replay");
                pin_mut!(stream);

                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((section, pos, item)) => items.push((section, pos, item)),
                        Err(err) => panic!("replay error: {err}"),
                    }
                }
                items
            };

            assert_eq!(items.len(), 20);
            for (i, item) in items.iter().enumerate().take(10) {
                assert_eq!(item.0, 1);
                assert_eq!(item.1, i as u64);
                assert_eq!(item.2, test_digest(i as u64));
            }
            for (i, item) in items.iter().enumerate().skip(10).take(10) {
                assert_eq!(item.0, 2);
                assert_eq!(item.1, (i - 10) as u64);
                assert_eq!(item.2, test_digest(i as u64));
            }

            journal.destroy().await.expect("failed to destroy");
        });
    }

    // Replay honors a non-zero start position within the start section,
    // including starting at the final item, and rejects positions past the
    // end of the start section.
    #[test_traced]
    fn test_segmented_fixed_replay_with_start_offset() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(&context);
            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
                .await
                .expect("failed to init");

            for i in 0u64..10 {
                journal
                    .append(1, &test_digest(i))
                    .await
                    .expect("failed to append");
            }
            for i in 10u64..15 {
                journal
                    .append(2, &test_digest(i))
                    .await
                    .expect("failed to append");
            }
            journal.sync_all().await.expect("failed to sync");
            drop(journal);

            let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
                .await
                .expect("failed to re-init");

            // Start mid-section: skips items 0..5 of section 1.
            {
                let stream = journal
                    .replay(1, 5, NZUsize!(1024))
                    .await
                    .expect("failed to replay");
                pin_mut!(stream);

                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    let (section, pos, item) = result.expect("replay error");
                    items.push((section, pos, item));
                }

                assert_eq!(
                    items.len(),
                    10,
                    "Should have 5 items from section 1 + 5 from section 2"
                );

                for (i, (section, pos, item)) in items.iter().enumerate().take(5) {
                    assert_eq!(*section, 1);
                    assert_eq!(*pos, (i + 5) as u64);
                    assert_eq!(*item, test_digest((i + 5) as u64));
                }

                for (i, (section, pos, item)) in items.iter().enumerate().skip(5) {
                    assert_eq!(*section, 2);
                    assert_eq!(*pos, (i - 5) as u64);
                    assert_eq!(*item, test_digest((i + 5) as u64));
                }
            }

            // Start at the last item of section 1.
            {
                let stream = journal
                    .replay(1, 9, NZUsize!(1024))
                    .await
                    .expect("failed to replay");
                pin_mut!(stream);

                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    let (section, pos, item) = result.expect("replay error");
                    items.push((section, pos, item));
                }

                assert_eq!(
                    items.len(),
                    6,
                    "Should have 1 item from section 1 + 5 from section 2"
                );
                assert_eq!(items[0], (1, 9, test_digest(9)));
                for (i, (section, pos, item)) in items.iter().enumerate().skip(1) {
                    assert_eq!(*section, 2);
                    assert_eq!(*pos, (i - 1) as u64);
                    assert_eq!(*item, test_digest((i + 9) as u64));
                }
            }

            // Start mid-way through a later section.
            {
                let stream = journal
                    .replay(2, 3, NZUsize!(1024))
                    .await
                    .expect("failed to replay");
                pin_mut!(stream);

                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    let (section, pos, item) = result.expect("replay error");
                    items.push((section, pos, item));
                }

                assert_eq!(items.len(), 2, "Should have 2 items from section 2");
                assert_eq!(items[0], (2, 3, test_digest(13)));
                assert_eq!(items[1], (2, 4, test_digest(14)));
            }

            // A start position past the end errors before streaming begins.
            let result = journal.replay(1, 100, NZUsize!(1024)).await;
            assert!(matches!(result, Err(Error::ItemOutOfRange(100))));
            drop(result);

            journal.destroy().await.expect("failed to destroy");
        });
    }

    // Pruning below a section makes earlier sections error with
    // AlreadyPrunedToSection while later sections remain readable.
    #[test_traced]
    fn test_segmented_fixed_prune() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(&context);
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to init");

            for section in 1u64..=5 {
                journal
                    .append(section, &test_digest(section))
                    .await
                    .expect("failed to append");
            }
            journal.sync_all().await.expect("failed to sync");

            journal.prune(3).await.expect("failed to prune");

            let err = journal.get(1, 0).await;
            assert!(matches!(err, Err(Error::AlreadyPrunedToSection(3))));

            let err = journal.get(2, 0).await;
            assert!(matches!(err, Err(Error::AlreadyPrunedToSection(3))));

            let item = journal.get(3, 0).await.expect("should exist");
            assert_eq!(item, test_digest(3));

            journal.destroy().await.expect("failed to destroy");
        });
    }

    // rewind(section, size) keeps the target section's data and removes all
    // newer sections.
    #[test_traced]
    fn test_segmented_fixed_rewind() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(&context);
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to init");

            for section in 1u64..=3 {
                journal
                    .append(section, &test_digest(section))
                    .await
                    .expect("failed to append");
            }
            journal.sync_all().await.expect("failed to sync");

            for section in 1u64..=3 {
                let size = journal.size(section).await.expect("failed to get size");
                assert!(size > 0, "section {section} should have data");
            }

            // Rewind to the *full* size of section 1: no truncation of
            // section 1, but sections 2 and 3 are dropped.
            let size = journal.size(1).await.expect("failed to get size");
            journal.rewind(1, size).await.expect("failed to rewind");

            let size = journal.size(1).await.expect("failed to get size");
            assert!(size > 0, "section 1 should still have data");

            for section in 2u64..=3 {
                let size = journal.size(section).await.expect("failed to get size");
                assert_eq!(size, 0, "section {section} should be removed");
            }

            let item = journal.get(1, 0).await.expect("failed to get");
            assert_eq!(item, test_digest(1));

            journal.destroy().await.expect("failed to destroy");
        });
    }

    // Rewind mid-range across many sections: 1..=5 survive, 6..=10 removed,
    // and replay sees only the survivors.
    #[test_traced]
    fn test_segmented_fixed_rewind_many_sections() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(&context);
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to init");

            for section in 1u64..=10 {
                journal
                    .append(section, &test_digest(section))
                    .await
                    .expect("failed to append");
            }
            journal.sync_all().await.expect("failed to sync");

            let size = journal.size(5).await.expect("failed to get size");
            journal.rewind(5, size).await.expect("failed to rewind");

            for section in 1u64..=5 {
                let size = journal.size(section).await.expect("failed to get size");
                assert!(size > 0, "section {section} should still have data");
            }

            for section in 6u64..=10 {
                let size = journal.size(section).await.expect("failed to get size");
                assert_eq!(size, 0, "section {section} should be removed");
            }

            {
                let stream = journal
                    .replay(0, 0, NZUsize!(1024))
                    .await
                    .expect("failed to replay");
                pin_mut!(stream);
                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    let (section, _, item) = result.expect("failed to read");
                    items.push((section, item));
                }
                assert_eq!(items.len(), 5);
                for (i, (section, item)) in items.iter().enumerate() {
                    assert_eq!(*section, (i + 1) as u64);
                    assert_eq!(*item, test_digest((i + 1) as u64));
                }
            }

            journal.destroy().await.expect("failed to destroy");
        });
    }

    // The effect of a synced rewind survives a restart.
    #[test_traced]
    fn test_segmented_fixed_rewind_persistence() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(&context);

            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
                .await
                .expect("failed to init");
            for section in 1u64..=5 {
                journal
                    .append(section, &test_digest(section))
                    .await
                    .expect("failed to append");
            }
            journal.sync_all().await.expect("failed to sync");

            let size = journal.size(2).await.expect("failed to get size");
            journal.rewind(2, size).await.expect("failed to rewind");
            journal.sync_all().await.expect("failed to sync");
            drop(journal);

            let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
                .await
                .expect("failed to re-init");

            for section in 1u64..=2 {
                let size = journal.size(section).await.expect("failed to get size");
                assert!(size > 0, "section {section} should have data after restart");
            }

            for section in 3u64..=5 {
                let size = journal.size(section).await.expect("failed to get size");
                assert_eq!(size, 0, "section {section} should be gone after restart");
            }

            let item1 = journal.get(1, 0).await.expect("failed to get");
            assert_eq!(item1, test_digest(1));
            let item2 = journal.get(2, 0).await.expect("failed to get");
            assert_eq!(item2, test_digest(2));

            journal.destroy().await.expect("failed to destroy");
        });
    }

    // A blob truncated by one byte (partial trailing item) is repaired on
    // init: the partial item is dropped and the rest replays cleanly.
    #[test_traced]
    fn test_segmented_fixed_corruption_recovery() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(&context);
            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
                .await
                .expect("failed to init");

            for i in 0u64..5 {
                journal
                    .append(1, &test_digest(i))
                    .await
                    .expect("failed to append");
            }
            journal.sync_all().await.expect("failed to sync");
            drop(journal);

            // Corrupt the section blob directly: remove the final byte.
            let (blob, size) = context
                .open(&cfg.partition, &1u64.to_be_bytes())
                .await
                .expect("failed to open blob");
            blob.resize(size - 1).await.expect("failed to truncate");
            blob.sync().await.expect("failed to sync");

            let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
                .await
                .expect("failed to re-init");

            let count = {
                let stream = journal
                    .replay(0, 0, NZUsize!(1024))
                    .await
                    .expect("failed to replay");
                pin_mut!(stream);

                let mut count = 0;
                while let Some(result) = stream.next().await {
                    result.expect("should be ok");
                    count += 1;
                }
                count
            };
            // Item 4 lost its last byte, so only the first 4 items remain.
            assert_eq!(count, 4);

            journal.destroy().await.expect("failed to destroy");
        });
    }

    // Synced items are readable after a restart.
    #[test_traced]
    fn test_segmented_fixed_persistence() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(&context);

            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
                .await
                .expect("failed to init");

            for i in 0u64..5 {
                journal
                    .append(1, &test_digest(i))
                    .await
                    .expect("failed to append");
            }
            journal.sync_all().await.expect("failed to sync");
            drop(journal);

            let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg)
                .await
                .expect("failed to re-init");

            for i in 0u64..5 {
                let item = journal.get(1, i).await.expect("failed to get");
                assert_eq!(item, test_digest(i));
            }

            journal.destroy().await.expect("failed to destroy");
        });
    }

    // section_len counts items (bytes / CHUNK_SIZE), returning 0 for
    // missing/empty sections.
    #[test_traced]
    fn test_segmented_fixed_section_len() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(&context);
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to init");

            assert_eq!(journal.section_len(1).await.unwrap(), 0);

            for i in 0u64..5 {
                journal
                    .append(1, &test_digest(i))
                    .await
                    .expect("failed to append");
            }

            assert_eq!(journal.section_len(1).await.unwrap(), 5);
            assert_eq!(journal.section_len(2).await.unwrap(), 0);

            journal.destroy().await.expect("failed to destroy");
        });
    }

    // Section numbers need not be contiguous: gaps error on get, and replay
    // skips over them (before and after restart).
    #[test_traced]
    fn test_segmented_fixed_non_contiguous_sections() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(&context);
            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
                .await
                .expect("failed to init");

            journal
                .append(1, &test_digest(100))
                .await
                .expect("failed to append");
            journal
                .append(5, &test_digest(500))
                .await
                .expect("failed to append");
            journal
                .append(10, &test_digest(1000))
                .await
                .expect("failed to append");
            journal.sync_all().await.expect("failed to sync");

            assert_eq!(journal.get(1, 0).await.unwrap(), test_digest(100));
            assert_eq!(journal.get(5, 0).await.unwrap(), test_digest(500));
            assert_eq!(journal.get(10, 0).await.unwrap(), test_digest(1000));

            for missing_section in [0u64, 2, 3, 4, 6, 7, 8, 9, 11] {
                let result = journal.get(missing_section, 0).await;
                assert!(
                    matches!(result, Err(Error::SectionOutOfRange(_))),
                    "Expected SectionOutOfRange for section {}, got {:?}",
                    missing_section,
                    result
                );
            }

            drop(journal);
            let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
                .await
                .expect("failed to re-init");

            {
                let stream = journal
                    .replay(0, 0, NZUsize!(1024))
                    .await
                    .expect("failed to replay");
                pin_mut!(stream);

                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    let (section, _, item) = result.expect("replay error");
                    items.push((section, item));
                }

                assert_eq!(items.len(), 3, "Should have 3 items");
                assert_eq!(items[0], (1, test_digest(100)));
                assert_eq!(items[1], (5, test_digest(500)));
                assert_eq!(items[2], (10, test_digest(1000)));
            }

            {
                let stream = journal
                    .replay(5, 0, NZUsize!(1024))
                    .await
                    .expect("failed to replay from section 5");
                pin_mut!(stream);

                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    let (section, _, item) = result.expect("replay error");
                    items.push((section, item));
                }

                assert_eq!(items.len(), 2, "Should have 2 items from section 5 onwards");
                assert_eq!(items[0], (5, test_digest(500)));
                assert_eq!(items[1], (10, test_digest(1000)));
            }

            journal.destroy().await.expect("failed to destroy");
        });
    }

    // A section rewound to zero length is skipped by replay but still counts
    // as existing (section_len == 0), across restarts.
    #[test_traced]
    fn test_segmented_fixed_empty_section_in_middle() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(&context);
            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
                .await
                .expect("failed to init");

            journal
                .append(1, &test_digest(100))
                .await
                .expect("failed to append");

            // Create section 2, then empty it via rewind_section.
            journal
                .append(2, &test_digest(200))
                .await
                .expect("failed to append");
            journal.sync(2).await.expect("failed to sync");
            journal
                .rewind_section(2, 0)
                .await
                .expect("failed to rewind");

            journal
                .append(3, &test_digest(300))
                .await
                .expect("failed to append");

            journal.sync_all().await.expect("failed to sync");

            assert_eq!(journal.section_len(1).await.unwrap(), 1);
            assert_eq!(journal.section_len(2).await.unwrap(), 0);
            assert_eq!(journal.section_len(3).await.unwrap(), 1);

            drop(journal);
            let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
                .await
                .expect("failed to re-init");

            {
                let stream = journal
                    .replay(0, 0, NZUsize!(1024))
                    .await
                    .expect("failed to replay");
                pin_mut!(stream);

                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    let (section, _, item) = result.expect("replay error");
                    items.push((section, item));
                }

                assert_eq!(
                    items.len(),
                    2,
                    "Should have 2 items (skipping empty section)"
                );
                assert_eq!(items[0], (1, test_digest(100)));
                assert_eq!(items[1], (3, test_digest(300)));
            }

            {
                let stream = journal
                    .replay(2, 0, NZUsize!(1024))
                    .await
                    .expect("failed to replay from section 2");
                pin_mut!(stream);

                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    let (section, _, item) = result.expect("replay error");
                    items.push((section, item));
                }

                assert_eq!(items.len(), 1, "Should have 1 item from section 3");
                assert_eq!(items[0], (3, test_digest(300)));
            }

            journal.destroy().await.expect("failed to destroy");
        });
    }

    // Truncation recovery where the cut lands mid-item across a page boundary:
    // 3 digests = 96 bytes; losing one byte truncates to 64 (2 items).
    #[test_traced]
    fn test_segmented_fixed_truncation_recovery_across_page_boundary() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(&context);
            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
                .await
                .expect("failed to init");

            for i in 0u64..3 {
                journal
                    .append(1, &test_digest(i))
                    .await
                    .expect("failed to append");
            }
            journal.sync_all().await.expect("failed to sync");

            for i in 0u64..3 {
                let item = journal.get(1, i).await.expect("failed to get");
                assert_eq!(item, test_digest(i));
            }
            drop(journal);

            let (blob, size) = context
                .open(&cfg.partition, &1u64.to_be_bytes())
                .await
                .expect("failed to open blob");
            blob.resize(size - 1).await.expect("failed to truncate");
            blob.sync().await.expect("failed to sync");
            drop(blob);

            let journal = Journal::<_, Digest>::init(context.with_label("second"), cfg.clone())
                .await
                .expect("failed to re-init");

            assert_eq!(journal.section_len(1).await.unwrap(), 2);

            assert_eq!(journal.size(1).await.unwrap(), 64);

            let item0 = journal.get(1, 0).await.expect("failed to get item 0");
            assert_eq!(item0, test_digest(0));
            let item1 = journal.get(1, 1).await.expect("failed to get item 1");
            assert_eq!(item1, test_digest(1));

            let err = journal.get(1, 2).await;
            assert!(
                matches!(err, Err(Error::ItemOutOfRange(2))),
                "expected ItemOutOfRange(2), got {:?}",
                err
            );

            journal.destroy().await.expect("failed to destroy");
        });
    }

    // clear() removes every section but leaves the journal usable for new
    // appends; old sections stay gone.
    #[test_traced]
    fn test_journal_clear() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "clear-test".into(),
                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            let mut journal: Journal<_, Digest> =
                Journal::init(context.with_label("journal"), cfg.clone())
                    .await
                    .expect("Failed to initialize journal");

            for section in 0..5u64 {
                for i in 0..10u64 {
                    journal
                        .append(section, &test_digest(section * 1000 + i))
                        .await
                        .expect("Failed to append");
                }
                journal.sync(section).await.expect("Failed to sync");
            }

            assert_eq!(journal.get(0, 0).await.unwrap(), test_digest(0));
            assert_eq!(journal.get(4, 0).await.unwrap(), test_digest(4000));

            journal.clear().await.expect("Failed to clear");

            for section in 0..5u64 {
                assert!(matches!(
                    journal.get(section, 0).await,
                    Err(Error::SectionOutOfRange(s)) if s == section
                ));
            }

            for i in 0..5u64 {
                journal
                    .append(10, &test_digest(i * 100))
                    .await
                    .expect("Failed to append after clear");
            }
            journal.sync(10).await.expect("Failed to sync after clear");

            assert_eq!(journal.get(10, 0).await.unwrap(), test_digest(0));

            assert!(matches!(
                journal.get(0, 0).await,
                Err(Error::SectionOutOfRange(0))
            ));

            journal.destroy().await.unwrap();
        });
    }

    // last() on a section that never existed is an error, not Ok(None).
    #[test_traced]
    fn test_last_missing_section_returns_error() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(&context);
            let journal = Journal::<_, Digest>::init(context.clone(), cfg.clone())
                .await
                .expect("failed to init");

            assert!(matches!(
                journal.last(0).await,
                Err(Error::SectionOutOfRange(0))
            ));
            assert!(matches!(
                journal.last(99).await,
                Err(Error::SectionOutOfRange(99))
            ));

            journal.destroy().await.unwrap();
        });
    }

    // last() on an existing-but-emptied section is Ok(None).
    #[test_traced]
    fn test_last_after_rewind_to_zero() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(&context);
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to init");

            journal.append(0, &test_digest(0)).await.unwrap();
            journal.append(0, &test_digest(1)).await.unwrap();
            journal.sync(0).await.unwrap();

            assert!(journal.last(0).await.unwrap().is_some());

            journal.rewind(0, 0).await.unwrap();
            assert_eq!(journal.last(0).await.unwrap(), None);

            journal.destroy().await.unwrap();
        });
    }

    // last() on a pruned section surfaces AlreadyPrunedToSection.
    #[test_traced]
    fn test_last_pruned_section_returns_error() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(&context);
            let mut journal = Journal::<_, Digest>::init(context.clone(), cfg.clone())
                .await
                .expect("failed to init");

            journal.append(0, &test_digest(0)).await.unwrap();
            journal.append(1, &test_digest(1)).await.unwrap();
            journal.sync_all().await.unwrap();

            journal.prune(1).await.unwrap();

            assert!(matches!(
                journal.last(0).await,
                Err(Error::AlreadyPrunedToSection(1))
            ));
            assert!(journal.last(1).await.unwrap().is_some());

            journal.destroy().await.unwrap();
        });
    }
}