1use super::manager::{AppendFactory, Config as ManagerConfig, Manager};
24use crate::journal::Error;
25use commonware_codec::{CodecFixed, CodecFixedShared, DecodeExt as _, ReadExt as _};
26use commonware_runtime::{
27 buffer::paged::{CacheRef, Replay},
28 Blob, Buf, Metrics, Storage,
29};
30use futures::{
31 stream::{self, Stream},
32 StreamExt,
33};
34use std::{marker::PhantomData, num::NonZeroUsize};
35use tracing::{trace, warn};
36
/// Per-section cursor threaded through the `stream::unfold` built by `Journal::replay`.
struct ReplayState<B: Blob> {
    /// Section number reported alongside every item read from `replay`.
    section: u64,
    /// Buffered reader over the section's blob.
    replay: Replay<B>,
    /// Position (item index within the section) assigned to the next decoded item.
    position: u64,
    /// Set once the reader is exhausted or an error has been emitted; the next poll ends
    /// the stream for this section.
    done: bool,
}
44
/// Configuration consumed by `Journal::init`.
#[derive(Clone)]
pub struct Config {
    /// Storage partition handed to the underlying `Manager`.
    pub partition: String,

    /// Page cache reference forwarded to the `AppendFactory` (as `page_cache_ref`).
    pub page_cache: CacheRef,

