1use crate::journal::{
2 variable::{Config as VConfig, Journal as VJournal},
3 Error,
4};
5use commonware_codec::Codec;
6use commonware_runtime::{Metrics, Storage};
7use std::{num::NonZeroU64, ops::Bound};
8use tracing::debug;
9
10pub(crate) async fn init_journal<E: Storage + Metrics, V: Codec>(
42 context: E,
43 cfg: VConfig<V::Cfg>,
44 lower_bound: u64,
45 upper_bound: u64,
46 items_per_section: NonZeroU64,
47) -> Result<VJournal<E, V>, Error> {
48 if lower_bound > upper_bound {
49 return Err(Error::InvalidSyncRange(lower_bound, upper_bound));
50 }
51
52 let items_per_section = items_per_section.get();
54 let lower_section = lower_bound / items_per_section;
55 let upper_section = upper_bound / items_per_section;
56
57 debug!(
58 lower_bound,
59 upper_bound,
60 lower_section,
61 upper_section,
62 items_per_section = items_per_section,
63 "initializing variable journal"
64 );
65
66 let mut journal = VJournal::init(context.clone(), cfg.clone()).await?;
68
69 let last_section = journal.blobs.last_key_value().map(|(&s, _)| s);
70
71 let Some(last_section) = last_section else {
73 debug!("no existing journal data, creating fresh journal");
74 return Ok(journal);
75 };
76
77 if last_section < lower_section {
79 debug!(
80 last_section,
81 lower_section, "existing journal data is stale, re-initializing"
82 );
83 journal.destroy().await?;
84 return VJournal::init(context, cfg).await;
85 }
86
87 if lower_section > 0 {
89 journal.prune(lower_section).await?;
90 }
91
92 if last_section > upper_section {
94 debug!(
95 last_section,
96 lower_section,
97 upper_section,
98 "existing journal data exceeds sync range, removing sections beyond upper bound"
99 );
100
101 let sections_to_remove: Vec<u64> = journal
102 .blobs
103 .range((Bound::Excluded(upper_section), Bound::Unbounded))
104 .map(|(§ion, _)| section)
105 .collect();
106
107 for section in sections_to_remove {
108 debug!(section, "removing section beyond upper bound");
109 if let Some(blob) = journal.blobs.remove(§ion) {
110 drop(blob);
111 let name = section.to_be_bytes();
112 journal
113 .context
114 .remove(&journal.cfg.partition, Some(&name))
115 .await?;
116 journal.tracked.dec();
117 }
118 }
119 }
120
121 truncate_upper_section(&mut journal, upper_bound, items_per_section).await?;
123
124 Ok(journal)
125}
126
127async fn truncate_upper_section<E: Storage + Metrics, V: Codec>(
130 journal: &mut VJournal<E, V>,
131 upper_bound: u64,
132 items_per_section: u64,
133) -> Result<(), Error> {
134 let upper_section = upper_bound / items_per_section;
136 let Some(blob) = journal.blobs.get(&upper_section) else {
137 return Ok(()); };
139
140 let section_start = upper_section * items_per_section;
142 let section_end = section_start + items_per_section - 1;
143
144 if upper_bound >= section_end {
146 return Ok(());
147 }
148
149 let items_to_keep = (upper_bound - section_start + 1) as u32;
151 debug!(
152 upper_section,
153 upper_bound,
154 section_start,
155 section_end,
156 items_to_keep,
157 "truncating section to remove items beyond upper_bound"
158 );
159
160 let target_byte_size = compute_offset::<E, V>(
162 blob,
163 &journal.cfg.codec_config,
164 journal.cfg.compression.is_some(),
165 items_to_keep,
166 )
167 .await?;
168
169 journal
171 .rewind_section(upper_section, target_byte_size)
172 .await?;
173
174 debug!(
175 upper_section,
176 items_to_keep, target_byte_size, "section truncated"
177 );
178
179 Ok(())
180}
181
182async fn compute_offset<E: Storage + Metrics, V: Codec>(
184 blob: &commonware_runtime::buffer::Append<E::Blob>,
185 codec_config: &V::Cfg,
186 compressed: bool,
187 items_count: u32,
188) -> Result<u64, Error> {
189 use crate::journal::variable::{Journal, ITEM_ALIGNMENT};
190
191 if items_count == 0 {
192 return Ok(0);
193 }
194
195 let mut current_offset = 0u32;
196
197 for _ in 0..items_count {
199 match Journal::<E, V>::read(compressed, codec_config, blob, current_offset).await {
200 Ok((next_slot, _item_len, _item)) => {
201 current_offset = next_slot;
202 }
203 Err(Error::Runtime(commonware_runtime::Error::BlobInsufficientLength)) => {
204 break;
206 }
207 Err(e) => return Err(e),
208 }
209 }
210
211 Ok((current_offset as u64) * ITEM_ALIGNMENT)
212}
213
214#[cfg(test)]
215mod tests {
216 use super::*;
217 use crate::journal::variable::ITEM_ALIGNMENT;
218 use commonware_macros::test_traced;
219 use commonware_runtime::{buffer::PoolRef, deterministic, Runner as _};
220 use commonware_utils::{NZUsize, NZU64};
221
222 const PAGE_SIZE: usize = 101;
224 const PAGE_CACHE_SIZE: usize = 2;
225
226 #[test_traced]
228 fn test_init_journal_no_existing_data() {
229 let executor = deterministic::Runner::default();
230 executor.start(|context| async move {
231 let cfg = VConfig {
232 partition: "test_fresh_start".into(),
233 compression: None,
234 codec_config: (),
235 write_buffer: NZUsize!(1024),
236 buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
237 };
238
239 let lower_bound = 10;
241 let upper_bound = 25;
242 let items_per_section = NZU64!(5);
243 let mut journal = init_journal(
244 context.clone(),
245 cfg.clone(),
246 lower_bound,
247 upper_bound,
248 items_per_section,
249 )
250 .await
251 .expect("Failed to initialize journal with sync boundaries");
252
253 assert!(journal.blobs.is_empty()); assert_eq!(journal.oldest_section(), None); let lower_section = lower_bound / items_per_section; let (offset, _) = journal.append(lower_section, 42u64).await.unwrap();
262 assert_eq!(offset, 0); let retrieved = journal.get(lower_section, offset).await.unwrap();
266 assert_eq!(retrieved, 42u64);
267
268 let (offset2, _) = journal.append(lower_section, 43u64).await.unwrap();
270 assert_eq!(journal.get(lower_section, offset2).await.unwrap(), 43u64);
271
272 journal.destroy().await.unwrap();
273 });
274 }
275
276 #[test_traced]
278 fn test_init_journal_existing_data_overlap() {
279 let executor = deterministic::Runner::default();
280 executor.start(|context| async move {
281 let cfg = VConfig {
282 partition: "test_overlap".into(),
283 compression: None,
284 codec_config: (),
285 write_buffer: NZUsize!(1024),
286 buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
287 };
288
289 let mut journal =
291 VJournal::<deterministic::Context, u64>::init(context.clone(), cfg.clone())
292 .await
293 .expect("Failed to create initial journal");
294
295 let items_per_section = NZU64!(5);
296
297 for section in 0..4 {
299 for item in 0..items_per_section.get() {
300 journal.append(section, section * 10 + item).await.unwrap();
301 }
302 }
303 journal.close().await.unwrap();
304
305 let lower_bound = 8;
308 let upper_bound = 30;
309 let mut journal = init_journal(
310 context.clone(),
311 cfg.clone(),
312 lower_bound,
313 upper_bound,
314 items_per_section,
315 )
316 .await
317 .expect("Failed to initialize journal with overlap");
318
319 let lower_section = lower_bound / items_per_section; assert_eq!(lower_section, 1);
322 assert_eq!(journal.oldest_section(), Some(lower_section));
323
324 assert!(!journal.blobs.contains_key(&0)); assert!(journal.blobs.contains_key(&1)); assert!(journal.blobs.contains_key(&2)); assert!(journal.blobs.contains_key(&3)); assert!(!journal.blobs.contains_key(&4)); let item = journal.get(1, 0).await.unwrap();
333 assert_eq!(item, 10u64); let item = journal.get(1, 1).await.unwrap();
335 assert_eq!(item, 11); let item = journal.get(2, 0).await.unwrap();
337 assert_eq!(item, 20); let last_element_section = 19 / items_per_section;
339 let last_element_offset = (19 % items_per_section.get()) as u32;
340 let item = journal
341 .get(last_element_section, last_element_offset)
342 .await
343 .unwrap();
344 assert_eq!(item, 34); let next_element_section = 20 / items_per_section;
346 let next_element_offset = (20 % items_per_section.get()) as u32;
347 let result = journal.get(next_element_section, next_element_offset).await;
348 assert!(matches!(result, Err(Error::SectionOutOfRange(4)))); let (offset, _) = journal.append(next_element_section, 999).await.unwrap();
352 assert_eq!(
353 journal.get(next_element_section, offset).await.unwrap(),
354 999
355 );
356
357 journal.destroy().await.unwrap();
358 });
359 }
360
361 #[test_traced]
363 fn test_init_journal_invalid_parameters() {
364 let executor = deterministic::Runner::default();
365 executor.start(|context| async move {
366 let cfg = VConfig {
367 partition: "test_invalid".into(),
368 compression: None,
369 codec_config: (),
370 write_buffer: NZUsize!(1024),
371 buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
372 };
373
374 let result = init_journal::<deterministic::Context, u64>(
376 context.clone(),
377 cfg.clone(),
378 10, 5, NZU64!(5), )
382 .await;
383 assert!(matches!(result, Err(Error::InvalidSyncRange(10, 5))));
384 });
385 }
386
387 #[test_traced]
389 fn test_init_journal_existing_data_exact_match() {
390 let executor = deterministic::Runner::default();
391 executor.start(|context| async move {
392 let cfg = VConfig {
393 partition: "test_exact_match".into(),
394 compression: None,
395 codec_config: (),
396 write_buffer: NZUsize!(1024),
397 buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
398 };
399
400 let mut journal =
402 VJournal::<deterministic::Context, u64>::init(context.clone(), cfg.clone())
403 .await
404 .expect("Failed to create initial journal");
405
406 let items_per_section = NZU64!(5);
408 for section in 1..4 {
409 for item in 0..items_per_section.get() {
410 journal.append(section, section * 100 + item).await.unwrap();
411 }
412 }
413 journal.close().await.unwrap();
414
415 let lower_bound = 5; let upper_bound = 19; let journal = init_journal(
419 context.clone(),
420 cfg.clone(),
421 lower_bound,
422 upper_bound,
423 items_per_section,
424 )
425 .await
426 .expect("Failed to initialize journal with exact match");
427
428 let lower_section = lower_bound / items_per_section; assert_eq!(journal.oldest_section(), Some(lower_section));
431
432 assert!(!journal.blobs.contains_key(&0)); assert!(journal.blobs.contains_key(&1)); assert!(journal.blobs.contains_key(&2)); assert!(journal.blobs.contains_key(&3)); let item = journal.get(1, 0).await.unwrap();
440 assert_eq!(item, 100u64); let item = journal.get(1, 1).await.unwrap();
442 assert_eq!(item, 101); let item = journal.get(2, 0).await.unwrap();
444 assert_eq!(item, 200); let last_element_section = 19 / items_per_section;
446 let last_element_offset = (19 % items_per_section.get()) as u32;
447 let item = journal
448 .get(last_element_section, last_element_offset)
449 .await
450 .unwrap();
451 assert_eq!(item, 304); let next_element_section = 20 / items_per_section;
453 let next_element_offset = (20 % items_per_section.get()) as u32;
454 let result = journal.get(next_element_section, next_element_offset).await;
455 assert!(matches!(result, Err(Error::SectionOutOfRange(4)))); let mut journal = journal;
459 let (offset, _) = journal.append(next_element_section, 999).await.unwrap();
460 assert_eq!(
461 journal.get(next_element_section, offset).await.unwrap(),
462 999
463 );
464
465 journal.destroy().await.unwrap();
466 });
467 }
468
469 #[test_traced]
471 fn test_init_journal_existing_data_with_rewind() {
472 let executor = deterministic::Runner::default();
473 executor.start(|context| async move {
474 let cfg = VConfig {
475 partition: "test_rewind".into(),
476 compression: None,
477 codec_config: (),
478 write_buffer: NZUsize!(1024),
479 buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
480 };
481
482 let mut journal =
484 VJournal::<deterministic::Context, u64>::init(context.clone(), cfg.clone())
485 .await
486 .expect("Failed to create initial journal");
487
488 let items_per_section = NZU64!(5);
490 for section in 0..6 {
491 for item in 0..items_per_section.get() {
492 journal
493 .append(section, section * 1000 + item)
494 .await
495 .unwrap();
496 }
497 }
498 journal.close().await.unwrap();
499
500 let lower_bound = 8; let upper_bound = 17; let mut journal = init_journal(
504 context.clone(),
505 cfg.clone(),
506 lower_bound,
507 upper_bound,
508 items_per_section,
509 )
510 .await
511 .expect("Failed to initialize journal with rewind");
512
513 let lower_section = lower_bound / items_per_section; assert_eq!(journal.oldest_section(), Some(lower_section));
516
517 assert!(!journal.blobs.contains_key(&0));
519
520 assert!(journal.blobs.contains_key(&1)); assert!(journal.blobs.contains_key(&2)); assert!(journal.blobs.contains_key(&3)); assert!(!journal.blobs.contains_key(&4)); assert!(!journal.blobs.contains_key(&5)); let item = journal.get(1, 0).await.unwrap();
531 assert_eq!(item, 1000u64); let item = journal.get(1, 1).await.unwrap();
533 assert_eq!(item, 1001); let item = journal.get(3, 0).await.unwrap();
535 assert_eq!(item, 3000); let last_element_section = 17 / items_per_section;
537 let last_element_offset = (17 % items_per_section.get()) as u32;
538 let item = journal
539 .get(last_element_section, last_element_offset)
540 .await
541 .unwrap();
542 assert_eq!(item, 3002); let section_3_size = journal.size(3).await.unwrap();
546 assert_eq!(section_3_size, 3 * ITEM_ALIGNMENT);
547
548 let result = journal.get(3, 3).await;
551 assert!(result.is_err()); let (offset, _) = journal.append(3, 999).await.unwrap();
555 assert_eq!(journal.get(3, offset).await.unwrap(), 999);
556
557 journal.destroy().await.unwrap();
558 });
559 }
560
561 #[test_traced]
563 fn test_init_journal_existing_data_stale() {
564 let executor = deterministic::Runner::default();
565 executor.start(|context| async move {
566 let cfg = VConfig {
567 partition: "test_stale".into(),
568 compression: None,
569 codec_config: (),
570 write_buffer: NZUsize!(1024),
571 buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
572 };
573
574 let mut journal =
576 VJournal::<deterministic::Context, u64>::init(context.clone(), cfg.clone())
577 .await
578 .expect("Failed to create initial journal");
579
580 let items_per_section = NZU64!(5);
582 for section in 0..2 {
583 for item in 0..items_per_section.get() {
584 journal.append(section, section * 100 + item).await.unwrap();
585 }
586 }
587 journal.close().await.unwrap();
588
589 let lower_bound = 15; let upper_bound = 25; let journal = init_journal::<deterministic::Context, u64>(
593 context.clone(),
594 cfg.clone(),
595 lower_bound,
596 upper_bound,
597 items_per_section,
598 )
599 .await
600 .expect("Failed to initialize journal with stale data");
601
602 assert!(journal.blobs.is_empty());
604 assert_eq!(journal.oldest_section(), None);
605
606 assert!(!journal.blobs.contains_key(&0));
608 assert!(!journal.blobs.contains_key(&1));
609
610 journal.destroy().await.unwrap();
611 });
612 }
613
614 #[test_traced]
616 fn test_init_journal_section_boundaries() {
617 let executor = deterministic::Runner::default();
618 executor.start(|context| async move {
619 let cfg = VConfig {
620 partition: "test_boundaries".into(),
621 compression: None,
622 codec_config: (),
623 write_buffer: NZUsize!(1024),
624 buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
625 };
626
627 let mut journal =
629 VJournal::<deterministic::Context, u64>::init(context.clone(), cfg.clone())
630 .await
631 .expect("Failed to create initial journal");
632
633 let items_per_section = NZU64!(5);
634
635 for section in 0..5 {
637 for item in 0..items_per_section.get() {
638 journal.append(section, section * 100 + item).await.unwrap();
639 }
640 }
641 journal.close().await.unwrap();
642
643 let lower_bound = 10; let upper_bound = 19; let mut journal = init_journal(
647 context.clone(),
648 cfg.clone(),
649 lower_bound,
650 upper_bound,
651 items_per_section,
652 )
653 .await
654 .expect("Failed to initialize journal at boundaries");
655
656 let lower_section = lower_bound / items_per_section; assert_eq!(journal.oldest_section(), Some(lower_section));
659
660 assert!(!journal.blobs.contains_key(&0));
662 assert!(!journal.blobs.contains_key(&1));
663 assert!(journal.blobs.contains_key(&2));
664 assert!(journal.blobs.contains_key(&3));
665 assert!(!journal.blobs.contains_key(&4)); let item = journal.get(2, 0).await.unwrap();
669 assert_eq!(item, 200u64); let item = journal.get(3, 4).await.unwrap();
671 assert_eq!(item, 304); let next_element_section = 4;
673 let result = journal.get(next_element_section, 0).await;
674 assert!(matches!(result, Err(Error::SectionOutOfRange(4))));
675
676 let (offset, _) = journal.append(next_element_section, 999).await.unwrap();
678 assert_eq!(
679 journal.get(next_element_section, offset).await.unwrap(),
680 999
681 );
682
683 journal.destroy().await.unwrap();
684 });
685 }
686
687 #[test_traced]
689 fn test_init_journal_same_section_bounds() {
690 let executor = deterministic::Runner::default();
691 executor.start(|context| async move {
692 let cfg = VConfig {
693 partition: "test_same_section".into(),
694 compression: None,
695 codec_config: (),
696 write_buffer: NZUsize!(1024),
697 buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
698 };
699
700 let mut journal =
702 VJournal::<deterministic::Context, u64>::init(context.clone(), cfg.clone())
703 .await
704 .expect("Failed to create initial journal");
705
706 let items_per_section = NZU64!(5);
707
708 for section in 0..3 {
710 for item in 0..items_per_section.get() {
711 journal.append(section, section * 100 + item).await.unwrap();
712 }
713 }
714 journal.close().await.unwrap();
715
716 let lower_bound = 6; let upper_bound = 8; let journal = init_journal(
720 context.clone(),
721 cfg.clone(),
722 lower_bound,
723 upper_bound,
724 items_per_section,
725 )
726 .await
727 .expect("Failed to initialize journal with same-section bounds");
728
729 let target_section = lower_bound / items_per_section; assert_eq!(journal.oldest_section(), Some(target_section));
732
733 assert!(!journal.blobs.contains_key(&0)); assert!(journal.blobs.contains_key(&1)); assert!(!journal.blobs.contains_key(&2)); let item = journal.get(1, 0).await.unwrap();
740 assert_eq!(item, 100u64); let item = journal.get(1, 1).await.unwrap();
742 assert_eq!(item, 101); let item = journal.get(1, 3).await.unwrap();
744 assert_eq!(item, 103); let section_1_size = journal.size(1).await.unwrap();
748 assert_eq!(section_1_size, 64); let result = journal.get(1, 4).await;
752 assert!(result.is_err()); let result = journal.get(2, 0).await;
755 assert!(matches!(result, Err(Error::SectionOutOfRange(2)))); let mut journal = journal;
759 let (offset, _) = journal.append(target_section, 999).await.unwrap();
760 assert_eq!(journal.get(target_section, offset).await.unwrap(), 999);
761
762 journal.destroy().await.unwrap();
763 });
764 }
765
766 #[test_traced]
768 fn test_compute_offset() {
769 let executor = deterministic::Runner::default();
770 executor.start(|context| async move {
771 let cfg = VConfig {
772 partition: "test_compute_offset".into(),
773 compression: None,
774 codec_config: (),
775 write_buffer: NZUsize!(1024),
776 buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
777 };
778
779 let mut journal =
781 VJournal::<deterministic::Context, u64>::init(context.clone(), cfg.clone())
782 .await
783 .expect("Failed to create journal");
784
785 let section = 0;
786 for i in 0..5 {
787 journal.append(section, i as u64).await.unwrap();
788 }
789 journal.sync(section).await.unwrap();
790
791 let blob = journal.blobs.get(§ion).unwrap();
792
793 let compute_offset = |operations_count: u32| async move {
795 compute_offset::<deterministic::Context, u64>(
796 blob,
797 &journal.cfg.codec_config,
798 journal.cfg.compression.is_some(),
799 operations_count,
800 )
801 .await
802 .unwrap()
803 };
804
805 assert_eq!(compute_offset(0).await, 0); assert_eq!(compute_offset(1).await, 16); assert_eq!(compute_offset(3).await, 48); assert_eq!(compute_offset(5).await, 80); assert_eq!(compute_offset(10).await, 80); journal.destroy().await.unwrap();
815 });
816 }
817
818 #[test_traced]
820 fn test_truncate_section_to_upper_bound() {
821 let executor = deterministic::Runner::default();
822 executor.start(|context| async move {
823 let cfg = VConfig {
824 partition: "test_truncate_section".into(),
825 compression: None,
826 codec_config: (),
827 write_buffer: NZUsize!(1024),
828 buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
829 };
830 let items_per_section = 5;
831
832 let create_journal = || async {
834 let mut journal =
835 VJournal::<deterministic::Context, u64>::init(context.clone(), cfg.clone())
836 .await
837 .expect("Failed to create journal");
838
839 for section in 0..3 {
841 for i in 0..items_per_section {
842 journal.append(section, section * 100 + i).await.unwrap();
843 }
844 journal.sync(section).await.unwrap();
845 }
846 journal
847 };
848
849 {
851 let mut journal = create_journal().await;
852 let upper_bound = 9; truncate_upper_section(&mut journal, upper_bound, items_per_section)
854 .await
855 .unwrap();
856
857 let section_1_size = journal.size(1).await.unwrap();
859 assert_eq!(section_1_size, 80);
860 journal.destroy().await.unwrap();
861 }
862
863 {
865 let mut journal = create_journal().await;
866 let upper_bound = 7; truncate_upper_section(&mut journal, upper_bound, items_per_section)
868 .await
869 .unwrap();
870
871 let section_1_size = journal.size(1).await.unwrap();
873 assert_eq!(section_1_size, 48);
874
875 assert_eq!(journal.get(1, 0).await.unwrap(), 100); assert_eq!(journal.get(1, 1).await.unwrap(), 101); assert_eq!(journal.get(1, 2).await.unwrap(), 102); let result = journal.get(1, 3).await;
882 assert!(result.is_err());
883 journal.destroy().await.unwrap();
884 }
885
886 {
888 let mut journal = create_journal().await;
889 truncate_upper_section(
890 &mut journal,
891 99, items_per_section,
893 )
894 .await
895 .unwrap(); journal.destroy().await.unwrap();
897 }
898
899 {
901 let mut journal = create_journal().await;
902 let upper_bound = 15; let original_section_2_size = journal.size(2).await.unwrap();
904 truncate_upper_section(&mut journal, upper_bound, items_per_section)
905 .await
906 .unwrap();
907
908 let section_2_size = journal.size(2).await.unwrap();
910 assert_eq!(section_2_size, original_section_2_size);
911 journal.destroy().await.unwrap();
912 }
913 });
914 }
915
916 #[test_traced]
918 fn test_truncate_section_mid_section() {
919 let executor = deterministic::Runner::default();
920 executor.start(|context| async move {
921 let cfg = VConfig {
922 partition: "test_truncation_integration".into(),
923 compression: None,
924 codec_config: (),
925 write_buffer: NZUsize!(1024),
926 buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
927 };
928 let items_per_section = 3;
929
930 let mut journal =
932 VJournal::<deterministic::Context, u64>::init(context.clone(), cfg.clone())
933 .await
934 .expect("Failed to create journal");
935
936 for section in 0..3 {
940 for i in 0..items_per_section {
941 let op_value = section * items_per_section + i;
942 journal.append(section, op_value).await.unwrap();
943 }
944 }
945 journal.close().await.unwrap();
946
947 let lower_bound = 2;
950 let upper_bound = 4;
951 let mut journal = init_journal(
952 context.clone(),
953 cfg.clone(),
954 lower_bound,
955 upper_bound,
956 NZU64!(items_per_section),
957 )
958 .await
959 .expect("Failed to initialize synced journal");
960
961 assert!(journal.blobs.contains_key(&0));
963 assert_eq!(journal.get(0, 2).await.unwrap(), 2u64);
964
965 assert!(journal.blobs.contains_key(&1));
967 assert_eq!(journal.get(1, 0).await.unwrap(), 3);
968 assert_eq!(journal.get(1, 1).await.unwrap(), 4);
969
970 let result = journal.get(1, 2).await;
972 assert!(result.is_err());
973
974 assert!(!journal.blobs.contains_key(&2));
976
977 let (offset, _) = journal.append(1, 999).await.unwrap();
979 assert_eq!(journal.get(1, offset).await.unwrap(), 999);
980
981 journal.destroy().await.unwrap();
982 });
983 }
984}