//! An append-only journal for storing arbitrary variable-length items.
//!
//! Items are appended to sections (each backed by its own blob) as
//! varint-length-prefixed entries, optionally zstd-compressed. Items can be
//! read back either by direct offset ([`Journal::get`]) or by streaming
//! replay ([`Journal::replay`]); replay detects partially written or trailing
//! bytes at the end of a blob and truncates them away.

use super::manager::{AppendFactory, Config as ManagerConfig, Manager};
use crate::journal::Error;
use commonware_codec::{
    varint::{UInt, MAX_U32_VARINT_SIZE},
    Codec, CodecShared, EncodeSize, ReadExt, Write as CodecWrite,
};
use commonware_runtime::{
    buffer::paged::{Append, CacheRef, Replay},
    Blob, Buf, BufMut, IoBuf, IoBufMut, Metrics, Storage,
};
use futures::stream::{self, Stream, StreamExt};
use std::{io::Cursor, num::NonZeroUsize};
use tracing::{trace, warn};
use zstd::{bulk::compress, decode_all};

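/// Configuration for a variable-length item [Journal].
///
/// A minimal construction sketch (field values are illustrative, not
/// prescriptive; `CacheRef::new` is used here as it is in this module's
/// tests):
///
/// ```ignore
/// use commonware_utils::{NZUsize, NZU16};
///
/// let cfg = Config {
///     partition: "my_partition".into(),
///     compression: Some(3), // zstd level 3; use None to store items raw
///     codec_config: (),
///     page_cache: CacheRef::new(NZU16!(1024), NZUsize!(10)),
///     write_buffer: NZUsize!(1024),
/// };
/// ```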
#[derive(Clone)]
pub struct Config<C> {
    /// The storage partition to use for persisting journal blobs.
    pub partition: String,

    /// Optional zstd compression level to apply to items. If `None`, items
    /// are stored uncompressed.
    pub compression: Option<u8>,

    /// The codec configuration to use when decoding items.
    pub codec_config: C,

    /// The page cache to use for buffered reads.
    pub page_cache: CacheRef,

    /// The size of the write buffer to use for each blob.
    pub write_buffer: NonZeroUsize,
}

/// Decodes a varint length prefix from `buf`, returning the item size and the
/// number of bytes consumed by the varint itself.
#[inline]
fn decode_length_prefix(buf: &mut impl Buf) -> Result<(usize, usize), Error> {
    let initial = buf.remaining();
    let size = UInt::<u32>::read(buf)?.0 as usize;
    let varint_len = initial - buf.remaining();
    Ok((size, varint_len))
}

/// The location of an item relative to the bytes already buffered.
enum ItemInfo {
    /// The full item is available in the buffered bytes.
    Complete {
        /// Length of the varint size prefix.
        varint_len: usize,
        /// Length of the item data.
        data_len: usize,
    },
    /// Only a prefix of the item is buffered; the remainder must be read
    /// from the blob directly.
    Incomplete {
        /// Length of the varint size prefix.
        varint_len: usize,
        /// Number of item bytes already buffered.
        prefix_len: usize,
        /// Total length of the item data.
        total_len: usize,
    },
}

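/// Locates the item whose varint length prefix begins at `offset`, given the
/// bytes buffered in `buf`, returning the offset of the next item and whether
/// the item is fully buffered.
///
/// A minimal sketch of the entry framing this expects (mirroring how `append`
/// frames entries as a varint length prefix followed by the item bytes; the
/// payload here is illustrative):
///
/// ```ignore
/// let payload = b"hello";
/// let mut frame = Vec::new();
/// UInt(payload.len() as u32).write(&mut frame);
/// frame.extend_from_slice(payload);
/// // Parsing `frame` at offset 0 yields ItemInfo::Complete { data_len: 5, .. }
/// // and next_offset == frame.len() as u64.
/// ```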
fn find_item(buf: &mut impl Buf, offset: u64) -> Result<(u64, ItemInfo), Error> {
    let available = buf.remaining();
    let (size, varint_len) = decode_length_prefix(buf)?;
    let next_offset = offset
        .checked_add(varint_len as u64)
        .ok_or(Error::OffsetOverflow)?
        .checked_add(size as u64)
        .ok_or(Error::OffsetOverflow)?;
    let buffered = available.saturating_sub(varint_len);

    let item = if buffered >= size {
        ItemInfo::Complete {
            varint_len,
            data_len: size,
        }
    } else {
        ItemInfo::Incomplete {
            varint_len,
            prefix_len: buffered,
            total_len: size,
        }
    };

    Ok((next_offset, item))
}

/// Per-section state threaded through the replay stream.
struct ReplayState<B: Blob, C> {
    /// The section being replayed.
    section: u64,
    /// Handle to the section's blob (used to truncate trailing junk).
    blob: Append<B>,
    /// Buffered reader over the blob.
    replay: Replay<B>,
    /// Bytes still to skip before the first item of interest.
    skip_bytes: u64,
    /// Current logical offset into the blob.
    offset: u64,
    /// Offset of the end of the last successfully decoded item.
    valid_offset: u64,
    /// Codec configuration used to decode items.
    codec_config: C,
    /// Whether items are zstd-compressed.
    compressed: bool,
    /// Whether replay of this section is finished.
    done: bool,
}

/// Decodes an item from `item_data`, decompressing first if `compressed`.
fn decode_item<V: Codec>(item_data: impl Buf, cfg: &V::Cfg, compressed: bool) -> Result<V, Error> {
    if compressed {
        let decompressed =
            decode_all(item_data.reader()).map_err(|_| Error::DecompressionFailed)?;
        V::decode_cfg(decompressed.as_ref(), cfg).map_err(Error::Codec)
    } else {
        V::decode_cfg(item_data, cfg).map_err(Error::Codec)
    }
}

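/// An append-only journal of variable-length, codec-encoded items grouped
/// into sections. Each stored entry is a varint length prefix followed by the
/// (optionally zstd-compressed) encoded item.
///
/// A minimal usage sketch, assuming a runtime `context` implementing
/// `Storage + Metrics` and a `cfg` like the one shown above:
///
/// ```ignore
/// let mut journal = Journal::<_, i32>::init(context, cfg).await?;
/// let (offset, _size) = journal.append(1, 42).await?; // write to section 1
/// journal.sync(1).await?;                             // make it durable
/// assert_eq!(journal.get(1, offset).await?, 42);      // random-access read
/// ```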
pub struct Journal<E: Storage + Metrics, V: Codec> {
    /// Manages the set of section blobs.
    manager: Manager<E, AppendFactory>,

    /// Optional zstd compression level applied to appended items.
    compression: Option<u8>,

    /// Codec configuration used to decode items.
    codec_config: V::Cfg,
}

impl<E: Storage + Metrics, V: CodecShared> Journal<E, V> {
    /// Initializes the journal, opening (or creating) all blobs in the
    /// configured partition.
    pub async fn init(context: E, cfg: Config<V::Cfg>) -> Result<Self, Error> {
        let manager_cfg = ManagerConfig {
            partition: cfg.partition,
            factory: AppendFactory {
                write_buffer: cfg.write_buffer,
                page_cache_ref: cfg.page_cache,
            },
        };
        let manager = Manager::init(context, manager_cfg).await?;

        Ok(Self {
            manager,
            compression: cfg.compression,
            codec_config: cfg.codec_config,
        })
    }

    /// Reads and decodes the item at `offset` from `blob`, returning the
    /// offset of the next item, the item's stored size, and the item itself.
    async fn read(
        compressed: bool,
        cfg: &V::Cfg,
        blob: &Append<E::Blob>,
        offset: u64,
    ) -> Result<(u64, u32, V), Error> {
        // Read enough bytes to decode the varint length prefix (and possibly
        // the start of the item).
        let (buf, available) = blob
            .read_up_to(IoBufMut::zeroed(MAX_U32_VARINT_SIZE), offset)
            .await?;
        let buf = buf.freeze();
        let mut cursor = Cursor::new(buf.slice(..available));
        let (next_offset, item_info) = find_item(&mut cursor, offset)?;

        let (item_size, decoded) = match item_info {
            ItemInfo::Complete {
                varint_len,
                data_len,
            } => {
                // The entire item fit in the initial read.
                let data = buf.slice(varint_len..varint_len + data_len);
                let decoded = decode_item::<V>(data, cfg, compressed)?;
                (data_len as u32, decoded)
            }
            ItemInfo::Incomplete {
                varint_len,
                prefix_len,
                total_len,
            } => {
                // Only a prefix was buffered: read the remainder directly
                // from the blob and chain the two pieces together.
                let prefix = buf.slice(varint_len..varint_len + prefix_len);
                let read_offset = offset + varint_len as u64 + prefix_len as u64;
                let remainder_len = total_len - prefix_len;
                let mut remainder = vec![0u8; remainder_len];
                blob.read_into(&mut remainder, read_offset).await?;
                let chained = prefix.chain(IoBuf::from(remainder));
                let decoded = decode_item::<V>(chained, cfg, compressed)?;
                (total_len as u32, decoded)
            }
        };

        Ok((next_offset, item_size, decoded))
    }

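    /// Replays items from `(start_section, start_offset)` onward as a stream
    /// of `(section, offset, size, item)` tuples. Trailing or partially
    /// written entries found at the end of a blob are truncated away rather
    /// than surfaced as errors.
    ///
    /// A consumption sketch (mirroring this module's tests):
    ///
    /// ```ignore
    /// let stream = journal.replay(0, 0, NZUsize!(1024)).await?;
    /// pin_mut!(stream);
    /// while let Some(result) = stream.next().await {
    ///     let (section, offset, size, item) = result?;
    ///     // process item...
    /// }
    /// ```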
    pub async fn replay(
        &self,
        start_section: u64,
        mut start_offset: u64,
        buffer: NonZeroUsize,
    ) -> Result<impl Stream<Item = Result<(u64, u64, u32, V), Error>> + Send + '_, Error> {
        // Collect all blobs to replay, along with their replay readers.
        let codec_config = self.codec_config.clone();
        let compressed = self.compression.is_some();
        let mut blobs = Vec::new();
        for (&section, blob) in self.manager.sections_from(start_section) {
            blobs.push((
                section,
                blob.clone(),
                blob.replay(buffer).await?,
                codec_config.clone(),
                compressed,
            ));
        }

        // Replay each blob in order, emitting batches of decoded items.
        Ok(stream::iter(blobs).flat_map(
            move |(section, blob, replay, codec_config, compressed)| {
                // Only the first section honors `start_offset`; every later
                // section starts from the beginning.
                let skip_bytes = if section == start_section {
                    start_offset
                } else {
                    start_offset = 0;
                    0
                };

                stream::unfold(
                    ReplayState {
                        section,
                        blob,
                        replay,
                        skip_bytes,
                        offset: 0,
                        valid_offset: skip_bytes,
                        codec_config,
                        compressed,
                        done: false,
                    },
                    move |mut state| async move {
                        if state.done {
                            return None;
                        }

                        let blob_size = state.replay.blob_size();
                        let mut batch: Vec<Result<(u64, u64, u32, V), Error>> = Vec::new();
                        loop {
                            // Ensure enough bytes are buffered to decode the
                            // next varint length prefix.
                            match state.replay.ensure(MAX_U32_VARINT_SIZE).await {
                                Ok(true) => {}
                                Ok(false) => {
                                    // End of blob: stop if nothing remains.
                                    if state.replay.remaining() == 0 {
                                        state.done = true;
                                        return if batch.is_empty() {
                                            None
                                        } else {
                                            Some((batch, state))
                                        };
                                    }
                                }
                                Err(err) => {
                                    batch.push(Err(err.into()));
                                    state.done = true;
                                    return Some((batch, state));
                                }
                            }

                            // Skip forward to the requested start offset.
                            if state.skip_bytes > 0 {
                                let to_skip =
                                    state.skip_bytes.min(state.replay.remaining() as u64) as usize;
                                state.replay.advance(to_skip);
                                state.skip_bytes -= to_skip as u64;
                                state.offset += to_skip as u64;
                                continue;
                            }

                            // Decode the length prefix of the next item.
                            let before_remaining = state.replay.remaining();
                            let (item_size, varint_len) =
                                match decode_length_prefix(&mut state.replay) {
                                    Ok(result) => result,
                                    Err(err) => {
                                        // If the prefix was cut short by the end
                                        // of the blob, treat it as trailing junk.
                                        if state.replay.is_exhausted()
                                            || before_remaining < MAX_U32_VARINT_SIZE
                                        {
                                            // Truncate any bytes past the last
                                            // successfully decoded item.
                                            if state.valid_offset < blob_size
                                                && state.offset < blob_size
                                            {
                                                warn!(
                                                    blob = state.section,
                                                    bad_offset = state.offset,
                                                    new_size = state.valid_offset,
                                                    "trailing bytes detected: truncating"
                                                );
                                                state.blob.resize(state.valid_offset).await.ok()?;
                                            }
                                            state.done = true;
                                            return if batch.is_empty() {
                                                None
                                            } else {
                                                Some((batch, state))
                                            };
                                        }
                                        batch.push(Err(err));
                                        state.done = true;
                                        return Some((batch, state));
                                    }
                                };

                            // Ensure the full item is buffered.
                            match state.replay.ensure(item_size).await {
                                Ok(true) => {}
                                Ok(false) => {
                                    // The item was only partially written:
                                    // truncate it away.
                                    warn!(
                                        blob = state.section,
                                        bad_offset = state.offset,
                                        new_size = state.valid_offset,
                                        "incomplete item at end: truncating"
                                    );
                                    state.blob.resize(state.valid_offset).await.ok()?;
                                    state.done = true;
                                    return if batch.is_empty() {
                                        None
                                    } else {
                                        Some((batch, state))
                                    };
                                }
                                Err(err) => {
                                    batch.push(Err(err.into()));
                                    state.done = true;
                                    return Some((batch, state));
                                }
                            }

                            // Compute the offset of the next item, guarding
                            // against overflow.
                            let item_offset = state.offset;
                            let next_offset = match state
                                .offset
                                .checked_add(varint_len as u64)
                                .and_then(|o| o.checked_add(item_size as u64))
                            {
                                Some(o) => o,
                                None => {
                                    batch.push(Err(Error::OffsetOverflow));
                                    state.done = true;
                                    return Some((batch, state));
                                }
                            };
                            match decode_item::<V>(
                                (&mut state.replay).take(item_size),
                                &state.codec_config,
                                state.compressed,
                            ) {
                                Ok(decoded) => {
                                    batch.push(Ok((
                                        state.section,
                                        item_offset,
                                        item_size as u32,
                                        decoded,
                                    )));
                                    state.valid_offset = next_offset;
                                    state.offset = next_offset;
                                }
                                Err(err) => {
                                    batch.push(Err(err));
                                    state.done = true;
                                    return Some((batch, state));
                                }
                            }

                            // Yield the batch once the buffer runs low.
                            if !batch.is_empty() && state.replay.remaining() < MAX_U32_VARINT_SIZE {
                                return Some((batch, state));
                            }
                        }
                    },
                )
                .flat_map(stream::iter)
            },
        ))
    }

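    /// Appends `item` to `section`, returning the offset at which the entry
    /// was written and the item's stored (possibly compressed) size in bytes.
    ///
    /// A usage sketch over an `i32` journal:
    ///
    /// ```ignore
    /// let (offset, size) = journal.append(1, 42).await?;
    /// journal.sync(1).await?; // appends are only durable after a sync
    /// ```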
    pub async fn append(&mut self, section: u64, item: V) -> Result<(u64, u32), Error> {
        // Encode the item, compressing it first if compression is enabled.
        let (buf, item_len) = if let Some(compression) = self.compression {
            let encoded = item.encode();
            let compressed =
                compress(&encoded, compression as i32).map_err(|_| Error::CompressionFailed)?;
            let item_len = compressed.len();
            let item_len_u32: u32 = match item_len.try_into() {
                Ok(len) => len,
                Err(_) => return Err(Error::ItemTooLarge(item_len)),
            };
            let size_len = UInt(item_len_u32).encode_size();
            let entry_len = size_len
                .checked_add(item_len)
                .ok_or(Error::OffsetOverflow)?;

            let mut buf = Vec::with_capacity(entry_len);
            UInt(item_len_u32).write(&mut buf);
            buf.put_slice(&compressed);

            (buf, item_len)
        } else {
            // Uncompressed: write the varint length prefix, then the item.
            let item_len = item.encode_size();
            let item_len_u32: u32 = match item_len.try_into() {
                Ok(len) => len,
                Err(_) => return Err(Error::ItemTooLarge(item_len)),
            };
            let size_len = UInt(item_len_u32).encode_size();
            let entry_len = size_len
                .checked_add(item_len)
                .ok_or(Error::OffsetOverflow)?;

            let mut buf = Vec::with_capacity(entry_len);
            UInt(item_len_u32).write(&mut buf);
            item.write(&mut buf);

            (buf, item_len)
        };

        // Get or create the blob for this section.
        let blob = self.manager.get_or_create(section).await?;

        // The item starts at the current end of the blob.
        let offset = blob.size().await;

        // Append the framed entry.
        blob.append(&buf).await?;
        trace!(blob = section, offset, "appended item");
        Ok((offset, item_len as u32))
    }

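    /// Retrieves the item at `offset` within `section` via a targeted read
    /// (no replay required):
    ///
    /// ```ignore
    /// let item = journal.get(1, offset).await?;
    /// ```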
    pub async fn get(&self, section: u64, offset: u64) -> Result<V, Error> {
        let blob = self
            .manager
            .get(section)?
            .ok_or(Error::SectionOutOfRange(section))?;

        let (_, _, item) =
            Self::read(self.compression.is_some(), &self.codec_config, blob, offset).await?;
        Ok(item)
    }

    /// Returns the current size of `section` in bytes (zero if the section
    /// does not exist yet).
    pub async fn size(&self, section: u64) -> Result<u64, Error> {
        self.manager.size(section).await
    }

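    /// Rewinds `section` so that `offset` becomes its new size; equivalent to
    /// [`Self::rewind`] with `size = offset`. As the rewind tests below
    /// exercise, `rewind` also removes every section newer than `section`,
    /// while `rewind_section` touches only the named section:
    ///
    /// ```ignore
    /// journal.rewind(1, 0).await?;         // empties section 1; drops 2, 3, ...
    /// journal.rewind_section(1, 0).await?; // empties section 1 only
    /// ```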
    pub async fn rewind_to_offset(&mut self, section: u64, offset: u64) -> Result<(), Error> {
        self.manager.rewind(section, offset).await
    }

    /// Rewinds the journal so that `section` is truncated to `size` and all
    /// sections newer than `section` are removed.
    pub async fn rewind(&mut self, section: u64, size: u64) -> Result<(), Error> {
        self.manager.rewind(section, size).await
    }

    /// Truncates `section` to `size` without affecting any other section.
    pub async fn rewind_section(&mut self, section: u64, size: u64) -> Result<(), Error> {
        self.manager.rewind_section(section, size).await
    }

    /// Syncs any pending writes in `section` to durable storage.
    pub async fn sync(&self, section: u64) -> Result<(), Error> {
        self.manager.sync(section).await
    }

    /// Syncs all sections with pending writes.
    pub async fn sync_all(&self) -> Result<(), Error> {
        self.manager.sync_all().await
    }

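    /// Prunes all sections older than `min`, returning whether anything was
    /// pruned. Afterward, operations on pruned sections fail with
    /// `Error::AlreadyPrunedToSection` (see `test_journal_prune_guard`):
    ///
    /// ```ignore
    /// journal.prune(3).await?; // removes sections 1 and 2
    /// assert!(matches!(
    ///     journal.get(1, 0).await,
    ///     Err(Error::AlreadyPrunedToSection(3))
    /// ));
    /// ```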
    pub async fn prune(&mut self, min: u64) -> Result<bool, Error> {
        self.manager.prune(min).await
    }

    /// Returns the oldest section in the journal, if any.
    pub fn oldest_section(&self) -> Option<u64> {
        self.manager.oldest_section()
    }

    /// Returns the newest section in the journal, if any.
    pub fn newest_section(&self) -> Option<u64> {
        self.manager.newest_section()
    }

    /// Returns whether the journal contains no sections.
    pub fn is_empty(&self) -> bool {
        self.manager.is_empty()
    }

    /// Returns the number of sections in the journal.
    pub fn num_sections(&self) -> usize {
        self.manager.num_sections()
    }

    /// Removes all underlying blobs, consuming the journal.
    pub async fn destroy(self) -> Result<(), Error> {
        self.manager.destroy().await
    }

    /// Removes all sections; unlike [`Self::destroy`], the journal remains
    /// usable for subsequent appends.
    pub async fn clear(&mut self) -> Result<(), Error> {
        self.manager.clear().await
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use commonware_macros::test_traced;
    use commonware_runtime::{deterministic, Blob, BufMut, Metrics, Runner, Storage};
    use commonware_utils::{NZUsize, NZU16};
    use futures::{pin_mut, StreamExt};
    use std::num::NonZeroU16;

    const PAGE_SIZE: NonZeroU16 = NZU16!(1024);
    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);

    #[test_traced]
    fn test_journal_append_and_read() {
        // Initialize the deterministic runtime.
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            // Create a journal configuration.
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };
            let index = 1u64;
            let data = 10;
            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            // Append an item to the journal.
            journal
                .append(index, data)
                .await
                .expect("Failed to append data");

            // Check metrics.
            let buffer = context.encode();
            assert!(buffer.contains("first_tracked 1"));

            // Sync, close, and reopen the journal.
            journal.sync(index).await.expect("Failed to sync journal");
            drop(journal);
            let journal = Journal::<_, i32>::init(context.with_label("second"), cfg)
                .await
                .expect("Failed to re-initialize journal");

            // Replay the journal and collect items.
            let mut items = Vec::new();
            let stream = journal
                .replay(0, 0, NZUsize!(1024))
                .await
                .expect("unable to setup replay");
            pin_mut!(stream);
            while let Some(result) = stream.next().await {
                match result {
                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                    Err(err) => panic!("Failed to read item: {err}"),
                }
            }

            // Verify the item was replayed.
            assert_eq!(items.len(), 1);
            assert_eq!(items[0].0, index);
            assert_eq!(items[0].1, data);

            // Check metrics.
            let buffer = context.encode();
            assert!(buffer.contains("second_tracked 1"));
        });
    }

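    #[test_traced]
    fn test_journal_compression_roundtrip() {
        // A minimal sketch exercising the zstd compression path (the other
        // tests all run uncompressed); mirrors test_journal_append_and_read.
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".into(),
                compression: Some(3),
                codec_config: (),
                page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };
            let mut journal = Journal::init(context.with_label("compressed"), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            // Append and sync a value, then read it back via random access.
            let (offset, _) = journal.append(1, 42i32).await.expect("Failed to append");
            journal.sync(1).await.expect("Failed to sync");
            assert_eq!(journal.get(1, offset).await.expect("Failed to get"), 42);

            // Replay should also decompress transparently.
            let stream = journal
                .replay(0, 0, NZUsize!(1024))
                .await
                .expect("unable to setup replay");
            pin_mut!(stream);
            let mut items = Vec::new();
            while let Some(result) = stream.next().await {
                let (_, _, _, item) = result.expect("Failed to read item");
                items.push(item);
            }
            assert_eq!(items, vec![42]);
        });
    }
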
    #[test_traced]
    fn test_journal_multiple_appends_and_reads() {
        // Initialize the deterministic runtime.
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            // Create a journal configuration.
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            // Initialize the journal.
            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            // Append multiple items to different sections.
            let data_items = vec![(1u64, 1), (1u64, 2), (2u64, 3), (3u64, 4)];
            for (index, data) in &data_items {
                journal
                    .append(*index, *data)
                    .await
                    .expect("Failed to append data");
                journal.sync(*index).await.expect("Failed to sync blob");
            }

            // Check metrics.
            let buffer = context.encode();
            assert!(buffer.contains("first_tracked 3"));
            assert!(buffer.contains("first_synced_total 4"));

            // Close and reopen the journal.
            drop(journal);
            let journal = Journal::init(context.with_label("second"), cfg)
                .await
                .expect("Failed to re-initialize journal");

            // Replay all items.
            let mut items = Vec::<(u64, u32)>::new();
            {
                let stream = journal
                    .replay(0, 0, NZUsize!(1024))
                    .await
                    .expect("unable to setup replay");
                pin_mut!(stream);
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                        Err(err) => panic!("Failed to read item: {err}"),
                    }
                }
            }

            // Verify all items were replayed in order.
            assert_eq!(items.len(), data_items.len());
            for ((expected_index, expected_data), (actual_index, actual_data)) in
                data_items.iter().zip(items.iter())
            {
                assert_eq!(actual_index, expected_index);
                assert_eq!(actual_data, expected_data);
            }

            // Clean up.
            journal.destroy().await.expect("Failed to destroy journal");
        });
    }

    #[test_traced]
    fn test_journal_prune_blobs() {
        // Initialize the deterministic runtime.
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            // Create a journal configuration.
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            // Initialize the journal.
            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            // Append items to multiple sections.
            for index in 1u64..=5u64 {
                journal
                    .append(index, index)
                    .await
                    .expect("Failed to append data");
                journal.sync(index).await.expect("Failed to sync blob");
            }

            // Append one item to an older section out of order.
            let data = 99;
            journal
                .append(2u64, data)
                .await
                .expect("Failed to append data");
            journal.sync(2u64).await.expect("Failed to sync blob");

            // Prune sections older than 3.
            journal.prune(3).await.expect("Failed to prune blobs");

            // Check metrics.
            let buffer = context.encode();
            assert!(buffer.contains("first_pruned_total 2"));

            // Pruning with a lower min is a no-op.
            journal.prune(2).await.expect("Failed to no-op prune");
            let buffer = context.encode();
            assert!(buffer.contains("first_pruned_total 2"));

            // Close and reopen the journal.
            drop(journal);
            let mut journal = Journal::init(context.with_label("second"), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");

            // Replay the remaining items.
            let mut items = Vec::<(u64, u64)>::new();
            {
                let stream = journal
                    .replay(0, 0, NZUsize!(1024))
                    .await
                    .expect("unable to setup replay");
                pin_mut!(stream);
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                        Err(err) => panic!("Failed to read item: {err}"),
                    }
                }
            }

            // Only sections 3, 4, and 5 should remain.
            assert_eq!(items.len(), 3);
            let expected_indices = [3u64, 4u64, 5u64];
            for (item, expected_index) in items.iter().zip(expected_indices.iter()) {
                assert_eq!(item.0, *expected_index);
            }

            // Prune everything.
            journal.prune(6).await.expect("Failed to prune blobs");

            // Close the journal.
            drop(journal);

            // Ensure no blobs remain in the partition.
            assert!(context
                .scan(&cfg.partition)
                .await
                .expect("Failed to list blobs")
                .is_empty());
        });
    }

    #[test_traced]
    fn test_journal_prune_guard() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            // Populate sections 1 through 5.
            for section in 1u64..=5u64 {
                journal
                    .append(section, section as i32)
                    .await
                    .expect("Failed to append data");
                journal.sync(section).await.expect("Failed to sync");
            }

            // Prune sections older than 3.
            journal.prune(3).await.expect("Failed to prune");

            // Every operation on a pruned section should fail with
            // AlreadyPrunedToSection(3).
            match journal.append(1, 100).await {
                Err(Error::AlreadyPrunedToSection(3)) => {}
                other => panic!("Expected AlreadyPrunedToSection(3), got {other:?}"),
            }

            match journal.append(2, 100).await {
                Err(Error::AlreadyPrunedToSection(3)) => {}
                other => panic!("Expected AlreadyPrunedToSection(3), got {other:?}"),
            }

            match journal.get(1, 0).await {
                Err(Error::AlreadyPrunedToSection(3)) => {}
                other => panic!("Expected AlreadyPrunedToSection(3), got {other:?}"),
            }

            match journal.size(1).await {
                Err(Error::AlreadyPrunedToSection(3)) => {}
                other => panic!("Expected AlreadyPrunedToSection(3), got {other:?}"),
            }

            match journal.rewind(2, 0).await {
                Err(Error::AlreadyPrunedToSection(3)) => {}
                other => panic!("Expected AlreadyPrunedToSection(3), got {other:?}"),
            }

            match journal.rewind_section(1, 0).await {
                Err(Error::AlreadyPrunedToSection(3)) => {}
                other => panic!("Expected AlreadyPrunedToSection(3), got {other:?}"),
            }

            match journal.sync(2).await {
                Err(Error::AlreadyPrunedToSection(3)) => {}
                other => panic!("Expected AlreadyPrunedToSection(3), got {other:?}"),
            }

            // Sections at or above the prune point still work.
            assert!(journal.get(3, 0).await.is_ok());
            assert!(journal.get(4, 0).await.is_ok());
            assert!(journal.get(5, 0).await.is_ok());
            assert!(journal.size(3).await.is_ok());
            assert!(journal.sync(4).await.is_ok());

            journal
                .append(3, 999)
                .await
                .expect("Should be able to append to section 3");

            // Prune further and verify the guard moves with it.
            journal.prune(5).await.expect("Failed to prune");

            match journal.get(3, 0).await {
                Err(Error::AlreadyPrunedToSection(5)) => {}
                other => panic!("Expected AlreadyPrunedToSection(5), got {other:?}"),
            }

            match journal.get(4, 0).await {
                Err(Error::AlreadyPrunedToSection(5)) => {}
                other => panic!("Expected AlreadyPrunedToSection(5), got {other:?}"),
            }

            assert!(journal.get(5, 0).await.is_ok());
        });
    }

    #[test_traced]
    fn test_journal_prune_guard_across_restart() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            // Populate and prune, then drop the journal.
            {
                let mut journal = Journal::init(context.with_label("first"), cfg.clone())
                    .await
                    .expect("Failed to initialize journal");

                for section in 1u64..=5u64 {
                    journal
                        .append(section, section as i32)
                        .await
                        .expect("Failed to append data");
                    journal.sync(section).await.expect("Failed to sync");
                }

                journal.prune(3).await.expect("Failed to prune");
            }

            // After restart, pruned sections are simply out of range.
            {
                let journal = Journal::<_, i32>::init(context.with_label("second"), cfg.clone())
                    .await
                    .expect("Failed to re-initialize journal");

                match journal.get(1, 0).await {
                    Err(Error::SectionOutOfRange(1)) => {}
                    other => panic!("Expected SectionOutOfRange(1), got {other:?}"),
                }

                match journal.get(2, 0).await {
                    Err(Error::SectionOutOfRange(2)) => {}
                    other => panic!("Expected SectionOutOfRange(2), got {other:?}"),
                }

                // Remaining sections are still readable.
                assert!(journal.get(3, 0).await.is_ok());
                assert!(journal.get(4, 0).await.is_ok());
                assert!(journal.get(5, 0).await.is_ok());
            }
        });
    }

    #[test_traced]
    fn test_journal_with_invalid_blob_name() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            // Manually create a blob with an invalid (non-u64) name.
            let invalid_blob_name = b"invalid";
            let (blob, _) = context
                .open(&cfg.partition, invalid_blob_name)
                .await
                .expect("Failed to create blob with invalid name");
            blob.sync().await.expect("Failed to sync blob");

            // Initializing the journal over the partition should fail.
            let result = Journal::<_, u64>::init(context, cfg).await;

            assert!(matches!(result, Err(Error::InvalidBlobName(_))));
        });
    }

    #[test_traced]
    fn test_journal_read_size_missing() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            // Manually create a blob containing a truncated varint size prefix.
            let section = 1u64;
            let blob_name = section.to_be_bytes();
            let (blob, _) = context
                .open(&cfg.partition, &blob_name)
                .await
                .expect("Failed to create blob");

            let mut incomplete_data = Vec::new();
            UInt(u32::MAX).write(&mut incomplete_data);
            incomplete_data.truncate(1);
            blob.write_at(0, incomplete_data)
                .await
                .expect("Failed to write incomplete data");
            blob.sync().await.expect("Failed to sync blob");

            // Initialize the journal over the corrupt blob.
            let journal = Journal::init(context, cfg)
                .await
                .expect("Failed to initialize journal");

            // Replay should surface no items (the truncated prefix is dropped).
            let stream = journal
                .replay(0, 0, NZUsize!(1024))
                .await
                .expect("unable to setup replay");
            pin_mut!(stream);
            let mut items = Vec::<(u64, u64)>::new();
            while let Some(result) = stream.next().await {
                match result {
                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                    Err(err) => panic!("Failed to read item: {err}"),
                }
            }
            assert!(items.is_empty());
        });
    }

    #[test_traced]
    fn test_journal_read_item_missing() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            // Manually create a blob whose length prefix promises more data
            // than is actually present.
            let section = 1u64;
            let blob_name = section.to_be_bytes();
            let (blob, _) = context
                .open(&cfg.partition, &blob_name)
                .await
                .expect("Failed to create blob");

            let item_size: u32 = 10;
            let mut buf = Vec::new();
            UInt(item_size).write(&mut buf);
            let data = [2u8; 5];
            BufMut::put_slice(&mut buf, &data);
            blob.write_at(0, buf)
                .await
                .expect("Failed to write incomplete item");
            blob.sync().await.expect("Failed to sync blob");

            // Initialize the journal over the corrupt blob.
            let journal = Journal::init(context, cfg)
                .await
                .expect("Failed to initialize journal");

            // Replay should surface no items.
            let stream = journal
                .replay(0, 0, NZUsize!(1024))
                .await
                .expect("unable to setup replay");
            pin_mut!(stream);
            let mut items = Vec::<(u64, u64)>::new();
            while let Some(result) = stream.next().await {
                match result {
                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                    Err(err) => panic!("Failed to read item: {err}"),
                }
            }
            assert!(items.is_empty());
        });
    }

    #[test_traced]
    fn test_journal_read_checksum_missing() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            // Manually create a blob containing a framed item written without
            // its checksum.
            let section = 1u64;
            let blob_name = section.to_be_bytes();
            let (blob, _) = context
                .open(&cfg.partition, &blob_name)
                .await
                .expect("Failed to create blob");

            let item_data = b"Test data";
            let item_size = item_data.len() as u32;

            let mut buf = Vec::new();
            UInt(item_size).write(&mut buf);
            BufMut::put_slice(&mut buf, item_data);
            blob.write_at(0, buf)
                .await
                .expect("Failed to write item without checksum");

            blob.sync().await.expect("Failed to sync blob");

            // Initialize the journal over the corrupt blob.
            let journal = Journal::init(context, cfg)
                .await
                .expect("Failed to initialize journal");

            // Replay should surface no items.
            let stream = journal
                .replay(0, 0, NZUsize!(1024))
                .await
                .expect("unable to setup replay");
            pin_mut!(stream);
            let mut items = Vec::<(u64, u64)>::new();
            while let Some(result) = stream.next().await {
                match result {
                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                    Err(err) => panic!("Failed to read item: {err}"),
                }
            }
            assert!(items.is_empty());
        });
    }

    #[test_traced]
    fn test_journal_read_checksum_mismatch() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            // Manually create a blob containing a framed item followed by an
            // incorrect checksum.
            let section = 1u64;
            let blob_name = section.to_be_bytes();
            let (blob, _) = context
                .open(&cfg.partition, &blob_name)
                .await
                .expect("Failed to create blob");

            let item_data = b"Test data";
            let item_size = item_data.len() as u32;
            let incorrect_checksum: u32 = 0xDEADBEEF;

            let mut buf = Vec::new();
            UInt(item_size).write(&mut buf);
            BufMut::put_slice(&mut buf, item_data);
            buf.put_u32(incorrect_checksum);
            blob.write_at(0, buf)
                .await
                .expect("Failed to write item with bad checksum");

            blob.sync().await.expect("Failed to sync blob");

            // Initialize the journal over the corrupt blob.
            let journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            // Replay should surface no items.
            {
                let stream = journal
                    .replay(0, 0, NZUsize!(1024))
                    .await
                    .expect("unable to setup replay");
                pin_mut!(stream);
                let mut items = Vec::<(u64, u64)>::new();
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                        Err(err) => panic!("Failed to read item: {err}"),
                    }
                }
                assert!(items.is_empty());
            }
            drop(journal);

            // The corrupt blob should have been truncated to empty.
            let (_, blob_size) = context
                .open(&cfg.partition, &section.to_be_bytes())
                .await
                .expect("Failed to open blob");
            assert_eq!(blob_size, 0);
        });
    }

    #[test_traced]
    fn test_journal_truncation_recovery() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            // Append a single item to section 1.
            journal.append(1, 1).await.expect("Failed to append data");

            // Append multiple items to section 2.
            let data_items = vec![(2u64, 2), (2u64, 3), (2u64, 4)];
            for (index, data) in &data_items {
                journal
                    .append(*index, *data)
                    .await
                    .expect("Failed to append data");
                journal.sync(*index).await.expect("Failed to sync blob");
            }

            journal.sync_all().await.expect("Failed to sync");
            drop(journal);

            // Corrupt section 2 by truncating the tail of its blob.
            let (blob, blob_size) = context
                .open(&cfg.partition, &2u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            blob.resize(blob_size - 4)
                .await
                .expect("Failed to corrupt blob");
            blob.sync().await.expect("Failed to sync blob");

            // Reopen the journal.
            let journal = Journal::init(context.with_label("second"), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");

            // Replay: only the item in section 1 should survive.
            let mut items = Vec::<(u64, u32)>::new();
            {
                let stream = journal
                    .replay(0, 0, NZUsize!(1024))
                    .await
                    .expect("unable to setup replay");
                pin_mut!(stream);
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                        Err(err) => panic!("Failed to read item: {err}"),
                    }
                }
            }
            drop(journal);

            assert_eq!(items.len(), 1);
            assert_eq!(items[0].0, 1);
            assert_eq!(items[0].1, 1);

            // Section 2's blob should have been truncated to empty.
            let (_, blob_size) = context
                .open(&cfg.partition, &2u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            assert_eq!(blob_size, 0);

            // Reopen again and confirm replay still yields only section 1.
            let mut journal = Journal::init(context.with_label("third"), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");

            let mut items = Vec::<(u64, u32)>::new();
            {
                let stream = journal
                    .replay(0, 0, NZUsize!(1024))
                    .await
                    .expect("unable to setup replay");
                pin_mut!(stream);
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                        Err(err) => panic!("Failed to read item: {err}"),
                    }
                }
            }

            assert_eq!(items.len(), 1);
            assert_eq!(items[0].0, 1);
            assert_eq!(items[0].1, 1);

            // Appends to the truncated section should work again.
            let (_offset, _) = journal.append(2, 5).await.expect("Failed to append data");
            journal.sync(2).await.expect("Failed to sync blob");

            let item = journal.get(2, 0).await.expect("Failed to get item");
            assert_eq!(item, 5);

            drop(journal);

            // Final restart: both items should replay.
            let journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");

            let mut items = Vec::<(u64, u32)>::new();
            {
                let stream = journal
                    .replay(0, 0, NZUsize!(1024))
                    .await
                    .expect("unable to setup replay");
                pin_mut!(stream);
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                        Err(err) => panic!("Failed to read item: {err}"),
                    }
                }
            }

            assert_eq!(items.len(), 2);
            assert_eq!(items[0].0, 1);
            assert_eq!(items[0].1, 1);
            assert_eq!(items[1].0, 2);
            assert_eq!(items[1].1, 5);
        });
    }

    #[test_traced]
    fn test_journal_handling_extra_data() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            // Append a single item to section 1.
            journal.append(1, 1).await.expect("Failed to append data");

            // Append multiple items to section 2.
            let data_items = vec![(2u64, 2), (2u64, 3), (2u64, 4)];
            for (index, data) in &data_items {
                journal
                    .append(*index, *data)
                    .await
                    .expect("Failed to append data");
                journal.sync(*index).await.expect("Failed to sync blob");
            }

            journal.sync_all().await.expect("Failed to sync");
            drop(journal);

            // Append trailing garbage to section 2's blob.
            let (blob, blob_size) = context
                .open(&cfg.partition, &2u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            blob.write_at(blob_size, vec![0u8; 16])
                .await
                .expect("Failed to add extra data");
            blob.sync().await.expect("Failed to sync blob");

            // Reopen and replay: the trailing garbage must be handled without
            // surfacing an error.
            let journal = Journal::init(context.with_label("second"), cfg)
                .await
                .expect("Failed to re-initialize journal");

            let mut items = Vec::<(u64, i32)>::new();
            let stream = journal
                .replay(0, 0, NZUsize!(1024))
                .await
                .expect("unable to setup replay");
            pin_mut!(stream);
            while let Some(result) = stream.next().await {
                match result {
                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                    Err(err) => panic!("Failed to read item: {err}"),
                }
            }
        });
    }

    #[test_traced]
    fn test_journal_rewind() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".to_string(),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };
            let mut journal = Journal::init(context, cfg).await.unwrap();

            let size = journal.size(1).await.unwrap();
            assert_eq!(size, 0);

            journal.append(1, 42i32).await.unwrap();

            let size = journal.size(1).await.unwrap();
            assert!(size > 0);

            journal.append(1, 43i32).await.unwrap();
            let new_size = journal.size(1).await.unwrap();
            assert!(new_size > size);

            let size = journal.size(2).await.unwrap();
            assert_eq!(size, 0);

            journal.append(2, 44i32).await.unwrap();

            let size = journal.size(2).await.unwrap();
            assert!(size > 0);

            journal.rewind(1, 0).await.unwrap();

            let size = journal.size(1).await.unwrap();
            assert_eq!(size, 0);

            let size = journal.size(2).await.unwrap();
            assert_eq!(size, 0);
        });
    }

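    #[test_traced]
    fn test_journal_rewind_to_offset() {
        // A minimal sketch for rewind_to_offset, which is otherwise untested
        // here; it should behave like rewind with size = offset.
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".to_string(),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };
            let mut journal = Journal::init(context, cfg).await.unwrap();

            // Write two items and remember the offset of the second.
            journal.append(1, 42i32).await.unwrap();
            let (offset, _) = journal.append(1, 43i32).await.unwrap();
            journal.sync(1).await.unwrap();

            // Rewinding to the second item's offset discards it but keeps the
            // first item intact.
            journal.rewind_to_offset(1, offset).await.unwrap();
            assert_eq!(journal.size(1).await.unwrap(), offset);
            assert_eq!(journal.get(1, 0).await.unwrap(), 42);
        });
    }
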
    #[test_traced]
    fn test_journal_rewind_section() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".to_string(),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };
            let mut journal = Journal::init(context, cfg).await.unwrap();

            let size = journal.size(1).await.unwrap();
            assert_eq!(size, 0);

            journal.append(1, 42i32).await.unwrap();

            let size = journal.size(1).await.unwrap();
            assert!(size > 0);

            journal.append(1, 43i32).await.unwrap();
            let new_size = journal.size(1).await.unwrap();
            assert!(new_size > size);

            let size = journal.size(2).await.unwrap();
            assert_eq!(size, 0);

            journal.append(2, 44i32).await.unwrap();

            let size = journal.size(2).await.unwrap();
            assert!(size > 0);

            journal.rewind_section(1, 0).await.unwrap();

            let size = journal.size(1).await.unwrap();
            assert_eq!(size, 0);

            let size = journal.size(2).await.unwrap();
            assert!(size > 0);
        });
    }

    #[test_traced]
    fn test_journal_small_items() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            // Append many single-byte items.
            let num_items = 100;
            let mut offsets = Vec::new();
            for i in 0..num_items {
                let (offset, size) = journal
                    .append(1, i as u8)
                    .await
                    .expect("Failed to append data");
                assert_eq!(size, 1, "u8 should encode to 1 byte");
                offsets.push(offset);
            }
            journal.sync(1).await.expect("Failed to sync");

            // Random-access read every item back.
            for (i, &offset) in offsets.iter().enumerate() {
                let item: u8 = journal.get(1, offset).await.expect("Failed to get item");
                assert_eq!(item, i as u8, "Item mismatch at offset {offset}");
            }

            // Restart and replay everything.
            drop(journal);
            let journal = Journal::<_, u8>::init(context.with_label("second"), cfg)
                .await
                .expect("Failed to re-initialize journal");

            let stream = journal
                .replay(0, 0, NZUsize!(1024))
                .await
                .expect("Failed to setup replay");
            pin_mut!(stream);

            let mut count = 0;
            while let Some(result) = stream.next().await {
                let (section, offset, size, item) = result.expect("Failed to replay item");
                assert_eq!(section, 1);
                assert_eq!(offset, offsets[count]);
                assert_eq!(size, 1);
                assert_eq!(item, count as u8);
                count += 1;
            }
            assert_eq!(count, num_items, "Should replay all items");
        });
    }

    #[test_traced]
    fn test_journal_rewind_many_sections() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".to_string(),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };
            let mut journal = Journal::init(context.clone(), cfg.clone()).await.unwrap();

            // Populate sections 1 through 10.
            for section in 1u64..=10 {
                journal.append(section, section as i32).await.unwrap();
            }
            journal.sync_all().await.unwrap();

            // All sections should have data.
            for section in 1u64..=10 {
                let size = journal.size(section).await.unwrap();
                assert!(size > 0, "section {section} should have data");
            }

            // Rewind to section 5 at its current size: a no-op for sections
            // 1..=5, but removes sections 6..=10.
            journal
                .rewind(5, journal.size(5).await.unwrap())
                .await
                .unwrap();

            // Sections 1..=5 keep their data.
            for section in 1u64..=5 {
                let size = journal.size(section).await.unwrap();
                assert!(size > 0, "section {section} should still have data");
            }

            // Sections 6..=10 are gone.
            for section in 6u64..=10 {
                let size = journal.size(section).await.unwrap();
                assert_eq!(size, 0, "section {section} should be removed");
            }

            // Replay yields exactly the five surviving items.
            {
                let stream = journal.replay(0, 0, NZUsize!(1024)).await.unwrap();
                pin_mut!(stream);
                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    let (section, _, _, item) = result.unwrap();
                    items.push((section, item));
                }
                assert_eq!(items.len(), 5);
                for (i, (section, item)) in items.iter().enumerate() {
                    assert_eq!(*section, (i + 1) as u64);
                    assert_eq!(*item, (i + 1) as i32);
                }
            }

            journal.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_journal_rewind_partial_truncation() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".to_string(),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };
            let mut journal = Journal::init(context.clone(), cfg.clone()).await.unwrap();

            // Append five items, recording the section size after each.
            let mut sizes = Vec::new();
            for i in 0..5 {
                journal.append(1, i).await.unwrap();
                journal.sync(1).await.unwrap();
                sizes.push(journal.size(1).await.unwrap());
            }

            // Rewind to the size recorded after the third item.
            let target_size = sizes[2];
            journal.rewind(1, target_size).await.unwrap();

            let new_size = journal.size(1).await.unwrap();
            assert_eq!(new_size, target_size);

            // Only the first three items should replay.
            {
                let stream = journal.replay(0, 0, NZUsize!(1024)).await.unwrap();
                pin_mut!(stream);
                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    let (_, _, _, item) = result.unwrap();
                    items.push(item);
                }
                assert_eq!(items.len(), 3);
                for (i, item) in items.iter().enumerate() {
                    assert_eq!(*item, i as i32);
                }
            }

            journal.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_journal_rewind_nonexistent_target() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".to_string(),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };
            let mut journal = Journal::init(context.clone(), cfg.clone()).await.unwrap();

            // Populate sections 5 through 7 only.
            for section in 5u64..=7 {
                journal.append(section, section as i32).await.unwrap();
            }
            journal.sync_all().await.unwrap();

            // Rewinding to a section that doesn't exist removes everything
            // newer than it.
            journal.rewind(3, 0).await.unwrap();

            for section in 5u64..=7 {
                let size = journal.size(section).await.unwrap();
                assert_eq!(size, 0, "section {section} should be removed");
            }

            // Nothing remains to replay.
            {
                let stream = journal.replay(0, 0, NZUsize!(1024)).await.unwrap();
                pin_mut!(stream);
                let items: Vec<_> = stream.collect().await;
                assert!(items.is_empty());
            }

            journal.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_journal_rewind_persistence() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".to_string(),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            // Populate sections 1 through 5, then rewind to section 2.
            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
                .await
                .unwrap();
            for section in 1u64..=5 {
                journal.append(section, section as i32).await.unwrap();
            }
            journal.sync_all().await.unwrap();

            let size = journal.size(2).await.unwrap();
            journal.rewind(2, size).await.unwrap();
            journal.sync_all().await.unwrap();
            drop(journal);

            // The rewind should persist across restart.
            let journal = Journal::<_, i32>::init(context.with_label("second"), cfg.clone())
                .await
                .unwrap();

            for section in 1u64..=2 {
                let size = journal.size(section).await.unwrap();
                assert!(size > 0, "section {section} should have data after restart");
            }

            for section in 3u64..=5 {
                let size = journal.size(section).await.unwrap();
                assert_eq!(size, 0, "section {section} should be gone after restart");
            }

            {
                let stream = journal.replay(0, 0, NZUsize!(1024)).await.unwrap();
                pin_mut!(stream);
                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    let (section, _, _, item) = result.unwrap();
                    items.push((section, item));
                }
                assert_eq!(items.len(), 2);
                assert_eq!(items[0], (1, 1));
                assert_eq!(items[1], (2, 2));
            }

            journal.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_journal_rewind_to_zero_removes_all_newer() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".to_string(),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };
            let mut journal = Journal::init(context.clone(), cfg.clone()).await.unwrap();

            for section in 1u64..=3 {
                journal.append(section, section as i32).await.unwrap();
            }
            journal.sync_all().await.unwrap();

            journal.rewind(1, 0).await.unwrap();

            let size = journal.size(1).await.unwrap();
            assert_eq!(size, 0, "section 1 should be empty");

            for section in 2u64..=3 {
                let size = journal.size(section).await.unwrap();
                assert_eq!(size, 0, "section {section} should be removed");
            }

            {
                let stream = journal.replay(0, 0, NZUsize!(1024)).await.unwrap();
                pin_mut!(stream);
                let items: Vec<_> = stream.collect().await;
                assert!(items.is_empty());
            }

            journal.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_journal_replay_start_offset_with_trailing_bytes() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };
            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            // Write several valid items and record the valid logical size.
            for i in 0..5i32 {
                journal.append(1, i).await.unwrap();
            }
            journal.sync(1).await.unwrap();
            let valid_logical_size = journal.size(1).await.unwrap();
            drop(journal);

            // Append trailing junk bytes directly to the underlying blob.
            let (blob, physical_size_before) = context
                .open(&cfg.partition, &1u64.to_be_bytes())
                .await
                .unwrap();

            blob.write_at(physical_size_before, vec![0xFF, 0xFF])
                .await
                .unwrap();
            blob.sync().await.unwrap();

            // Replay starting at the end of the valid data, so the replay
            // immediately encounters the trailing junk.
            let start_offset = valid_logical_size;
            {
                let journal = Journal::<_, i32>::init(context.with_label("second"), cfg.clone())
                    .await
                    .unwrap();

                let stream = journal
                    .replay(1, start_offset, NZUsize!(1024))
                    .await
                    .unwrap();
                pin_mut!(stream);

                // Drain the stream (truncation happens as a side effect).
                while let Some(_result) = stream.next().await {}
            }

            // The valid data must not have been truncated away.
            let (_, physical_size_after) = context
                .open(&cfg.partition, &1u64.to_be_bytes())
                .await
                .unwrap();

            assert!(
                physical_size_after >= physical_size_before,
                "Valid data was lost! Physical blob truncated from {physical_size_before} to \
                {physical_size_after}. Logical valid size was {valid_logical_size}. \
                This indicates valid_offset was incorrectly initialized to 0 instead of start_offset."
            );
        });
    }

    #[test_traced]
    fn test_journal_large_item_spanning_pages() {
        // Use an item larger than a page so reads must span page boundaries.
        const LARGE_SIZE: usize = 2048;
        type LargeItem = [u8; LARGE_SIZE];

        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(4096),
            };
            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            // Build a patterned item larger than a single page.
            let mut large_data: LargeItem = [0u8; LARGE_SIZE];
            for (i, byte) in large_data.iter_mut().enumerate() {
                *byte = (i % 256) as u8;
            }
            assert!(
                LARGE_SIZE > PAGE_SIZE.get() as usize,
                "Item must be larger than page size"
            );

            // Append and sync it.
            let (offset, size) = journal
                .append(1, large_data)
                .await
                .expect("Failed to append large item");
            assert_eq!(size as usize, LARGE_SIZE);
            journal.sync(1).await.expect("Failed to sync");

            // Random-access read.
            let retrieved: LargeItem = journal
                .get(1, offset)
                .await
                .expect("Failed to get large item");
            assert_eq!(retrieved, large_data, "Random access read mismatch");

            // Restart and replay.
            drop(journal);
            let journal = Journal::<_, LargeItem>::init(context.with_label("second"), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");

            {
                let stream = journal
                    .replay(0, 0, NZUsize!(1024))
                    .await
                    .expect("Failed to setup replay");
                pin_mut!(stream);

                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    let (section, off, sz, item) = result.expect("Failed to replay item");
                    items.push((section, off, sz, item));
                }

                assert_eq!(items.len(), 1, "Should have exactly one item");
                let (section, off, sz, item) = &items[0];
                assert_eq!(*section, 1);
                assert_eq!(*off, offset);
                assert_eq!(*sz as usize, LARGE_SIZE);
                assert_eq!(*item, large_data, "Replay read mismatch");
            }

            journal.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_journal_non_contiguous_sections() {
        // Sections need not be contiguous: replay must skip the gaps.
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };
            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            // Populate non-adjacent sections.
            let sections_and_data = [(1u64, 100i32), (5u64, 500i32), (10u64, 1000i32)];
            let mut offsets = Vec::new();

            for (section, data) in &sections_and_data {
                let (offset, _) = journal
                    .append(*section, *data)
                    .await
                    .expect("Failed to append");
                offsets.push(offset);
            }
            journal.sync_all().await.expect("Failed to sync");

            // Random-access reads work for the populated sections.
            for (i, (section, expected_data)) in sections_and_data.iter().enumerate() {
                let retrieved: i32 = journal
                    .get(*section, offsets[i])
                    .await
                    .expect("Failed to get item");
                assert_eq!(retrieved, *expected_data);
            }

            // Reads from the gaps fail with SectionOutOfRange.
            for missing_section in [0u64, 2, 3, 4, 6, 7, 8, 9, 11] {
                let result = journal.get(missing_section, 0).await;
                assert!(
                    matches!(result, Err(Error::SectionOutOfRange(_))),
                    "Expected SectionOutOfRange for section {}, got {:?}",
                    missing_section,
                    result
                );
            }

            // Restart and replay everything.
            drop(journal);
            let journal = Journal::<_, i32>::init(context.with_label("second"), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");

            {
                let stream = journal
                    .replay(0, 0, NZUsize!(1024))
                    .await
                    .expect("Failed to setup replay");
                pin_mut!(stream);

                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    let (section, _, _, item) = result.expect("Failed to replay item");
                    items.push((section, item));
                }

                assert_eq!(items.len(), 3, "Should have 3 items");
                assert_eq!(items[0], (1, 100));
                assert_eq!(items[1], (5, 500));
                assert_eq!(items[2], (10, 1000));
            }

            // Replay starting exactly at section 5.
            {
                let stream = journal
                    .replay(5, 0, NZUsize!(1024))
                    .await
                    .expect("Failed to setup replay from section 5");
                pin_mut!(stream);

                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    let (section, _, _, item) = result.expect("Failed to replay item");
                    items.push((section, item));
                }

                assert_eq!(items.len(), 2, "Should have 2 items from section 5 onwards");
                assert_eq!(items[0], (5, 500));
                assert_eq!(items[1], (10, 1000));
            }

            // Replay starting inside a gap (section 3).
            {
                let stream = journal
                    .replay(3, 0, NZUsize!(1024))
                    .await
                    .expect("Failed to setup replay from section 3");
                pin_mut!(stream);

                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    let (section, _, _, item) = result.expect("Failed to replay item");
                    items.push((section, item));
                }

                assert_eq!(items.len(), 2);
                assert_eq!(items[0], (5, 500));
                assert_eq!(items[1], (10, 1000));
            }

            journal.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_journal_empty_section_in_middle() {
        // An empty section between populated ones must not stall replay.
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };
            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            journal.append(1, 100i32).await.expect("Failed to append");

            // Create section 2, then empty it via rewind_section.
            journal.append(2, 200i32).await.expect("Failed to append");
            journal.sync(2).await.expect("Failed to sync");
            journal
                .rewind_section(2, 0)
                .await
                .expect("Failed to rewind");

            journal.append(3, 300i32).await.expect("Failed to append");

            journal.sync_all().await.expect("Failed to sync");

            assert!(journal.size(1).await.unwrap() > 0);
            assert_eq!(journal.size(2).await.unwrap(), 0);
            assert!(journal.size(3).await.unwrap() > 0);

            // Restart and replay from the beginning.
            drop(journal);
            let journal = Journal::<_, i32>::init(context.with_label("second"), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");

            {
                let stream = journal
                    .replay(0, 0, NZUsize!(1024))
                    .await
                    .expect("Failed to setup replay");
                pin_mut!(stream);

                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    let (section, _, _, item) = result.expect("Failed to replay item");
                    items.push((section, item));
                }

                assert_eq!(
                    items.len(),
                    2,
                    "Should have 2 items (skipping empty section)"
                );
                assert_eq!(items[0], (1, 100));
                assert_eq!(items[1], (3, 300));
            }

            // Replay starting at the empty section.
            {
                let stream = journal
                    .replay(2, 0, NZUsize!(1024))
                    .await
                    .expect("Failed to setup replay from section 2");
                pin_mut!(stream);

                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    let (section, _, _, item) = result.expect("Failed to replay item");
                    items.push((section, item));
                }

                assert_eq!(items.len(), 1, "Should have 1 item from section 3");
                assert_eq!(items[0], (3, 300));
            }

            journal.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_journal_item_exactly_page_size() {
        // An item exactly one page long exercises the page-boundary math.
        const ITEM_SIZE: usize = PAGE_SIZE.get() as usize;
        type ExactItem = [u8; ITEM_SIZE];

        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(4096),
            };
            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            // Build a patterned item exactly one page long.
            let mut exact_data: ExactItem = [0u8; ITEM_SIZE];
            for (i, byte) in exact_data.iter_mut().enumerate() {
                *byte = (i % 256) as u8;
            }

            // Append and sync it.
            let (offset, size) = journal
                .append(1, exact_data)
                .await
                .expect("Failed to append exact item");
            assert_eq!(size as usize, ITEM_SIZE);
            journal.sync(1).await.expect("Failed to sync");

            // Random-access read.
            let retrieved: ExactItem = journal
                .get(1, offset)
                .await
                .expect("Failed to get exact item");
            assert_eq!(retrieved, exact_data, "Random access read mismatch");

            // Restart and replay.
            drop(journal);
            let journal = Journal::<_, ExactItem>::init(context.with_label("second"), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");

            {
                let stream = journal
                    .replay(0, 0, NZUsize!(1024))
                    .await
                    .expect("Failed to setup replay");
                pin_mut!(stream);

                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    let (section, off, sz, item) = result.expect("Failed to replay item");
                    items.push((section, off, sz, item));
                }

                assert_eq!(items.len(), 1, "Should have exactly one item");
                let (section, off, sz, item) = &items[0];
                assert_eq!(*section, 1);
                assert_eq!(*off, offset);
                assert_eq!(*sz as usize, ITEM_SIZE);
                assert_eq!(*item, exact_data, "Replay read mismatch");
            }

            journal.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_journal_varint_spanning_page_boundary() {
        // Use a tiny page size so that an entry's varint length prefix can
        // straddle a page boundary.
        const SMALL_PAGE: NonZeroU16 = NZU16!(16);

        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::new(SMALL_PAGE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };
            let mut journal: Journal<_, [u8; 128]> =
                Journal::init(context.with_label("first"), cfg.clone())
                    .await
                    .expect("Failed to initialize journal");

            // Each 128-byte item spans multiple 16-byte pages.
            let item1: [u8; 128] = [1u8; 128];
            let item2: [u8; 128] = [2u8; 128];
            let item3: [u8; 128] = [3u8; 128];

            let (offset1, _) = journal.append(1, item1).await.expect("Failed to append");
            let (offset2, _) = journal.append(1, item2).await.expect("Failed to append");
            let (offset3, _) = journal.append(1, item3).await.expect("Failed to append");

            journal.sync(1).await.expect("Failed to sync");

            // Random-access reads.
            let retrieved1: [u8; 128] = journal.get(1, offset1).await.expect("Failed to get");
            let retrieved2: [u8; 128] = journal.get(1, offset2).await.expect("Failed to get");
            let retrieved3: [u8; 128] = journal.get(1, offset3).await.expect("Failed to get");
            assert_eq!(retrieved1, item1);
            assert_eq!(retrieved2, item2);
            assert_eq!(retrieved3, item3);

            // Restart and replay with a small buffer to force re-buffering.
            drop(journal);
            let journal: Journal<_, [u8; 128]> =
                Journal::init(context.with_label("second"), cfg.clone())
                    .await
                    .expect("Failed to re-initialize journal");

            {
                let stream = journal
                    .replay(0, 0, NZUsize!(64))
                    .await
                    .expect("Failed to setup replay");
                pin_mut!(stream);

                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    let (section, off, _, item) = result.expect("Failed to replay item");
                    items.push((section, off, item));
                }

                assert_eq!(items.len(), 3, "Should have 3 items");
                assert_eq!(items[0], (1, offset1, item1));
                assert_eq!(items[1], (1, offset2, item2));
                assert_eq!(items[2], (1, offset3, item3));
            }

            journal.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_journal_clear() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "clear_test".into(),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            let mut journal: Journal<_, u64> =
                Journal::init(context.with_label("journal"), cfg.clone())
                    .await
                    .expect("Failed to initialize journal");

            // Populate several sections.
            for section in 0..5u64 {
                for i in 0..10u64 {
                    journal
                        .append(section, section * 1000 + i)
                        .await
                        .expect("Failed to append");
                }
                journal.sync(section).await.expect("Failed to sync");
            }

            // Sanity-check reads before clearing.
            assert_eq!(journal.get(0, 0).await.unwrap(), 0);
            assert_eq!(journal.get(4, 0).await.unwrap(), 4000);

            // Clear the journal.
            journal.clear().await.expect("Failed to clear");

            // All previous sections should now be out of range.
            for section in 0..5u64 {
                assert!(matches!(
                    journal.get(section, 0).await,
                    Err(Error::SectionOutOfRange(s)) if s == section
                ));
            }

            // The journal remains usable after clear.
            for i in 0..5u64 {
                journal
                    .append(10, i * 100)
                    .await
                    .expect("Failed to append after clear");
            }
            journal.sync(10).await.expect("Failed to sync after clear");

            assert_eq!(journal.get(10, 0).await.unwrap(), 0);

            // Old sections stay out of range.
            assert!(matches!(
                journal.get(0, 0).await,
                Err(Error::SectionOutOfRange(0))
            ));

            journal.destroy().await.unwrap();
        });
    }
}