    /// Write buffer size forwarded to the `AppendFactory`.
    pub write_buffer: NonZeroUsize,
}
57
/// Journal of fixed-size items of type `A`, grouped into numbered sections.
///
/// Every item occupies exactly `A::SIZE` bytes in its section's blob, so the byte offset
/// of an item is its position multiplied by `CHUNK_SIZE`.
pub struct Journal<E: Storage + Metrics, A: CodecFixed> {
    /// Owns the per-section blobs.
    manager: Manager<E, AppendFactory>,
    /// Ties the journal to its item type without storing a value of it.
    _array: PhantomData<A>,
}
75
76impl<E: Storage + Metrics, A: CodecFixedShared> Journal<E, A> {
/// Encoded size, in bytes, of a single item (`A::SIZE`).
pub const CHUNK_SIZE: usize = A::SIZE;
/// `CHUNK_SIZE` as a `u64`, for arithmetic against blob sizes and offsets.
const CHUNK_SIZE_U64: u64 = Self::CHUNK_SIZE as u64;
80
81 pub async fn init(context: E, cfg: Config) -> Result<Self, Error> {
86 let manager_cfg = ManagerConfig {
87 partition: cfg.partition,
88 factory: AppendFactory {
89 write_buffer: cfg.write_buffer,
90 page_cache_ref: cfg.page_cache,
91 },
92 };
93 let mut manager = Manager::init(context, manager_cfg).await?;
94
95 let sections: Vec<_> = manager.sections().collect();
97 for section in sections {
98 let size = manager.size(section).await?;
99 if !size.is_multiple_of(Self::CHUNK_SIZE_U64) {
100 let valid_size = size - (size % Self::CHUNK_SIZE_U64);
101 warn!(
102 section,
103 invalid_size = size,
104 new_size = valid_size,
105 "trailing bytes detected: truncating"
106 );
107 manager.rewind_section(section, valid_size).await?;
108 }
109 }
110
111 Ok(Self {
112 manager,
113 _array: PhantomData,
114 })
115 }
116
117 pub async fn append(&mut self, section: u64, item: &A) -> Result<u64, Error> {
121 let blob = self.manager.get_or_create(section).await?;
122
123 let size = blob.size().await;
124 if !size.is_multiple_of(Self::CHUNK_SIZE_U64) {
125 return Err(Error::InvalidBlobSize(section, size));
126 }
127 let position = size / Self::CHUNK_SIZE_U64;
128
129 let buf = item.encode_mut();
131 blob.append(&buf).await?;
132 trace!(section, position, "appended item");
133
134 Ok(position)
135 }
136
137 pub(crate) async fn append_raw(&mut self, section: u64, buf: &[u8]) -> Result<(), Error> {
145 assert!(!buf.is_empty());
146 assert!(buf.len().is_multiple_of(Self::CHUNK_SIZE));
147 let blob = self.manager.get_or_create(section).await?;
148 blob.append(buf).await?;
149 trace!(
150 section,
151 count = buf.len() / Self::CHUNK_SIZE,
152 "appended items"
153 );
154 Ok(())
155 }
156
157 pub async fn get(&self, section: u64, position: u64) -> Result<A, Error> {
165 let blob = self
166 .manager
167 .get(section)?
168 .ok_or(Error::SectionOutOfRange(section))?;
169
170 let offset = position
171 .checked_mul(Self::CHUNK_SIZE_U64)
172 .ok_or(Error::ItemOutOfRange(position))?;
173 let end = offset
174 .checked_add(Self::CHUNK_SIZE_U64)
175 .ok_or(Error::ItemOutOfRange(position))?;
176 if end > blob.size().await {
177 return Err(Error::ItemOutOfRange(position));
178 }
179
180 let buf = blob.read_at(offset, Self::CHUNK_SIZE).await?;
181 A::decode(buf.coalesce()).map_err(Error::Codec)
182 }
183
184 pub async fn get_many(
189 &self,
190 section: u64,
191 positions: &[u64],
192 buf: &mut [u8],
193 ) -> Result<Vec<A>, Error> {
194 if positions.is_empty() {
195 return Ok(Vec::new());
196 }
197 let blob = self
198 .manager
199 .get(section)?
200 .ok_or(Error::SectionOutOfRange(section))?;
201
202 let offsets: Vec<u64> = positions
203 .iter()
204 .map(|&p| {
205 p.checked_mul(Self::CHUNK_SIZE_U64)
206 .ok_or(Error::ItemOutOfRange(p))
207 })
208 .collect::<Result<_, _>>()?;
209
210 blob.read_many_into(buf, &offsets, Self::CHUNK_SIZE).await?;
211
212 let mut items = Vec::with_capacity(positions.len());
213 for i in 0..positions.len() {
214 let slice = &buf[i * Self::CHUNK_SIZE..(i + 1) * Self::CHUNK_SIZE];
215 items.push(A::decode(slice).map_err(Error::Codec)?);
216 }
217 Ok(items)
218 }
219
220 pub fn try_get_sync(&self, section: u64, position: u64) -> Option<A> {
222 let mut buf = vec![0u8; Self::CHUNK_SIZE];
223 self.try_get_sync_into(section, position, &mut buf)
224 }
225
226 pub fn try_get_sync_into(&self, section: u64, position: u64, buf: &mut [u8]) -> Option<A> {
234 assert!(
235 buf.len() >= Self::CHUNK_SIZE,
236 "try_get_sync_into requires buf.len() >= CHUNK_SIZE"
237 );
238 let blob = self.manager.get(section).ok()??;
239 let offset = position.checked_mul(Self::CHUNK_SIZE_U64)?;
240 let remaining = blob.try_size()?.checked_sub(offset)?;
241 if remaining < Self::CHUNK_SIZE_U64 {
242 return None;
243 }
244 let buf = &mut buf[..Self::CHUNK_SIZE];
245 if !blob.try_read_sync(offset, buf) {
246 return None;
247 }
248 A::decode(&buf[..]).ok()
249 }
250
251 pub async fn last(&self, section: u64) -> Result<Option<A>, Error> {
260 let blob = self
261 .manager
262 .get(section)?
263 .ok_or(Error::SectionOutOfRange(section))?;
264
265 let size = blob.size().await;
266 if size < Self::CHUNK_SIZE_U64 {
267 return Ok(None);
268 }
269
270 let last_position = (size / Self::CHUNK_SIZE_U64) - 1;
271 let offset = last_position * Self::CHUNK_SIZE_U64;
272 let buf = blob.read_at(offset, Self::CHUNK_SIZE).await?;
273 A::decode(buf.coalesce()).map_err(Error::Codec).map(Some)
274 }
275
276 pub async fn replay(
280 &self,
281 start_section: u64,
282 start_position: u64,
283 buffer: NonZeroUsize,
284 ) -> Result<impl Stream<Item = Result<(u64, u64, A), Error>> + Send + '_, Error> {
285 let mut blob_info = Vec::new();
287 for (§ion, blob) in self.manager.sections_from(start_section) {
288 let blob_size = blob.size().await;
289 let mut replay = blob.replay(buffer).await?;
290 let initial_position = if section == start_section {
292 let start = start_position * Self::CHUNK_SIZE_U64;
293 if start > blob_size {
294 return Err(Error::ItemOutOfRange(start_position));
295 }
296 replay.seek_to(start)?;
297 start_position
298 } else {
299 0
300 };
301 blob_info.push((section, replay, initial_position));
302 }
303
304 Ok(
308 stream::iter(blob_info).flat_map(move |(section, replay, initial_position)| {
309 stream::unfold(
310 ReplayState {
311 section,
312 replay,
313 position: initial_position,
314 done: false,
315 },
316 move |mut state| async move {
317 if state.done {
318 return None;
319 }
320
321 let mut batch: Vec<Result<(u64, u64, A), Error>> = Vec::new();
322 loop {
323 match state.replay.ensure(Self::CHUNK_SIZE).await {
325 Ok(true) => {}
326 Ok(false) => {
327 state.done = true;
329 return if batch.is_empty() {
330 None
331 } else {
332 Some((batch, state))
333 };
334 }
335 Err(err) => {
336 batch.push(Err(Error::Runtime(err)));
337 state.done = true;
338 return Some((batch, state));
339 }
340 }
341
342 while state.replay.remaining() >= Self::CHUNK_SIZE {
344 match A::read(&mut state.replay) {
345 Ok(item) => {
346 batch.push(Ok((state.section, state.position, item)));
347 state.position += 1;
348 }
349 Err(err) => {
350 batch.push(Err(Error::Codec(err)));
351 state.done = true;
352 return Some((batch, state));
353 }
354 }
355 }
356
357 if !batch.is_empty() {
359 return Some((batch, state));
360 }
361 }
362 },
363 )
364 .flat_map(stream::iter)
365 }),
366 )
367 }
368
/// Syncs `section` by delegating to the manager's `sync`; any error is returned as-is.
pub async fn sync(&self, section: u64) -> Result<(), Error> {
    self.manager.sync(section).await
}
373
/// Syncs the journal by delegating to the manager's `sync_all`; any error is returned
/// as-is.
pub async fn sync_all(&self) -> Result<(), Error> {
    self.manager.sync_all().await
}
378
/// Prunes via the manager's `prune(min)` and returns its result unchanged.
///
/// The tests in this file show that, after `prune(min)`, reading a section below `min`
/// fails with `Error::AlreadyPrunedToSection(min)`.
// NOTE(review): the returned `bool` presumably reports whether anything was pruned;
// confirm against `Manager::prune`.
pub async fn prune(&mut self, min: u64) -> Result<bool, Error> {
    self.manager.prune(min).await
}
383
/// Returns the manager's `oldest_section()`, which is `None` when it has none to report.
pub fn oldest_section(&self) -> Option<u64> {
    self.manager.oldest_section()
}
388
/// Returns the manager's `newest_section()`, which is `None` when it has none to report.
pub fn newest_section(&self) -> Option<u64> {
    self.manager.newest_section()
}
393
394 pub fn sections(&self) -> impl Iterator<Item = u64> + '_ {
396 self.manager.sections_from(0).map(|(section, _)| *section)
397 }
398
399 pub async fn section_len(&self, section: u64) -> Result<u64, Error> {
401 let size = self.manager.size(section).await?;
402 Ok(size / Self::CHUNK_SIZE_U64)
403 }
404
/// Returns the size of `section` in bytes, as reported by the manager's `size`.
///
/// The tests in this file observe `0` for sections that do not exist or were removed by
/// `rewind`.
pub async fn size(&self, section: u64) -> Result<u64, Error> {
    self.manager.size(section).await
}
409
/// Rewinds the journal via the manager's `rewind(section, offset)`.
///
/// `offset` is a byte size within `section` (the tests pass values obtained from
/// `size`). The tests in this file observe that every section after `section` reports
/// size 0 afterwards.
pub async fn rewind(&mut self, section: u64, offset: u64) -> Result<(), Error> {
    self.manager.rewind(section, offset).await
}
417
/// Rewinds `section` to `size` bytes via the manager's `rewind_section`.
// NOTE(review): unlike `rewind`, this presumably leaves other sections untouched;
// confirm against `Manager::rewind_section`.
pub async fn rewind_section(&mut self, section: u64, size: u64) -> Result<(), Error> {
    self.manager.rewind_section(section, size).await
}
424
/// Consumes the journal and delegates to the manager's `destroy`.
pub async fn destroy(self) -> Result<(), Error> {
    self.manager.destroy().await
}
429
/// Clears the journal via the manager's `clear`, leaving it usable afterwards.
///
/// The tests in this file observe that previously written sections then read as
/// `Error::SectionOutOfRange` and that new sections can be appended.
pub async fn clear(&mut self) -> Result<(), Error> {
    self.manager.clear().await
}
436
/// Makes sure `section` exists by calling the manager's `get_or_create` and discarding
/// the returned blob; nothing is written to the section here.
pub(crate) async fn ensure_section_exists(&mut self, section: u64) -> Result<(), Error> {
    self.manager.get_or_create(section).await?;
    Ok(())
}
445}
446
447#[cfg(test)]
448mod tests {
449 use super::*;
450 use commonware_cryptography::{sha256::Digest, Hasher as _, Sha256};
451 use commonware_macros::test_traced;
452 use commonware_runtime::{
453 buffer::paged::CacheRef, deterministic, BufferPooler, Runner, Supervisor as _,
454 };
455 use commonware_utils::{NZUsize, NZU16};
456 use core::num::NonZeroU16;
457 use futures::{pin_mut, StreamExt};
458
// Page size handed to `CacheRef::from_pooler` in the tests (presumably bytes; confirm).
// It is not a multiple of the 32-byte item size these tests assert, so items would
// straddle page boundaries.
const PAGE_SIZE: NonZeroU16 = NZU16!(44);
// Cache capacity handed to `CacheRef::from_pooler` (presumably a page count; confirm).
const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(3);
461
462 fn test_digest(value: u64) -> Digest {
463 Sha256::hash(&value.to_be_bytes())
464 }
465
466 fn test_cfg(pooler: &impl BufferPooler) -> Config {
467 Config {
468 partition: "test-partition".into(),
469 page_cache: CacheRef::from_pooler(pooler, PAGE_SIZE, PAGE_CACHE_SIZE),
470 write_buffer: NZUsize!(2048),
471 }
472 }
473
474 #[test_traced]
475 fn test_segmented_fixed_append_and_get() {
476 let executor = deterministic::Runner::default();
477 executor.start(|context| async move {
478 let cfg = test_cfg(&context);
479 let mut journal = Journal::init(context.child("storage"), cfg.clone())
480 .await
481 .expect("failed to init");
482
483 let pos0 = journal
484 .append(1, &test_digest(0))
485 .await
486 .expect("failed to append");
487 assert_eq!(pos0, 0);
488
489 let pos1 = journal
490 .append(1, &test_digest(1))
491 .await
492 .expect("failed to append");
493 assert_eq!(pos1, 1);
494
495 let pos2 = journal
496 .append(2, &test_digest(2))
497 .await
498 .expect("failed to append");
499 assert_eq!(pos2, 0);
500
501 let item0 = journal.get(1, 0).await.expect("failed to get");
502 assert_eq!(item0, test_digest(0));
503
504 let item1 = journal.get(1, 1).await.expect("failed to get");
505 assert_eq!(item1, test_digest(1));
506
507 let item2 = journal.get(2, 0).await.expect("failed to get");
508 assert_eq!(item2, test_digest(2));
509
510 let err = journal.get(1, 2).await;
511 assert!(matches!(err, Err(Error::ItemOutOfRange(2))));
512
513 let err = journal.get(3, 0).await;
514 assert!(matches!(err, Err(Error::SectionOutOfRange(3))));
515
516 journal.destroy().await.expect("failed to destroy");
517 });
518 }
519
520 #[test_traced]
521 fn test_segmented_fixed_replay() {
522 let executor = deterministic::Runner::default();
523 executor.start(|context| async move {
524 let cfg = test_cfg(&context);
525 let mut journal = Journal::init(context.child("first"), cfg.clone())
526 .await
527 .expect("failed to init");
528
529 for i in 0u64..10 {
530 journal
531 .append(1, &test_digest(i))
532 .await
533 .expect("failed to append");
534 }
535 for i in 10u64..20 {
536 journal
537 .append(2, &test_digest(i))
538 .await
539 .expect("failed to append");
540 }
541
542 journal.sync_all().await.expect("failed to sync");
543 drop(journal);
544
545 let journal = Journal::<_, Digest>::init(context.child("second"), cfg.clone())
546 .await
547 .expect("failed to re-init");
548
549 let items = {
550 let stream = journal
551 .replay(0, 0, NZUsize!(1024))
552 .await
553 .expect("failed to replay");
554 pin_mut!(stream);
555
556 let mut items = Vec::new();
557 while let Some(result) = stream.next().await {
558 match result {
559 Ok((section, pos, item)) => items.push((section, pos, item)),
560 Err(err) => panic!("replay error: {err}"),
561 }
562 }
563 items
564 };
565
566 assert_eq!(items.len(), 20);
567 for (i, item) in items.iter().enumerate().take(10) {
568 assert_eq!(item.0, 1);
569 assert_eq!(item.1, i as u64);
570 assert_eq!(item.2, test_digest(i as u64));
571 }
572 for (i, item) in items.iter().enumerate().skip(10).take(10) {
573 assert_eq!(item.0, 2);
574 assert_eq!(item.1, (i - 10) as u64);
575 assert_eq!(item.2, test_digest(i as u64));
576 }
577
578 journal.destroy().await.expect("failed to destroy");
579 });
580 }
581
582 #[test_traced]
583 fn test_segmented_fixed_replay_with_start_offset() {
584 let executor = deterministic::Runner::default();
586 executor.start(|context| async move {
587 let cfg = test_cfg(&context);
588 let mut journal = Journal::init(context.child("first"), cfg.clone())
589 .await
590 .expect("failed to init");
591
592 for i in 0u64..10 {
594 journal
595 .append(1, &test_digest(i))
596 .await
597 .expect("failed to append");
598 }
599 for i in 10u64..15 {
601 journal
602 .append(2, &test_digest(i))
603 .await
604 .expect("failed to append");
605 }
606 journal.sync_all().await.expect("failed to sync");
607 drop(journal);
608
609 let journal = Journal::<_, Digest>::init(context.child("second"), cfg.clone())
610 .await
611 .expect("failed to re-init");
612
613 {
615 let stream = journal
616 .replay(1, 5, NZUsize!(1024))
617 .await
618 .expect("failed to replay");
619 pin_mut!(stream);
620
621 let mut items = Vec::new();
622 while let Some(result) = stream.next().await {
623 let (section, pos, item) = result.expect("replay error");
624 items.push((section, pos, item));
625 }
626
627 assert_eq!(
628 items.len(),
629 10,
630 "Should have 5 items from section 1 + 5 from section 2"
631 );
632
633 for (i, (section, pos, item)) in items.iter().enumerate().take(5) {
635 assert_eq!(*section, 1);
636 assert_eq!(*pos, (i + 5) as u64);
637 assert_eq!(*item, test_digest((i + 5) as u64));
638 }
639
640 for (i, (section, pos, item)) in items.iter().enumerate().skip(5) {
642 assert_eq!(*section, 2);
643 assert_eq!(*pos, (i - 5) as u64);
644 assert_eq!(*item, test_digest((i + 5) as u64));
645 }
646 }
647
648 {
650 let stream = journal
651 .replay(1, 9, NZUsize!(1024))
652 .await
653 .expect("failed to replay");
654 pin_mut!(stream);
655
656 let mut items = Vec::new();
657 while let Some(result) = stream.next().await {
658 let (section, pos, item) = result.expect("replay error");
659 items.push((section, pos, item));
660 }
661
662 assert_eq!(
663 items.len(),
664 6,
665 "Should have 1 item from section 1 + 5 from section 2"
666 );
667 assert_eq!(items[0], (1, 9, test_digest(9)));
668 for (i, (section, pos, item)) in items.iter().enumerate().skip(1) {
669 assert_eq!(*section, 2);
670 assert_eq!(*pos, (i - 1) as u64);
671 assert_eq!(*item, test_digest((i + 9) as u64));
672 }
673 }
674
675 {
677 let stream = journal
678 .replay(2, 3, NZUsize!(1024))
679 .await
680 .expect("failed to replay");
681 pin_mut!(stream);
682
683 let mut items = Vec::new();
684 while let Some(result) = stream.next().await {
685 let (section, pos, item) = result.expect("replay error");
686 items.push((section, pos, item));
687 }
688
689 assert_eq!(items.len(), 2, "Should have 2 items from section 2");
690 assert_eq!(items[0], (2, 3, test_digest(13)));
691 assert_eq!(items[1], (2, 4, test_digest(14)));
692 }
693
694 let result = journal.replay(1, 100, NZUsize!(1024)).await;
696 assert!(matches!(result, Err(Error::ItemOutOfRange(100))));
697 drop(result);
698
699 journal.destroy().await.expect("failed to destroy");
700 });
701 }
702
703 #[test_traced]
704 fn test_segmented_fixed_prune() {
705 let executor = deterministic::Runner::default();
706 executor.start(|context| async move {
707 let cfg = test_cfg(&context);
708 let mut journal = Journal::init(context.child("storage"), cfg.clone())
709 .await
710 .expect("failed to init");
711
712 for section in 1u64..=5 {
713 journal
714 .append(section, &test_digest(section))
715 .await
716 .expect("failed to append");
717 }
718 journal.sync_all().await.expect("failed to sync");
719
720 journal.prune(3).await.expect("failed to prune");
721
722 let err = journal.get(1, 0).await;
723 assert!(matches!(err, Err(Error::AlreadyPrunedToSection(3))));
724
725 let err = journal.get(2, 0).await;
726 assert!(matches!(err, Err(Error::AlreadyPrunedToSection(3))));
727
728 let item = journal.get(3, 0).await.expect("should exist");
729 assert_eq!(item, test_digest(3));
730
731 journal.destroy().await.expect("failed to destroy");
732 });
733 }
734
735 #[test_traced]
736 fn test_segmented_fixed_rewind() {
737 let executor = deterministic::Runner::default();
738 executor.start(|context| async move {
739 let cfg = test_cfg(&context);
740 let mut journal = Journal::init(context.child("storage"), cfg.clone())
741 .await
742 .expect("failed to init");
743
744 for section in 1u64..=3 {
746 journal
747 .append(section, &test_digest(section))
748 .await
749 .expect("failed to append");
750 }
751 journal.sync_all().await.expect("failed to sync");
752
753 for section in 1u64..=3 {
755 let size = journal.size(section).await.expect("failed to get size");
756 assert!(size > 0, "section {section} should have data");
757 }
758
759 let size = journal.size(1).await.expect("failed to get size");
761 journal.rewind(1, size).await.expect("failed to rewind");
762
763 let size = journal.size(1).await.expect("failed to get size");
765 assert!(size > 0, "section 1 should still have data");
766
767 for section in 2u64..=3 {
769 let size = journal.size(section).await.expect("failed to get size");
770 assert_eq!(size, 0, "section {section} should be removed");
771 }
772
773 let item = journal.get(1, 0).await.expect("failed to get");
775 assert_eq!(item, test_digest(1));
776
777 journal.destroy().await.expect("failed to destroy");
778 });
779 }
780
781 #[test_traced]
782 fn test_segmented_fixed_rewind_many_sections() {
783 let executor = deterministic::Runner::default();
784 executor.start(|context| async move {
785 let cfg = test_cfg(&context);
786 let mut journal = Journal::init(context.child("storage"), cfg.clone())
787 .await
788 .expect("failed to init");
789
790 for section in 1u64..=10 {
792 journal
793 .append(section, &test_digest(section))
794 .await
795 .expect("failed to append");
796 }
797 journal.sync_all().await.expect("failed to sync");
798
799 let size = journal.size(5).await.expect("failed to get size");
801 journal.rewind(5, size).await.expect("failed to rewind");
802
803 for section in 1u64..=5 {
805 let size = journal.size(section).await.expect("failed to get size");
806 assert!(size > 0, "section {section} should still have data");
807 }
808
809 for section in 6u64..=10 {
811 let size = journal.size(section).await.expect("failed to get size");
812 assert_eq!(size, 0, "section {section} should be removed");
813 }
814
815 {
817 let stream = journal
818 .replay(0, 0, NZUsize!(1024))
819 .await
820 .expect("failed to replay");
821 pin_mut!(stream);
822 let mut items = Vec::new();
823 while let Some(result) = stream.next().await {
824 let (section, _, item) = result.expect("failed to read");
825 items.push((section, item));
826 }
827 assert_eq!(items.len(), 5);
828 for (i, (section, item)) in items.iter().enumerate() {
829 assert_eq!(*section, (i + 1) as u64);
830 assert_eq!(*item, test_digest((i + 1) as u64));
831 }
832 }
833
834 journal.destroy().await.expect("failed to destroy");
835 });
836 }
837
838 #[test_traced]
839 fn test_segmented_fixed_rewind_persistence() {
840 let executor = deterministic::Runner::default();
841 executor.start(|context| async move {
842 let cfg = test_cfg(&context);
843
844 let mut journal = Journal::init(context.child("first"), cfg.clone())
846 .await
847 .expect("failed to init");
848 for section in 1u64..=5 {
849 journal
850 .append(section, &test_digest(section))
851 .await
852 .expect("failed to append");
853 }
854 journal.sync_all().await.expect("failed to sync");
855
856 let size = journal.size(2).await.expect("failed to get size");
858 journal.rewind(2, size).await.expect("failed to rewind");
859 journal.sync_all().await.expect("failed to sync");
860 drop(journal);
861
862 let journal = Journal::<_, Digest>::init(context.child("second"), cfg.clone())
864 .await
865 .expect("failed to re-init");
866
867 for section in 1u64..=2 {
869 let size = journal.size(section).await.expect("failed to get size");
870 assert!(size > 0, "section {section} should have data after restart");
871 }
872
873 for section in 3u64..=5 {
875 let size = journal.size(section).await.expect("failed to get size");
876 assert_eq!(size, 0, "section {section} should be gone after restart");
877 }
878
879 let item1 = journal.get(1, 0).await.expect("failed to get");
881 assert_eq!(item1, test_digest(1));
882 let item2 = journal.get(2, 0).await.expect("failed to get");
883 assert_eq!(item2, test_digest(2));
884
885 journal.destroy().await.expect("failed to destroy");
886 });
887 }
888
889 #[test_traced]
890 fn test_segmented_fixed_corruption_recovery() {
891 let executor = deterministic::Runner::default();
892 executor.start(|context| async move {
893 let cfg = test_cfg(&context);
894 let mut journal = Journal::init(context.child("first"), cfg.clone())
895 .await
896 .expect("failed to init");
897
898 for i in 0u64..5 {
899 journal
900 .append(1, &test_digest(i))
901 .await
902 .expect("failed to append");
903 }
904 journal.sync_all().await.expect("failed to sync");
905 drop(journal);
906
907 let (blob, size) = context
908 .open(&cfg.partition, &1u64.to_be_bytes())
909 .await
910 .expect("failed to open blob");
911 blob.resize(size - 1).await.expect("failed to truncate");
912 blob.sync().await.expect("failed to sync");
913
914 let journal = Journal::<_, Digest>::init(context.child("second"), cfg.clone())
915 .await
916 .expect("failed to re-init");
917
918 let count = {
919 let stream = journal
920 .replay(0, 0, NZUsize!(1024))
921 .await
922 .expect("failed to replay");
923 pin_mut!(stream);
924
925 let mut count = 0;
926 while let Some(result) = stream.next().await {
927 result.expect("should be ok");
928 count += 1;
929 }
930 count
931 };
932 assert_eq!(count, 4);
933
934 journal.destroy().await.expect("failed to destroy");
935 });
936 }
937
938 #[test_traced]
939 fn test_segmented_fixed_persistence() {
940 let executor = deterministic::Runner::default();
941 executor.start(|context| async move {
942 let cfg = test_cfg(&context);
943
944 let mut journal = Journal::init(context.child("first"), cfg.clone())
946 .await
947 .expect("failed to init");
948
949 for i in 0u64..5 {
950 journal
951 .append(1, &test_digest(i))
952 .await
953 .expect("failed to append");
954 }
955 journal.sync_all().await.expect("failed to sync");
956 drop(journal);
957
958 let journal = Journal::<_, Digest>::init(context.child("second"), cfg)
960 .await
961 .expect("failed to re-init");
962
963 for i in 0u64..5 {
964 let item = journal.get(1, i).await.expect("failed to get");
965 assert_eq!(item, test_digest(i));
966 }
967
968 journal.destroy().await.expect("failed to destroy");
969 });
970 }
971
972 #[test_traced]
973 fn test_segmented_fixed_section_len() {
974 let executor = deterministic::Runner::default();
975 executor.start(|context| async move {
976 let cfg = test_cfg(&context);
977 let mut journal = Journal::init(context.child("storage"), cfg.clone())
978 .await
979 .expect("failed to init");
980
981 assert_eq!(journal.section_len(1).await.unwrap(), 0);
982
983 for i in 0u64..5 {
984 journal
985 .append(1, &test_digest(i))
986 .await
987 .expect("failed to append");
988 }
989
990 assert_eq!(journal.section_len(1).await.unwrap(), 5);
991 assert_eq!(journal.section_len(2).await.unwrap(), 0);
992
993 journal.destroy().await.expect("failed to destroy");
994 });
995 }
996
997 #[test_traced]
998 fn test_segmented_fixed_non_contiguous_sections() {
999 let executor = deterministic::Runner::default();
1002 executor.start(|context| async move {
1003 let cfg = test_cfg(&context);
1004 let mut journal = Journal::init(context.child("first"), cfg.clone())
1005 .await
1006 .expect("failed to init");
1007
1008 journal
1010 .append(1, &test_digest(100))
1011 .await
1012 .expect("failed to append");
1013 journal
1014 .append(5, &test_digest(500))
1015 .await
1016 .expect("failed to append");
1017 journal
1018 .append(10, &test_digest(1000))
1019 .await
1020 .expect("failed to append");
1021 journal.sync_all().await.expect("failed to sync");
1022
1023 assert_eq!(journal.get(1, 0).await.unwrap(), test_digest(100));
1025 assert_eq!(journal.get(5, 0).await.unwrap(), test_digest(500));
1026 assert_eq!(journal.get(10, 0).await.unwrap(), test_digest(1000));
1027
1028 for missing_section in [0u64, 2, 3, 4, 6, 7, 8, 9, 11] {
1030 let result = journal.get(missing_section, 0).await;
1031 assert!(
1032 matches!(result, Err(Error::SectionOutOfRange(_))),
1033 "Expected SectionOutOfRange for section {}, got {:?}",
1034 missing_section,
1035 result
1036 );
1037 }
1038
1039 drop(journal);
1041 let journal = Journal::<_, Digest>::init(context.child("second"), cfg.clone())
1042 .await
1043 .expect("failed to re-init");
1044
1045 {
1047 let stream = journal
1048 .replay(0, 0, NZUsize!(1024))
1049 .await
1050 .expect("failed to replay");
1051 pin_mut!(stream);
1052
1053 let mut items = Vec::new();
1054 while let Some(result) = stream.next().await {
1055 let (section, _, item) = result.expect("replay error");
1056 items.push((section, item));
1057 }
1058
1059 assert_eq!(items.len(), 3, "Should have 3 items");
1060 assert_eq!(items[0], (1, test_digest(100)));
1061 assert_eq!(items[1], (5, test_digest(500)));
1062 assert_eq!(items[2], (10, test_digest(1000)));
1063 }
1064
1065 {
1067 let stream = journal
1068 .replay(5, 0, NZUsize!(1024))
1069 .await
1070 .expect("failed to replay from section 5");
1071 pin_mut!(stream);
1072
1073 let mut items = Vec::new();
1074 while let Some(result) = stream.next().await {
1075 let (section, _, item) = result.expect("replay error");
1076 items.push((section, item));
1077 }
1078
1079 assert_eq!(items.len(), 2, "Should have 2 items from section 5 onwards");
1080 assert_eq!(items[0], (5, test_digest(500)));
1081 assert_eq!(items[1], (10, test_digest(1000)));
1082 }
1083
1084 journal.destroy().await.expect("failed to destroy");
1085 });
1086 }
1087
1088 #[test_traced]
1089 fn test_segmented_fixed_empty_section_in_middle() {
1090 let executor = deterministic::Runner::default();
1093 executor.start(|context| async move {
1094 let cfg = test_cfg(&context);
1095 let mut journal = Journal::init(context.child("first"), cfg.clone())
1096 .await
1097 .expect("failed to init");
1098
1099 journal
1101 .append(1, &test_digest(100))
1102 .await
1103 .expect("failed to append");
1104
1105 journal
1107 .append(2, &test_digest(200))
1108 .await
1109 .expect("failed to append");
1110 journal.sync(2).await.expect("failed to sync");
1111 journal
1112 .rewind_section(2, 0)
1113 .await
1114 .expect("failed to rewind");
1115
1116 journal
1118 .append(3, &test_digest(300))
1119 .await
1120 .expect("failed to append");
1121
1122 journal.sync_all().await.expect("failed to sync");
1123
1124 assert_eq!(journal.section_len(1).await.unwrap(), 1);
1126 assert_eq!(journal.section_len(2).await.unwrap(), 0);
1127 assert_eq!(journal.section_len(3).await.unwrap(), 1);
1128
1129 drop(journal);
1131 let journal = Journal::<_, Digest>::init(context.child("second"), cfg.clone())
1132 .await
1133 .expect("failed to re-init");
1134
1135 {
1137 let stream = journal
1138 .replay(0, 0, NZUsize!(1024))
1139 .await
1140 .expect("failed to replay");
1141 pin_mut!(stream);
1142
1143 let mut items = Vec::new();
1144 while let Some(result) = stream.next().await {
1145 let (section, _, item) = result.expect("replay error");
1146 items.push((section, item));
1147 }
1148
1149 assert_eq!(
1150 items.len(),
1151 2,
1152 "Should have 2 items (skipping empty section)"
1153 );
1154 assert_eq!(items[0], (1, test_digest(100)));
1155 assert_eq!(items[1], (3, test_digest(300)));
1156 }
1157
1158 {
1160 let stream = journal
1161 .replay(2, 0, NZUsize!(1024))
1162 .await
1163 .expect("failed to replay from section 2");
1164 pin_mut!(stream);
1165
1166 let mut items = Vec::new();
1167 while let Some(result) = stream.next().await {
1168 let (section, _, item) = result.expect("replay error");
1169 items.push((section, item));
1170 }
1171
1172 assert_eq!(items.len(), 1, "Should have 1 item from section 3");
1173 assert_eq!(items[0], (3, test_digest(300)));
1174 }
1175
1176 journal.destroy().await.expect("failed to destroy");
1177 });
1178 }
1179
1180 #[test_traced]
1181 fn test_segmented_fixed_truncation_recovery_across_page_boundary() {
1182 let executor = deterministic::Runner::default();
1194 executor.start(|context| async move {
1195 let cfg = test_cfg(&context);
1196 let mut journal = Journal::init(context.child("first"), cfg.clone())
1197 .await
1198 .expect("failed to init");
1199
1200 for i in 0u64..3 {
1202 journal
1203 .append(1, &test_digest(i))
1204 .await
1205 .expect("failed to append");
1206 }
1207 journal.sync_all().await.expect("failed to sync");
1208
1209 for i in 0u64..3 {
1211 let item = journal.get(1, i).await.expect("failed to get");
1212 assert_eq!(item, test_digest(i));
1213 }
1214 drop(journal);
1215
1216 let (blob, size) = context
1218 .open(&cfg.partition, &1u64.to_be_bytes())
1219 .await
1220 .expect("failed to open blob");
1221 blob.resize(size - 1).await.expect("failed to truncate");
1222 blob.sync().await.expect("failed to sync");
1223 drop(blob);
1224
1225 let journal = Journal::<_, Digest>::init(context.child("second"), cfg.clone())
1228 .await
1229 .expect("failed to re-init");
1230
1231 assert_eq!(journal.section_len(1).await.unwrap(), 2);
1233
1234 assert_eq!(journal.size(1).await.unwrap(), 64);
1237
1238 let item0 = journal.get(1, 0).await.expect("failed to get item 0");
1240 assert_eq!(item0, test_digest(0));
1241 let item1 = journal.get(1, 1).await.expect("failed to get item 1");
1242 assert_eq!(item1, test_digest(1));
1243
1244 let err = journal.get(1, 2).await;
1246 assert!(
1247 matches!(err, Err(Error::ItemOutOfRange(2))),
1248 "expected ItemOutOfRange(2), got {:?}",
1249 err
1250 );
1251
1252 journal.destroy().await.expect("failed to destroy");
1253 });
1254 }
1255
1256 #[test_traced]
1257 fn test_journal_clear() {
1258 let executor = deterministic::Runner::default();
1259 executor.start(|context| async move {
1260 let cfg = Config {
1261 partition: "clear-test".into(),
1262 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1263 write_buffer: NZUsize!(1024),
1264 };
1265
1266 let mut journal: Journal<_, Digest> =
1267 Journal::init(context.child("journal"), cfg.clone())
1268 .await
1269 .expect("Failed to initialize journal");
1270
1271 for section in 0..5u64 {
1273 for i in 0..10u64 {
1274 journal
1275 .append(section, &test_digest(section * 1000 + i))
1276 .await
1277 .expect("Failed to append");
1278 }
1279 journal.sync(section).await.expect("Failed to sync");
1280 }
1281
1282 assert_eq!(journal.get(0, 0).await.unwrap(), test_digest(0));
1284 assert_eq!(journal.get(4, 0).await.unwrap(), test_digest(4000));
1285
1286 journal.clear().await.expect("Failed to clear");
1288
1289 for section in 0..5u64 {
1291 assert!(matches!(
1292 journal.get(section, 0).await,
1293 Err(Error::SectionOutOfRange(s)) if s == section
1294 ));
1295 }
1296
1297 for i in 0..5u64 {
1299 journal
1300 .append(10, &test_digest(i * 100))
1301 .await
1302 .expect("Failed to append after clear");
1303 }
1304 journal.sync(10).await.expect("Failed to sync after clear");
1305
1306 assert_eq!(journal.get(10, 0).await.unwrap(), test_digest(0));
1308
1309 assert!(matches!(
1311 journal.get(0, 0).await,
1312 Err(Error::SectionOutOfRange(0))
1313 ));
1314
1315 journal.destroy().await.unwrap();
1316 });
1317 }
1318
1319 #[test_traced]
1320 fn test_last_missing_section_returns_error() {
1321 let executor = deterministic::Runner::default();
1322 executor.start(|context| async move {
1323 let cfg = test_cfg(&context);
1324 let journal = Journal::<_, Digest>::init(context.child("storage"), cfg.clone())
1325 .await
1326 .expect("failed to init");
1327
1328 assert!(matches!(
1329 journal.last(0).await,
1330 Err(Error::SectionOutOfRange(0))
1331 ));
1332 assert!(matches!(
1333 journal.last(99).await,
1334 Err(Error::SectionOutOfRange(99))
1335 ));
1336
1337 journal.destroy().await.unwrap();
1338 });
1339 }
1340
1341 #[test_traced]
1342 fn test_last_after_rewind_to_zero() {
1343 let executor = deterministic::Runner::default();
1344 executor.start(|context| async move {
1345 let cfg = test_cfg(&context);
1346 let mut journal = Journal::init(context.child("storage"), cfg.clone())
1347 .await
1348 .expect("failed to init");
1349
1350 journal.append(0, &test_digest(0)).await.unwrap();
1351 journal.append(0, &test_digest(1)).await.unwrap();
1352 journal.sync(0).await.unwrap();
1353
1354 assert!(journal.last(0).await.unwrap().is_some());
1355
1356 journal.rewind(0, 0).await.unwrap();
1357 assert_eq!(journal.last(0).await.unwrap(), None);
1358
1359 journal.destroy().await.unwrap();
1360 });
1361 }
1362
1363 #[test_traced]
1364 fn test_last_pruned_section_returns_error() {
1365 let executor = deterministic::Runner::default();
1366 executor.start(|context| async move {
1367 let cfg = test_cfg(&context);
1368 let mut journal = Journal::<_, Digest>::init(context.child("storage"), cfg.clone())
1369 .await
1370 .expect("failed to init");
1371
1372 journal.append(0, &test_digest(0)).await.unwrap();
1373 journal.append(1, &test_digest(1)).await.unwrap();
1374 journal.sync_all().await.unwrap();
1375
1376 journal.prune(1).await.unwrap();
1377
1378 assert!(matches!(
1379 journal.last(0).await,
1380 Err(Error::AlreadyPrunedToSection(1))
1381 ));
1382 assert!(journal.last(1).await.unwrap().is_some());
1383
1384 journal.destroy().await.unwrap();
1385 });
1386 }
1387
1388 #[test_traced]
1389 fn test_get_many_empty() {
1390 let executor = deterministic::Runner::default();
1391 executor.start(|context| async move {
1392 let cfg = test_cfg(&context);
1393 let mut journal = Journal::init(context.child("storage"), cfg).await.unwrap();
1394 journal.append(0, &test_digest(0)).await.unwrap();
1395 assert_eq!(journal.section_len(0).await.unwrap(), 1);
1396
1397 let mut buf = [];
1398 let items = journal.get_many(0, &[], &mut buf).await.unwrap();
1399 assert!(items.is_empty());
1400
1401 journal.destroy().await.unwrap();
1402 });
1403 }
1404
1405 #[test_traced]
1406 fn test_get_many_single_section() {
1407 let executor = deterministic::Runner::default();
1408 executor.start(|context| async move {
1409 let cfg = test_cfg(&context);
1410 let mut journal = Journal::init(context.child("storage"), cfg).await.unwrap();
1411
1412 for i in 0..5 {
1413 journal.append(0, &test_digest(i)).await.unwrap();
1414 }
1415 assert_eq!(journal.section_len(0).await.unwrap(), 5);
1416
1417 let chunk = Journal::<deterministic::Context, Digest>::CHUNK_SIZE;
1419 let mut buf = vec![0u8; 5 * chunk];
1420 let items = journal
1421 .get_many(0, &[0, 1, 2, 3, 4], &mut buf)
1422 .await
1423 .unwrap();
1424
1425 for (i, item) in items.iter().enumerate() {
1426 assert_eq!(*item, test_digest(i as u64));
1427 }
1428
1429 journal.destroy().await.unwrap();
1430 });
1431 }
1432
1433 #[test_traced]
1434 fn test_get_many_subset() {
1435 let executor = deterministic::Runner::default();
1437 executor.start(|context| async move {
1438 let cfg = test_cfg(&context);
1439 let mut journal = Journal::init(context.child("storage"), cfg).await.unwrap();
1440
1441 for i in 0..10 {
1442 journal.append(0, &test_digest(i)).await.unwrap();
1443 }
1444 assert_eq!(journal.section_len(0).await.unwrap(), 10);
1445
1446 let chunk = Journal::<deterministic::Context, Digest>::CHUNK_SIZE;
1447 let positions = [1, 4, 7, 9];
1448 let mut buf = vec![0u8; positions.len() * chunk];
1449 let items = journal.get_many(0, &positions, &mut buf).await.unwrap();
1450
1451 for (i, &pos) in positions.iter().enumerate() {
1452 assert_eq!(items[i], test_digest(pos));
1453 }
1454
1455 journal.destroy().await.unwrap();
1456 });
1457 }
1458
1459 #[test_traced]
1460 fn test_get_many_bad_section() {
1461 let executor = deterministic::Runner::default();
1462 executor.start(|context| async move {
1463 let cfg = test_cfg(&context);
1464 let journal = Journal::<_, Digest>::init(context.child("storage"), cfg)
1465 .await
1466 .unwrap();
1467
1468 let mut buf = vec![0u8; 64];
1469 let err = journal.get_many(99, &[0], &mut buf).await.unwrap_err();
1470 assert!(matches!(err, Error::SectionOutOfRange(99)));
1471
1472 journal.destroy().await.unwrap();
1473 });
1474 }
1475
1476 #[test_traced]
1477 fn test_get_many_matches_get() {
1478 let executor = deterministic::Runner::default();
1480 executor.start(|context| async move {
1481 let cfg = test_cfg(&context);
1482 let mut journal = Journal::init(context.child("storage"), cfg).await.unwrap();
1483
1484 for i in 0..8 {
1485 journal.append(0, &test_digest(i)).await.unwrap();
1486 }
1487 assert_eq!(journal.section_len(0).await.unwrap(), 8);
1488 journal.sync_all().await.unwrap();
1489
1490 let chunk = Journal::<deterministic::Context, Digest>::CHUNK_SIZE;
1491 let positions: Vec<u64> = (0..8).collect();
1492 let mut buf = vec![0u8; positions.len() * chunk];
1493 let batch = journal.get_many(0, &positions, &mut buf).await.unwrap();
1494
1495 for pos in &positions {
1496 let single = journal.get(0, *pos).await.unwrap();
1497 assert_eq!(batch[*pos as usize], single);
1498 }
1499
1500 journal.destroy().await.unwrap();
1501 });
1502 }
1503}