1use super::manager::{AppendFactory, Config as ManagerConfig, Manager};
84use crate::journal::Error;
85use commonware_codec::{
86 varint::{UInt, MAX_U32_VARINT_SIZE},
87 Codec, CodecShared, EncodeSize, ReadExt, Write as CodecWrite,
88};
89use commonware_runtime::{
90 buffer::paged::{Append, CacheRef, Replay},
91 Blob, Buf, IoBuf, IoBufMut, Metrics, Storage,
92};
93use futures::stream::{self, Stream, StreamExt};
94use std::{io::Cursor, num::NonZeroUsize};
95use tracing::{trace, warn};
96use zstd::{bulk::compress, decode_all};
97
/// Configuration for a [`Journal`].
#[derive(Clone)]
pub struct Config<C> {
    /// Storage partition holding the journal's blobs (one blob per section).
    pub partition: String,

    /// Optional zstd compression level applied to each encoded item before it is written.
    /// `None` stores items uncompressed.
    pub compression: Option<u8>,

    /// Configuration handed to the item codec when decoding items.
    pub codec_config: C,

    /// Page cache handed to the blob factory (`AppendFactory`).
    pub page_cache: CacheRef,

    /// Write buffer size handed to the blob factory (`AppendFactory`).
    pub write_buffer: NonZeroUsize,
}
117
118#[inline]
121fn decode_length_prefix(buf: &mut impl Buf) -> Result<(usize, usize), Error> {
122 let initial = buf.remaining();
123 let size = UInt::<u32>::read(buf)?.0 as usize;
124 let varint_len = initial - buf.remaining();
125 Ok((size, varint_len))
126}
127
/// Describes an item whose length prefix has been parsed from a (possibly partial) buffer.
enum ItemInfo {
    /// The buffer holds the item's entire data.
    Complete {
        /// Length of the varint length prefix, in bytes.
        varint_len: usize,
        /// Length of the item's stored data, in bytes.
        data_len: usize,
    },
    /// The buffer holds only the first part of the item's data.
    Incomplete {
        /// Length of the varint length prefix, in bytes.
        varint_len: usize,
        /// Number of data bytes that were already present in the buffer.
        prefix_len: usize,
        /// Total length of the item's stored data, in bytes.
        total_len: usize,
    },
}
147
148fn find_item(buf: &mut impl Buf, offset: u64) -> Result<(u64, ItemInfo), Error> {
152 let available = buf.remaining();
153 let (size, varint_len) = decode_length_prefix(buf)?;
154 let next_offset = offset
155 .checked_add(varint_len as u64)
156 .ok_or(Error::OffsetOverflow)?
157 .checked_add(size as u64)
158 .ok_or(Error::OffsetOverflow)?;
159 let buffered = available.saturating_sub(varint_len);
160
161 let item = if buffered >= size {
162 ItemInfo::Complete {
163 varint_len,
164 data_len: size,
165 }
166 } else {
167 ItemInfo::Incomplete {
168 varint_len,
169 prefix_len: buffered,
170 total_len: size,
171 }
172 };
173
174 Ok((next_offset, item))
175}
176
/// Per-section state threaded through the replay stream's `unfold`.
struct ReplayState<B: Blob, C> {
    /// Section being replayed.
    section: u64,
    /// Handle to the section's blob; used to resize away trailing or partial data.
    blob: Append<B>,
    /// Reader from which the section's bytes are pulled sequentially.
    replay: Replay<B>,
    /// Bytes still to skip before the first item is decoded (the requested start offset).
    skip_bytes: u64,
    /// Current absolute read position within the blob.
    offset: u64,
    /// Offset just past the last successfully decoded item (initially the start offset);
    /// the blob is resized to this value when trailing or incomplete data is found.
    valid_offset: u64,
    /// Configuration used to decode items.
    codec_config: C,
    /// Whether items are zstd-compressed.
    compressed: bool,
    /// Set once this section has finished replaying (cleanly or after an error).
    done: bool,
}
189
190fn decode_item<V: Codec>(item_data: impl Buf, cfg: &V::Cfg, compressed: bool) -> Result<V, Error> {
192 if compressed {
193 let decompressed =
194 decode_all(item_data.reader()).map_err(|_| Error::DecompressionFailed)?;
195 V::decode_cfg(decompressed.as_ref(), cfg).map_err(Error::Codec)
196 } else {
197 V::decode_cfg(item_data, cfg).map_err(Error::Codec)
198 }
199}
200
/// A journal of variable-size items grouped into numbered (`u64`) sections.
///
/// Each section is stored in its own blob. Every item is written as a varint length prefix
/// followed by the item's encoded bytes, zstd-compressed when compression is configured.
pub struct Journal<E: Storage + Metrics, V: Codec> {
    /// Opens, creates, and tracks the blob backing each section.
    manager: Manager<E, AppendFactory>,

    /// zstd compression level for items, or `None` for uncompressed storage.
    compression: Option<u8>,

    /// Configuration used when decoding items.
    codec_config: V::Cfg,
}
223
224impl<E: Storage + Metrics, V: CodecShared> Journal<E, V> {
225 pub async fn init(context: E, cfg: Config<V::Cfg>) -> Result<Self, Error> {
231 let manager_cfg = ManagerConfig {
232 partition: cfg.partition,
233 factory: AppendFactory {
234 write_buffer: cfg.write_buffer,
235 page_cache_ref: cfg.page_cache,
236 },
237 };
238 let manager = Manager::init(context, manager_cfg).await?;
239
240 Ok(Self {
241 manager,
242 compression: cfg.compression,
243 codec_config: cfg.codec_config,
244 })
245 }
246
247 async fn read(
249 compressed: bool,
250 cfg: &V::Cfg,
251 blob: &Append<E::Blob>,
252 offset: u64,
253 ) -> Result<(u64, u32, V), Error> {
254 let (buf, available) = blob
256 .read_up_to(
257 offset,
258 MAX_U32_VARINT_SIZE,
259 IoBufMut::with_capacity(MAX_U32_VARINT_SIZE),
260 )
261 .await?;
262 let buf = buf.freeze();
263 let mut cursor = Cursor::new(buf.slice(..available));
264 let (next_offset, item_info) = find_item(&mut cursor, offset)?;
265
266 let (item_size, decoded) = match item_info {
268 ItemInfo::Complete {
269 varint_len,
270 data_len,
271 } => {
272 let data = buf.slice(varint_len..varint_len + data_len);
274 let decoded = decode_item::<V>(data, cfg, compressed)?;
275 (data_len as u32, decoded)
276 }
277 ItemInfo::Incomplete {
278 varint_len,
279 prefix_len,
280 total_len,
281 } => {
282 let prefix = buf.slice(varint_len..varint_len + prefix_len);
284 let read_offset = offset + varint_len as u64 + prefix_len as u64;
285 let remainder_len = total_len - prefix_len;
286 let mut remainder = vec![0u8; remainder_len];
287 blob.read_into(&mut remainder, read_offset).await?;
288 let chained = prefix.chain(IoBuf::from(remainder));
289 let decoded = decode_item::<V>(chained, cfg, compressed)?;
290 (total_len as u32, decoded)
291 }
292 };
293
294 Ok((next_offset, item_size, decoded))
295 }
296
297 pub async fn replay(
301 &self,
302 start_section: u64,
303 mut start_offset: u64,
304 buffer: NonZeroUsize,
305 ) -> Result<impl Stream<Item = Result<(u64, u64, u32, V), Error>> + Send + '_, Error> {
306 let codec_config = self.codec_config.clone();
308 let compressed = self.compression.is_some();
309 let mut blobs = Vec::new();
310 for (§ion, blob) in self.manager.sections_from(start_section) {
311 blobs.push((
312 section,
313 blob.clone(),
314 blob.replay(buffer).await?,
315 codec_config.clone(),
316 compressed,
317 ));
318 }
319
320 Ok(stream::iter(blobs).flat_map(
322 move |(section, blob, replay, codec_config, compressed)| {
323 let skip_bytes = if section == start_section {
325 start_offset
326 } else {
327 start_offset = 0;
328 0
329 };
330
331 stream::unfold(
332 ReplayState {
333 section,
334 blob,
335 replay,
336 skip_bytes,
337 offset: 0,
338 valid_offset: skip_bytes,
339 codec_config,
340 compressed,
341 done: false,
342 },
343 move |mut state| async move {
344 if state.done {
345 return None;
346 }
347
348 let blob_size = state.replay.blob_size();
349 let mut batch: Vec<Result<(u64, u64, u32, V), Error>> = Vec::new();
350 loop {
351 match state.replay.ensure(MAX_U32_VARINT_SIZE).await {
355 Ok(true) => {}
356 Ok(false) => {
357 if state.replay.remaining() == 0 {
359 state.done = true;
360 return if batch.is_empty() {
361 None
362 } else {
363 Some((batch, state))
364 };
365 }
366 }
368 Err(err) => {
369 batch.push(Err(err.into()));
370 state.done = true;
371 return Some((batch, state));
372 }
373 }
374
375 if state.skip_bytes > 0 {
377 let to_skip =
378 state.skip_bytes.min(state.replay.remaining() as u64) as usize;
379 state.replay.advance(to_skip);
380 state.skip_bytes -= to_skip as u64;
381 state.offset += to_skip as u64;
382 continue;
383 }
384
385 let before_remaining = state.replay.remaining();
387 let (item_size, varint_len) =
388 match decode_length_prefix(&mut state.replay) {
389 Ok(result) => result,
390 Err(err) => {
391 if state.replay.is_exhausted()
393 || before_remaining < MAX_U32_VARINT_SIZE
394 {
395 if state.valid_offset < blob_size
397 && state.offset < blob_size
398 {
399 warn!(
400 blob = state.section,
401 bad_offset = state.offset,
402 new_size = state.valid_offset,
403 "trailing bytes detected: truncating"
404 );
405 if let Err(err) =
406 state.blob.resize(state.valid_offset).await
407 {
408 batch.push(Err(err.into()));
409 state.done = true;
410 return Some((batch, state));
411 }
412 }
413 state.done = true;
414 return if batch.is_empty() {
415 None
416 } else {
417 Some((batch, state))
418 };
419 }
420 batch.push(Err(err));
421 state.done = true;
422 return Some((batch, state));
423 }
424 };
425
426 match state.replay.ensure(item_size).await {
428 Ok(true) => {}
429 Ok(false) => {
430 warn!(
432 blob = state.section,
433 bad_offset = state.offset,
434 new_size = state.valid_offset,
435 "incomplete item at end: truncating"
436 );
437 if let Err(err) = state.blob.resize(state.valid_offset).await {
438 batch.push(Err(err.into()));
439 state.done = true;
440 return Some((batch, state));
441 }
442 state.done = true;
443 return if batch.is_empty() {
444 None
445 } else {
446 Some((batch, state))
447 };
448 }
449 Err(err) => {
450 batch.push(Err(err.into()));
451 state.done = true;
452 return Some((batch, state));
453 }
454 }
455
456 let item_offset = state.offset;
458 let next_offset = match state
459 .offset
460 .checked_add(varint_len as u64)
461 .and_then(|o| o.checked_add(item_size as u64))
462 {
463 Some(o) => o,
464 None => {
465 batch.push(Err(Error::OffsetOverflow));
466 state.done = true;
467 return Some((batch, state));
468 }
469 };
470 match decode_item::<V>(
471 (&mut state.replay).take(item_size),
472 &state.codec_config,
473 state.compressed,
474 ) {
475 Ok(decoded) => {
476 batch.push(Ok((
477 state.section,
478 item_offset,
479 item_size as u32,
480 decoded,
481 )));
482 state.valid_offset = next_offset;
483 state.offset = next_offset;
484 }
485 Err(err) => {
486 batch.push(Err(err));
487 state.done = true;
488 return Some((batch, state));
489 }
490 }
491
492 if !batch.is_empty() && state.replay.remaining() < MAX_U32_VARINT_SIZE {
494 return Some((batch, state));
495 }
496 }
497 },
498 )
499 .flat_map(stream::iter)
500 },
501 ))
502 }
503
504 pub(crate) fn encode_item(compression: Option<u8>, item: &V) -> Result<(Vec<u8>, u32), Error> {
509 let mut buf = Vec::new();
510 let item_len = Self::encode_item_into(compression, item, &mut buf)?;
511 Ok((buf, item_len))
512 }
513
514 pub(crate) fn encode_item_into(
521 compression: Option<u8>,
522 item: &V,
523 buf: &mut Vec<u8>,
524 ) -> Result<u32, Error> {
525 if let Some(compression) = compression {
526 let encoded = item.encode();
528 let compressed =
529 compress(&encoded, compression as i32).map_err(|_| Error::CompressionFailed)?;
530 let item_len = compressed.len();
531 let item_len_u32: u32 = match item_len.try_into() {
532 Ok(len) => len,
533 Err(_) => return Err(Error::ItemTooLarge(item_len)),
534 };
535 let size_len = UInt(item_len_u32).encode_size();
536 let entry_len = size_len
537 .checked_add(item_len)
538 .ok_or(Error::OffsetOverflow)?;
539
540 buf.reserve(entry_len);
541 UInt(item_len_u32).write(buf);
542 buf.extend_from_slice(&compressed);
543
544 Ok(item_len_u32)
545 } else {
546 let item_len = item.encode_size();
548 let item_len_u32: u32 = match item_len.try_into() {
549 Ok(len) => len,
550 Err(_) => return Err(Error::ItemTooLarge(item_len)),
551 };
552 let size_len = UInt(item_len_u32).encode_size();
553 let entry_len = size_len
554 .checked_add(item_len)
555 .ok_or(Error::OffsetOverflow)?;
556
557 buf.reserve(entry_len);
558 UInt(item_len_u32).write(buf);
559 item.write(buf);
560
561 Ok(item_len_u32)
562 }
563 }
564
565 pub async fn append(&mut self, section: u64, item: &V) -> Result<(u64, u32), Error> {
569 let (buf, item_len) = Self::encode_item(self.compression, item)?;
570 self.append_raw(section, &buf)
571 .await
572 .map(|offset| (offset, item_len))
573 }
574
575 pub(crate) async fn append_raw(&mut self, section: u64, buf: &[u8]) -> Result<u64, Error> {
580 let blob = self.manager.get_or_create(section).await?;
581 let offset = blob.size().await;
582 blob.append(buf).await?;
583 trace!(blob = section, offset, "appended item");
584 Ok(offset)
585 }
586
587 pub async fn get(&self, section: u64, offset: u64) -> Result<V, Error> {
598 let blob = self
599 .manager
600 .get(section)?
601 .ok_or(Error::SectionOutOfRange(section))?;
602
603 let (_, _, item) =
605 Self::read(self.compression.is_some(), &self.codec_config, blob, offset).await?;
606 Ok(item)
607 }
608
609 pub async fn get_many(&self, section: u64, offsets: &[u64]) -> Result<Vec<V>, Error> {
613 if offsets.is_empty() {
614 return Ok(Vec::new());
615 }
616 let blob = self
617 .manager
618 .get(section)?
619 .ok_or(Error::SectionOutOfRange(section))?;
620
621 let compressed = self.compression.is_some();
622 let cfg = &self.codec_config;
623 let mut items = Vec::with_capacity(offsets.len());
624 for &offset in offsets {
625 let (_, _, item) = Self::read(compressed, cfg, blob, offset).await?;
626 items.push(item);
627 }
628 Ok(items)
629 }
630
631 pub(crate) async fn get_many_consecutive(
644 &self,
645 section: u64,
646 offsets: &[u64],
647 ) -> Result<Vec<V>, Error> {
648 if offsets.len() <= 1 {
649 return self.get_many(section, offsets).await;
650 }
651 let blob = self
652 .manager
653 .get(section)?
654 .ok_or(Error::SectionOutOfRange(section))?;
655
656 let start = offsets[0];
657 let end = offsets[offsets.len() - 1];
658 if end <= start {
659 return self.get_many(section, offsets).await;
660 }
661 let range_len = usize::try_from(end - start).map_err(|_| Error::OffsetOverflow)?;
662 let bytes = blob.read_at(start, range_len).await?.coalesce();
663 let bytes = bytes.as_ref();
664
665 let compressed = self.compression.is_some();
666 let cfg = &self.codec_config;
667 let mut items = Vec::with_capacity(offsets.len());
668 let mut local_offset = 0usize;
669
670 for window in offsets.windows(2) {
671 let offset = window[0];
672 let next_offset = window[1];
673 assert!(offset < next_offset, "offsets must be strictly increasing");
674
675 let item_len =
676 usize::try_from(next_offset - offset).map_err(|_| Error::OffsetOverflow)?;
677
678 let mut cursor = Cursor::new(&bytes[local_offset..]);
679 let (size, varint_len) = decode_length_prefix(&mut cursor)?;
680 let actual_len = size + varint_len;
681 if actual_len != item_len {
682 return Err(Error::OffsetDataMismatch {
683 section,
684 offset,
685 expected_len: item_len,
686 actual_len,
687 });
688 }
689
690 let data_start = local_offset
691 .checked_add(varint_len)
692 .ok_or(Error::OffsetOverflow)?;
693 let data_end = local_offset
694 .checked_add(item_len)
695 .ok_or(Error::OffsetOverflow)?;
696
697 items.push(decode_item::<V>(
698 &bytes[data_start..data_end],
699 cfg,
700 compressed,
701 )?);
702
703 local_offset = data_end;
704 }
705
706 let (_, _, item) = Self::read(compressed, cfg, blob, end).await?;
707 items.push(item);
708 Ok(items)
709 }
710
711 pub fn try_get_sync(&self, section: u64, offset: u64) -> Option<V> {
713 let mut buf = Vec::new();
714 self.try_get_sync_into(section, offset, &mut buf)
715 }
716
717 pub fn try_get_sync_into(&self, section: u64, offset: u64, buf: &mut Vec<u8>) -> Option<V> {
719 let blob = self.manager.get(section).ok()??;
720 let remaining = blob.try_size()?.checked_sub(offset)?;
721 let header_len = usize::try_from(remaining.min(MAX_U32_VARINT_SIZE as u64)).ok()?;
722 if header_len == 0 {
723 return None;
724 }
725
726 let mut header = [0u8; MAX_U32_VARINT_SIZE];
728 if !blob.try_read_sync(offset, &mut header[..header_len]) {
729 return None;
730 }
731 let mut cursor = Cursor::new(&header[..header_len]);
732 let (_, item_info) = find_item(&mut cursor, offset).ok()?;
733
734 let (varint_len, data_len) = match item_info {
735 ItemInfo::Complete {
736 varint_len,
737 data_len,
738 } => (varint_len, data_len),
739 ItemInfo::Incomplete {
740 varint_len,
741 total_len,
742 ..
743 } => (varint_len, total_len),
744 };
745 let item_len = varint_len.checked_add(data_len)?;
746 if item_len > usize::try_from(remaining).ok()? {
747 return None;
748 }
749
750 if item_len <= header_len {
752 return decode_item::<V>(
753 &header[varint_len..varint_len + data_len],
754 &self.codec_config,
755 self.compression.is_some(),
756 )
757 .ok();
758 }
759
760 buf.resize(item_len, 0);
762 if !blob.try_read_sync(offset, buf) {
763 return None;
764 }
765 decode_item::<V>(
766 &buf[varint_len..varint_len + data_len],
767 &self.codec_config,
768 self.compression.is_some(),
769 )
770 .ok()
771 }
772
773 pub async fn size(&self, section: u64) -> Result<u64, Error> {
777 self.manager.size(section).await
778 }
779
780 pub async fn rewind_to_offset(&mut self, section: u64, offset: u64) -> Result<(), Error> {
788 self.manager.rewind(section, offset).await
789 }
790
791 pub async fn rewind(&mut self, section: u64, size: u64) -> Result<(), Error> {
801 self.manager.rewind(section, size).await
802 }
803
804 pub async fn rewind_section(&mut self, section: u64, size: u64) -> Result<(), Error> {
812 self.manager.rewind_section(section, size).await
813 }
814
815 pub async fn sync(&self, section: u64) -> Result<(), Error> {
819 self.manager.sync(section).await
820 }
821
822 pub async fn sync_all(&self) -> Result<(), Error> {
824 self.manager.sync_all().await
825 }
826
827 pub async fn prune(&mut self, min: u64) -> Result<bool, Error> {
829 self.manager.prune(min).await
830 }
831
832 pub fn oldest_section(&self) -> Option<u64> {
834 self.manager.oldest_section()
835 }
836
837 pub fn newest_section(&self) -> Option<u64> {
839 self.manager.newest_section()
840 }
841
842 pub fn is_empty(&self) -> bool {
844 self.manager.is_empty()
845 }
846
847 pub fn num_sections(&self) -> usize {
849 self.manager.num_sections()
850 }
851
852 pub async fn destroy(self) -> Result<(), Error> {
854 self.manager.destroy().await
855 }
856
857 pub async fn clear(&mut self) -> Result<(), Error> {
861 self.manager.clear().await
862 }
863}
864
865#[cfg(test)]
866mod tests {
867 use super::*;
868 use commonware_macros::test_traced;
869 use commonware_runtime::{deterministic, Blob, BufMut, Runner, Storage, Supervisor as _};
870 use commonware_utils::{NZUsize, NZU16};
871 use futures::{pin_mut, StreamExt};
872 use std::num::NonZeroU16;
873
874 const PAGE_SIZE: NonZeroU16 = NZU16!(1024);
875 const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);
876
    /// Appends a single item, restarts, and checks it replays along with blob-tracking metrics.
    #[test_traced]
    fn test_journal_append_and_read() {
        // Deterministic runtime for reproducible storage behaviour.
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            // Uncompressed journal of i32 items.
            let cfg = Config {
                partition: "test-partition".into(),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };
            let index = 1u64;
            let data = 10;
            let mut journal = Journal::init(context.child("first"), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            // Append one item to section `index`.
            journal
                .append(index, &data)
                .await
                .expect("Failed to append data");

            // Exactly one blob should be tracked.
            let buffer = context.encode();
            assert!(buffer.contains("first_tracked 1"));

            // Persist, then reopen under a new metrics label.
            journal.sync(index).await.expect("Failed to sync journal");
            drop(journal);
            let journal = Journal::<_, i32>::init(context.child("second"), cfg)
                .await
                .expect("Failed to re-initialize journal");

            // Replay everything from the start.
            let mut items = Vec::new();
            let stream = journal
                .replay(0, 0, NZUsize!(1024))
                .await
                .expect("unable to setup replay");
            pin_mut!(stream);
            while let Some(result) = stream.next().await {
                match result {
                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                    Err(err) => panic!("Failed to read item: {err}"),
                }
            }

            // The single item comes back from the same section.
            assert_eq!(items.len(), 1);
            assert_eq!(items[0].0, index);
            assert_eq!(items[0].1, data);

            // The reopened journal also tracks one blob.
            let buffer = context.encode();
            assert!(buffer.contains("second_tracked 1"));
        });
    }
938 }
939
    /// Appends items across several sections, restarts, and checks they replay in order.
    #[test_traced]
    fn test_journal_multiple_appends_and_reads() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = Config {
                partition: "test-partition".into(),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            let mut journal = Journal::init(context.child("first"), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            // (section, value) pairs: two items in section 1, one each in sections 2 and 3.
            let data_items = vec![(1u64, 1), (1u64, 2), (2u64, 3), (3u64, 4)];
            for (index, data) in &data_items {
                journal
                    .append(*index, data)
                    .await
                    .expect("Failed to append data");
                journal.sync(*index).await.expect("Failed to sync blob");
            }

            // Three distinct sections were created and four syncs were issued.
            let buffer = context.encode();
            assert!(buffer.contains("first_tracked 3"));
            assert!(buffer.contains("first_synced_total 4"));

            // Reopen the journal.
            drop(journal);
            let journal = Journal::init(context.child("second"), cfg)
                .await
                .expect("Failed to re-initialize journal");

            // Replay everything; the scope ends the stream's borrow before `destroy`.
            let mut items = Vec::<(u64, u32)>::new();
            {
                let stream = journal
                    .replay(0, 0, NZUsize!(1024))
                    .await
                    .expect("unable to setup replay");
                pin_mut!(stream);
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                        Err(err) => panic!("Failed to read item: {err}"),
                    }
                }
            }

            // Items replay in append order with their original sections.
            assert_eq!(items.len(), data_items.len());
            for ((expected_index, expected_data), (actual_index, actual_data)) in
                data_items.iter().zip(items.iter())
            {
                assert_eq!(actual_index, expected_index);
                assert_eq!(actual_data, expected_data);
            }

            journal.destroy().await.expect("Failed to destroy journal");
        });
    }
1011
    /// Prunes old sections and checks metrics, replay contents, and eventual blob removal.
    #[test_traced]
    fn test_journal_prune_blobs() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = Config {
                partition: "test-partition".into(),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            let mut journal = Journal::init(context.child("first"), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            // One item per section 1..=5; each item's value is its section number.
            for index in 1u64..=5u64 {
                journal
                    .append(index, &index)
                    .await
                    .expect("Failed to append data");
                journal.sync(index).await.expect("Failed to sync blob");
            }

            // A second item in section 2, which is about to be pruned.
            let data = 99;
            journal
                .append(2u64, &data)
                .await
                .expect("Failed to append data");
            journal.sync(2u64).await.expect("Failed to sync blob");

            // Pruning below 3 removes sections 1 and 2.
            journal.prune(3).await.expect("Failed to prune blobs");

            let buffer = context.encode();
            assert!(buffer.contains("first_pruned_total 2"));

            // Pruning to a lower bound must not remove anything more.
            journal.prune(2).await.expect("Failed to no-op prune");
            let buffer = context.encode();
            assert!(buffer.contains("first_pruned_total 2"));

            // Reopen and replay.
            drop(journal);
            let mut journal = Journal::init(context.child("second"), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");

            let mut items = Vec::<(u64, u64)>::new();
            {
                let stream = journal
                    .replay(0, 0, NZUsize!(1024))
                    .await
                    .expect("unable to setup replay");
                pin_mut!(stream);
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                        Err(err) => panic!("Failed to read item: {err}"),
                    }
                }
            }

            // Only sections 3, 4 and 5 survive.
            assert_eq!(items.len(), 3);
            let expected_indices = [3u64, 4u64, 5u64];
            for (item, expected_index) in items.iter().zip(expected_indices.iter()) {
                assert_eq!(item.0, *expected_index);
            }

            // Prune past every section.
            journal.prune(6).await.expect("Failed to prune blobs");

            drop(journal);

            // No blobs remain in the partition.
            assert!(context
                .scan(&cfg.partition)
                .await
                .expect("Failed to list blobs")
                .is_empty());
        });
    }
1108
    /// Operations on pruned sections fail with `AlreadyPrunedToSection`; later sections work.
    #[test_traced]
    fn test_journal_prune_guard() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = Config {
                partition: "test-partition".into(),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            let mut journal = Journal::init(context.child("storage"), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            // One item in each of sections 1..=5.
            for section in 1u64..=5u64 {
                journal
                    .append(section, &(section as i32))
                    .await
                    .expect("Failed to append data");
                journal.sync(section).await.expect("Failed to sync");
            }

            // Sections below 3 become inaccessible.
            journal.prune(3).await.expect("Failed to prune");

            // Appends to pruned sections are rejected.
            match journal.append(1, &100).await {
                Err(Error::AlreadyPrunedToSection(3)) => {}
                other => panic!("Expected AlreadyPrunedToSection(3), got {other:?}"),
            }

            match journal.append(2, &100).await {
                Err(Error::AlreadyPrunedToSection(3)) => {}
                other => panic!("Expected AlreadyPrunedToSection(3), got {other:?}"),
            }

            // Reads, size queries, rewinds and syncs on pruned sections are rejected too.
            match journal.get(1, 0).await {
                Err(Error::AlreadyPrunedToSection(3)) => {}
                other => panic!("Expected AlreadyPrunedToSection(3), got {other:?}"),
            }

            match journal.size(1).await {
                Err(Error::AlreadyPrunedToSection(3)) => {}
                other => panic!("Expected AlreadyPrunedToSection(3), got {other:?}"),
            }

            match journal.rewind(2, 0).await {
                Err(Error::AlreadyPrunedToSection(3)) => {}
                other => panic!("Expected AlreadyPrunedToSection(3), got {other:?}"),
            }

            match journal.rewind_section(1, 0).await {
                Err(Error::AlreadyPrunedToSection(3)) => {}
                other => panic!("Expected AlreadyPrunedToSection(3), got {other:?}"),
            }

            match journal.sync(2).await {
                Err(Error::AlreadyPrunedToSection(3)) => {}
                other => panic!("Expected AlreadyPrunedToSection(3), got {other:?}"),
            }

            // Sections at or above the prune point remain usable.
            assert!(journal.get(3, 0).await.is_ok());
            assert!(journal.get(4, 0).await.is_ok());
            assert!(journal.get(5, 0).await.is_ok());
            assert!(journal.size(3).await.is_ok());
            assert!(journal.sync(4).await.is_ok());

            journal
                .append(3, &999)
                .await
                .expect("Should be able to append to section 3");

            // Raising the prune point moves the guard forward.
            journal.prune(5).await.expect("Failed to prune");

            match journal.get(3, 0).await {
                Err(Error::AlreadyPrunedToSection(5)) => {}
                other => panic!("Expected AlreadyPrunedToSection(5), got {other:?}"),
            }

            match journal.get(4, 0).await {
                Err(Error::AlreadyPrunedToSection(5)) => {}
                other => panic!("Expected AlreadyPrunedToSection(5), got {other:?}"),
            }

            assert!(journal.get(5, 0).await.is_ok());
        });
    }
1212
    /// After a restart, pruned sections are reported as `SectionOutOfRange` instead.
    #[test_traced]
    fn test_journal_prune_guard_across_restart() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = Config {
                partition: "test-partition".into(),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            // First run: write sections 1..=5, then prune below 3.
            {
                let mut journal = Journal::init(context.child("first"), cfg.clone())
                    .await
                    .expect("Failed to initialize journal");

                for section in 1u64..=5u64 {
                    journal
                        .append(section, &(section as i32))
                        .await
                        .expect("Failed to append data");
                    journal.sync(section).await.expect("Failed to sync");
                }

                journal.prune(3).await.expect("Failed to prune");
            }

            // Second run: the pruned sections have no blob, so lookups report out of range.
            {
                let journal = Journal::<_, i32>::init(context.child("second"), cfg.clone())
                    .await
                    .expect("Failed to re-initialize journal");

                match journal.get(1, 0).await {
                    Err(Error::SectionOutOfRange(1)) => {}
                    other => panic!("Expected SectionOutOfRange(1), got {other:?}"),
                }

                match journal.get(2, 0).await {
                    Err(Error::SectionOutOfRange(2)) => {}
                    other => panic!("Expected SectionOutOfRange(2), got {other:?}"),
                }

                // Retained sections are still readable.
                assert!(journal.get(3, 0).await.is_ok());
                assert!(journal.get(4, 0).await.is_ok());
                assert!(journal.get(5, 0).await.is_ok());
            }
        });
    }
1268
    /// Initialization fails when the partition contains a blob whose name is not a section.
    #[test_traced]
    fn test_journal_with_invalid_blob_name() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = Config {
                partition: "test-partition".into(),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            // Create a blob whose name is not an encoded section number.
            let invalid_blob_name = b"invalid";
            let (blob, _) = context
                .open(&cfg.partition, invalid_blob_name)
                .await
                .expect("Failed to create blob with invalid name");
            blob.sync().await.expect("Failed to sync blob");

            let result = Journal::<_, u64>::init(context, cfg).await;

            assert!(matches!(result, Err(Error::InvalidBlobName(_))));
        });
    }
1300
    /// A blob holding only the first byte of a length prefix replays as empty, without errors.
    #[test_traced]
    fn test_journal_read_size_missing() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = Config {
                partition: "test-partition".into(),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            // Open section 1's blob directly, bypassing the journal.
            let section = 1u64;
            let blob_name = section.to_be_bytes();
            let (blob, _) = context
                .open(&cfg.partition, &blob_name)
                .await
                .expect("Failed to create blob");

            // Encode a maximal length prefix and keep only its first byte.
            let mut incomplete_data = Vec::new();
            UInt(u32::MAX).write(&mut incomplete_data);
            incomplete_data.truncate(1);
            blob.write_at_sync(0, incomplete_data)
                .await
                .expect("Failed to write incomplete data");

            let journal = Journal::init(context, cfg)
                .await
                .expect("Failed to initialize journal");

            // Replay must finish without yielding items or errors.
            let stream = journal
                .replay(0, 0, NZUsize!(1024))
                .await
                .expect("unable to setup replay");
            pin_mut!(stream);
            let mut items = Vec::<(u64, u64)>::new();
            while let Some(result) = stream.next().await {
                match result {
                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                    Err(err) => panic!("Failed to read item: {err}"),
                }
            }
            assert!(items.is_empty());
        });
    }
1354
    /// When repairing trailing bytes requires a resize and the resize fails, replay yields
    /// the valid item, then one error, then ends.
    #[test_traced]
    fn test_journal_replay_reports_resize_error_on_trailing_bytes() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "test-partition".into(),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            let section = 1u64;
            // Size the item so its full entry ends one byte before a page boundary; the two
            // trailing bytes appended below therefore straddle that boundary.
            let item = [10u8; 1021];
            let item_record_size =
                UInt(item.encode_size() as u32).encode_size() + item.encode_size();
            assert_eq!(item_record_size, PAGE_SIZE.get() as usize - 1);

            // Write one valid item followed by bytes that cannot form a length prefix.
            let mut journal = Journal::init(context.child("first"), cfg.clone())
                .await
                .expect("Failed to initialize journal");
            journal
                .append(section, &item)
                .await
                .expect("Failed to append item");
            journal
                .append_raw(section, &[0xFF, 0xFF])
                .await
                .expect("Failed to append trailing bytes");
            journal.sync(section).await.expect("Failed to sync journal");
            drop(journal);

            // Reopen, then inject storage faults so the repairing resize fails.
            let journal = Journal::init(context.child("second"), cfg)
                .await
                .expect("Failed to re-initialize journal");
            *context.storage_fault_config().write() = deterministic::FaultConfig {
                resize_rate: Some(1.0),
                ..Default::default()
            };

            let stream = journal
                .replay(0, 0, NZUsize!(1024))
                .await
                .expect("unable to setup replay");
            pin_mut!(stream);

            // The valid item is yielded first.
            let first = stream
                .next()
                .await
                .expect("expected item before trailing bytes")
                .expect("failed to replay valid item");
            assert_eq!(first, (section, 0, item.encode_size() as u32, item));

            // The failed resize is reported as an error element.
            match stream.next().await {
                Some(Err(_)) => {}
                other => {
                    panic!("expected resize error while repairing trailing bytes, got {other:?}")
                }
            }
            // Nothing follows the error.
            assert!(stream.next().await.is_none());
        });
    }
1420
    /// A blob whose length prefix promises more data than is present replays as empty.
    #[test_traced]
    fn test_journal_read_item_missing() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = Config {
                partition: "test-partition".into(),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            // Open section 1's blob directly, bypassing the journal.
            let section = 1u64;
            let blob_name = section.to_be_bytes();
            let (blob, _) = context
                .open(&cfg.partition, &blob_name)
                .await
                .expect("Failed to create blob");

            // Prefix claims 10 bytes of data, but only 5 follow.
            let item_size: u32 = 10;
            let mut buf = Vec::new();
            UInt(item_size).write(&mut buf);
            let data = [2u8; 5];
            BufMut::put_slice(&mut buf, &data);
            blob.write_at_sync(0, buf)
                .await
                .expect("Failed to write incomplete item");

            let journal = Journal::init(context, cfg)
                .await
                .expect("Failed to initialize journal");

            // Replay must finish without yielding items or errors.
            let stream = journal
                .replay(0, 0, NZUsize!(1024))
                .await
                .expect("unable to setup replay");
            pin_mut!(stream);
            let mut items = Vec::<(u64, u64)>::new();
            while let Some(result) = stream.next().await {
                match result {
                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                    Err(err) => panic!("Failed to read item: {err}"),
                }
            }
            assert!(items.is_empty());
        });
    }
1476
1477 #[test_traced]
1478 fn test_journal_read_checksum_missing() {
1479 let executor = deterministic::Runner::default();
1481
1482 executor.start(|context| async move {
1484 let cfg = Config {
1486 partition: "test-partition".into(),
1487 compression: None,
1488 codec_config: (),
1489 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1490 write_buffer: NZUsize!(1024),
1491 };
1492
1493 let section = 1u64;
1495 let blob_name = section.to_be_bytes();
1496 let (blob, _) = context
1497 .open(&cfg.partition, &blob_name)
1498 .await
1499 .expect("Failed to create blob");
1500
1501 let item_data = b"Test data";
1503 let item_size = item_data.len() as u32;
1504
1505 let mut buf = Vec::new();
1507 UInt(item_size).write(&mut buf);
1508 BufMut::put_slice(&mut buf, item_data);
1509 blob.write_at_sync(0, buf)
1510 .await
1511 .expect("Failed to write item without checksum");
1512
1513 let journal = Journal::init(context, cfg)
1515 .await
1516 .expect("Failed to initialize journal");
1517
1518 let stream = journal
1522 .replay(0, 0, NZUsize!(1024))
1523 .await
1524 .expect("unable to setup replay");
1525 pin_mut!(stream);
1526 let mut items = Vec::<(u64, u64)>::new();
1527 while let Some(result) = stream.next().await {
1528 match result {
1529 Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
1530 Err(err) => panic!("Failed to read item: {err}"),
1531 }
1532 }
1533 assert!(items.is_empty());
1534 });
1535 }
1536
1537 #[test_traced]
1538 fn test_journal_read_checksum_mismatch() {
1539 let executor = deterministic::Runner::default();
1541
1542 executor.start(|context| async move {
1544 let cfg = Config {
1546 partition: "test-partition".into(),
1547 compression: None,
1548 codec_config: (),
1549 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1550 write_buffer: NZUsize!(1024),
1551 };
1552
1553 let section = 1u64;
1555 let blob_name = section.to_be_bytes();
1556 let (blob, _) = context
1557 .open(&cfg.partition, &blob_name)
1558 .await
1559 .expect("Failed to create blob");
1560
1561 let item_data = b"Test data";
1563 let item_size = item_data.len() as u32;
1564 let incorrect_checksum: u32 = 0xDEADBEEF;
1565
1566 let mut buf = Vec::new();
1568 UInt(item_size).write(&mut buf);
1569 BufMut::put_slice(&mut buf, item_data);
1570 buf.put_u32(incorrect_checksum);
1571 blob.write_at_sync(0, buf)
1572 .await
1573 .expect("Failed to write item with bad checksum");
1574
1575 let journal = Journal::init(context.child("storage"), cfg.clone())
1577 .await
1578 .expect("Failed to initialize journal");
1579
1580 {
1582 let stream = journal
1583 .replay(0, 0, NZUsize!(1024))
1584 .await
1585 .expect("unable to setup replay");
1586 pin_mut!(stream);
1587 let mut items = Vec::<(u64, u64)>::new();
1588 while let Some(result) = stream.next().await {
1589 match result {
1590 Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
1591 Err(err) => panic!("Failed to read item: {err}"),
1592 }
1593 }
1594 assert!(items.is_empty());
1595 }
1596 drop(journal);
1597
1598 let (_, blob_size) = context
1600 .open(&cfg.partition, §ion.to_be_bytes())
1601 .await
1602 .expect("Failed to open blob");
1603 assert_eq!(blob_size, 0);
1604 });
1605 }
1606
1607 #[test_traced]
1608 fn test_journal_truncation_recovery() {
1609 let executor = deterministic::Runner::default();
1611
1612 executor.start(|context| async move {
1614 let cfg = Config {
1616 partition: "test-partition".into(),
1617 compression: None,
1618 codec_config: (),
1619 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1620 write_buffer: NZUsize!(1024),
1621 };
1622
1623 let mut journal = Journal::init(context.child("first"), cfg.clone())
1625 .await
1626 .expect("Failed to initialize journal");
1627
1628 journal.append(1, &1).await.expect("Failed to append data");
1630
1631 let data_items = vec![(2u64, 2), (2u64, 3), (2u64, 4)];
1633 for (index, data) in &data_items {
1634 journal
1635 .append(*index, data)
1636 .await
1637 .expect("Failed to append data");
1638 journal.sync(*index).await.expect("Failed to sync blob");
1639 }
1640
1641 journal.sync_all().await.expect("Failed to sync");
1643 drop(journal);
1644
1645 let (blob, blob_size) = context
1647 .open(&cfg.partition, &2u64.to_be_bytes())
1648 .await
1649 .expect("Failed to open blob");
1650 blob.resize(blob_size - 4)
1651 .await
1652 .expect("Failed to corrupt blob");
1653 blob.sync().await.expect("Failed to sync blob");
1654
1655 let journal = Journal::init(context.child("second"), cfg.clone())
1657 .await
1658 .expect("Failed to re-initialize journal");
1659
1660 let mut items = Vec::<(u64, u32)>::new();
1662 {
1663 let stream = journal
1664 .replay(0, 0, NZUsize!(1024))
1665 .await
1666 .expect("unable to setup replay");
1667 pin_mut!(stream);
1668 while let Some(result) = stream.next().await {
1669 match result {
1670 Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
1671 Err(err) => panic!("Failed to read item: {err}"),
1672 }
1673 }
1674 }
1675 drop(journal);
1676
1677 assert_eq!(items.len(), 1);
1679 assert_eq!(items[0].0, 1);
1680 assert_eq!(items[0].1, 1);
1681
1682 let (_, blob_size) = context
1684 .open(&cfg.partition, &2u64.to_be_bytes())
1685 .await
1686 .expect("Failed to open blob");
1687 assert_eq!(blob_size, 0);
1688
1689 let mut journal = Journal::init(context.child("third"), cfg.clone())
1691 .await
1692 .expect("Failed to re-initialize journal");
1693
1694 let mut items = Vec::<(u64, u32)>::new();
1696 {
1697 let stream = journal
1698 .replay(0, 0, NZUsize!(1024))
1699 .await
1700 .expect("unable to setup replay");
1701 pin_mut!(stream);
1702 while let Some(result) = stream.next().await {
1703 match result {
1704 Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
1705 Err(err) => panic!("Failed to read item: {err}"),
1706 }
1707 }
1708 }
1709
1710 assert_eq!(items.len(), 1);
1712 assert_eq!(items[0].0, 1);
1713 assert_eq!(items[0].1, 1);
1714
1715 let (_offset, _) = journal.append(2, &5).await.expect("Failed to append data");
1717 journal.sync(2).await.expect("Failed to sync blob");
1718
1719 let item = journal.get(2, 0).await.expect("Failed to get item");
1721 assert_eq!(item, 5);
1722
1723 drop(journal);
1725
1726 let journal = Journal::init(context.child("storage"), cfg.clone())
1728 .await
1729 .expect("Failed to re-initialize journal");
1730
1731 let mut items = Vec::<(u64, u32)>::new();
1733 {
1734 let stream = journal
1735 .replay(0, 0, NZUsize!(1024))
1736 .await
1737 .expect("unable to setup replay");
1738 pin_mut!(stream);
1739 while let Some(result) = stream.next().await {
1740 match result {
1741 Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
1742 Err(err) => panic!("Failed to read item: {err}"),
1743 }
1744 }
1745 }
1746
1747 assert_eq!(items.len(), 2);
1749 assert_eq!(items[0].0, 1);
1750 assert_eq!(items[0].1, 1);
1751 assert_eq!(items[1].0, 2);
1752 assert_eq!(items[1].1, 5);
1753 });
1754 }
1755
1756 #[test_traced]
1757 fn test_journal_handling_extra_data() {
1758 let executor = deterministic::Runner::default();
1760
1761 executor.start(|context| async move {
1763 let cfg = Config {
1765 partition: "test-partition".into(),
1766 compression: None,
1767 codec_config: (),
1768 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1769 write_buffer: NZUsize!(1024),
1770 };
1771
1772 let mut journal = Journal::init(context.child("first"), cfg.clone())
1774 .await
1775 .expect("Failed to initialize journal");
1776
1777 journal.append(1, &1).await.expect("Failed to append data");
1779
1780 let data_items = vec![(2u64, 2), (2u64, 3), (2u64, 4)];
1782 for (index, data) in &data_items {
1783 journal
1784 .append(*index, data)
1785 .await
1786 .expect("Failed to append data");
1787 journal.sync(*index).await.expect("Failed to sync blob");
1788 }
1789
1790 journal.sync_all().await.expect("Failed to sync");
1792 drop(journal);
1793
1794 let (blob, blob_size) = context
1796 .open(&cfg.partition, &2u64.to_be_bytes())
1797 .await
1798 .expect("Failed to open blob");
1799 blob.write_at_sync(blob_size, vec![0u8; 16])
1800 .await
1801 .expect("Failed to add extra data");
1802
1803 let journal = Journal::init(context.child("second"), cfg)
1805 .await
1806 .expect("Failed to re-initialize journal");
1807
1808 let mut items = Vec::<(u64, i32)>::new();
1810 let stream = journal
1811 .replay(0, 0, NZUsize!(1024))
1812 .await
1813 .expect("unable to setup replay");
1814 pin_mut!(stream);
1815 while let Some(result) = stream.next().await {
1816 match result {
1817 Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
1818 Err(err) => panic!("Failed to read item: {err}"),
1819 }
1820 }
1821 });
1822 }
1823
1824 #[test_traced]
1825 fn test_journal_rewind() {
1826 let executor = deterministic::Runner::default();
1828 executor.start(|context| async move {
1829 let cfg = Config {
1831 partition: "test-partition".into(),
1832 compression: None,
1833 codec_config: (),
1834 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1835 write_buffer: NZUsize!(1024),
1836 };
1837 let mut journal = Journal::init(context, cfg).await.unwrap();
1838
1839 let size = journal.size(1).await.unwrap();
1841 assert_eq!(size, 0);
1842
1843 journal.append(1, &42i32).await.unwrap();
1845
1846 let size = journal.size(1).await.unwrap();
1848 assert!(size > 0);
1849
1850 journal.append(1, &43i32).await.unwrap();
1852 let new_size = journal.size(1).await.unwrap();
1853 assert!(new_size > size);
1854
1855 let size = journal.size(2).await.unwrap();
1857 assert_eq!(size, 0);
1858
1859 journal.append(2, &44i32).await.unwrap();
1861
1862 let size = journal.size(2).await.unwrap();
1864 assert!(size > 0);
1865
1866 journal.rewind(1, 0).await.unwrap();
1868
1869 let size = journal.size(1).await.unwrap();
1871 assert_eq!(size, 0);
1872
1873 let size = journal.size(2).await.unwrap();
1875 assert_eq!(size, 0);
1876 });
1877 }
1878
1879 #[test_traced]
1880 fn test_journal_rewind_section() {
1881 let executor = deterministic::Runner::default();
1883 executor.start(|context| async move {
1884 let cfg = Config {
1886 partition: "test-partition".into(),
1887 compression: None,
1888 codec_config: (),
1889 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1890 write_buffer: NZUsize!(1024),
1891 };
1892 let mut journal = Journal::init(context, cfg).await.unwrap();
1893
1894 let size = journal.size(1).await.unwrap();
1896 assert_eq!(size, 0);
1897
1898 journal.append(1, &42i32).await.unwrap();
1900
1901 let size = journal.size(1).await.unwrap();
1903 assert!(size > 0);
1904
1905 journal.append(1, &43i32).await.unwrap();
1907 let new_size = journal.size(1).await.unwrap();
1908 assert!(new_size > size);
1909
1910 let size = journal.size(2).await.unwrap();
1912 assert_eq!(size, 0);
1913
1914 journal.append(2, &44i32).await.unwrap();
1916
1917 let size = journal.size(2).await.unwrap();
1919 assert!(size > 0);
1920
1921 journal.rewind_section(1, 0).await.unwrap();
1923
1924 let size = journal.size(1).await.unwrap();
1926 assert_eq!(size, 0);
1927
1928 let size = journal.size(2).await.unwrap();
1930 assert!(size > 0);
1931 });
1932 }
1933
1934 #[test_traced]
1935 fn test_journal_small_items() {
1936 let executor = deterministic::Runner::default();
1937 executor.start(|context| async move {
1938 let cfg = Config {
1939 partition: "test-partition".into(),
1940 compression: None,
1941 codec_config: (),
1942 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1943 write_buffer: NZUsize!(1024),
1944 };
1945
1946 let mut journal = Journal::init(context.child("first"), cfg.clone())
1947 .await
1948 .expect("Failed to initialize journal");
1949
1950 let num_items = 100;
1952 let mut offsets = Vec::new();
1953 for i in 0..num_items {
1954 let (offset, size) = journal
1955 .append(1, &(i as u8))
1956 .await
1957 .expect("Failed to append data");
1958 assert_eq!(size, 1, "u8 should encode to 1 byte");
1959 offsets.push(offset);
1960 }
1961 journal.sync(1).await.expect("Failed to sync");
1962
1963 for (i, &offset) in offsets.iter().enumerate() {
1965 let item: u8 = journal.get(1, offset).await.expect("Failed to get item");
1966 assert_eq!(item, i as u8, "Item mismatch at offset {offset}");
1967 }
1968
1969 drop(journal);
1971 let journal = Journal::<_, u8>::init(context.child("second"), cfg)
1972 .await
1973 .expect("Failed to re-initialize journal");
1974
1975 let stream = journal
1977 .replay(0, 0, NZUsize!(1024))
1978 .await
1979 .expect("Failed to setup replay");
1980 pin_mut!(stream);
1981
1982 let mut count = 0;
1983 while let Some(result) = stream.next().await {
1984 let (section, offset, size, item) = result.expect("Failed to replay item");
1985 assert_eq!(section, 1);
1986 assert_eq!(offset, offsets[count]);
1987 assert_eq!(size, 1);
1988 assert_eq!(item, count as u8);
1989 count += 1;
1990 }
1991 assert_eq!(count, num_items, "Should replay all items");
1992 });
1993 }
1994
1995 #[test_traced]
1996 fn test_journal_rewind_many_sections() {
1997 let executor = deterministic::Runner::default();
1998 executor.start(|context| async move {
1999 let cfg = Config {
2000 partition: "test-partition".into(),
2001 compression: None,
2002 codec_config: (),
2003 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
2004 write_buffer: NZUsize!(1024),
2005 };
2006 let mut journal = Journal::init(context.child("storage"), cfg.clone())
2007 .await
2008 .unwrap();
2009
2010 for section in 1u64..=10 {
2012 journal.append(section, &(section as i32)).await.unwrap();
2013 }
2014 journal.sync_all().await.unwrap();
2015
2016 for section in 1u64..=10 {
2018 let size = journal.size(section).await.unwrap();
2019 assert!(size > 0, "section {section} should have data");
2020 }
2021
2022 journal
2024 .rewind(5, journal.size(5).await.unwrap())
2025 .await
2026 .unwrap();
2027
2028 for section in 1u64..=5 {
2030 let size = journal.size(section).await.unwrap();
2031 assert!(size > 0, "section {section} should still have data");
2032 }
2033
2034 for section in 6u64..=10 {
2036 let size = journal.size(section).await.unwrap();
2037 assert_eq!(size, 0, "section {section} should be removed");
2038 }
2039
2040 {
2042 let stream = journal.replay(0, 0, NZUsize!(1024)).await.unwrap();
2043 pin_mut!(stream);
2044 let mut items = Vec::new();
2045 while let Some(result) = stream.next().await {
2046 let (section, _, _, item) = result.unwrap();
2047 items.push((section, item));
2048 }
2049 assert_eq!(items.len(), 5);
2050 for (i, (section, item)) in items.iter().enumerate() {
2051 assert_eq!(*section, (i + 1) as u64);
2052 assert_eq!(*item, (i + 1) as i32);
2053 }
2054 }
2055
2056 journal.destroy().await.unwrap();
2057 });
2058 }
2059
2060 #[test_traced]
2061 fn test_journal_rewind_partial_truncation() {
2062 let executor = deterministic::Runner::default();
2063 executor.start(|context| async move {
2064 let cfg = Config {
2065 partition: "test-partition".into(),
2066 compression: None,
2067 codec_config: (),
2068 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
2069 write_buffer: NZUsize!(1024),
2070 };
2071 let mut journal = Journal::init(context.child("storage"), cfg.clone())
2072 .await
2073 .unwrap();
2074
2075 let mut sizes = Vec::new();
2077 for i in 0..5 {
2078 journal.append(1, &i).await.unwrap();
2079 journal.sync(1).await.unwrap();
2080 sizes.push(journal.size(1).await.unwrap());
2081 }
2082
2083 let target_size = sizes[2];
2085 journal.rewind(1, target_size).await.unwrap();
2086
2087 let new_size = journal.size(1).await.unwrap();
2089 assert_eq!(new_size, target_size);
2090
2091 {
2093 let stream = journal.replay(0, 0, NZUsize!(1024)).await.unwrap();
2094 pin_mut!(stream);
2095 let mut items = Vec::new();
2096 while let Some(result) = stream.next().await {
2097 let (_, _, _, item) = result.unwrap();
2098 items.push(item);
2099 }
2100 assert_eq!(items.len(), 3);
2101 for (i, item) in items.iter().enumerate() {
2102 assert_eq!(*item, i as i32);
2103 }
2104 }
2105
2106 journal.destroy().await.unwrap();
2107 });
2108 }
2109
2110 #[test_traced]
2111 fn test_journal_rewind_nonexistent_target() {
2112 let executor = deterministic::Runner::default();
2113 executor.start(|context| async move {
2114 let cfg = Config {
2115 partition: "test-partition".into(),
2116 compression: None,
2117 codec_config: (),
2118 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
2119 write_buffer: NZUsize!(1024),
2120 };
2121 let mut journal = Journal::init(context.child("storage"), cfg.clone())
2122 .await
2123 .unwrap();
2124
2125 for section in 5u64..=7 {
2127 journal.append(section, &(section as i32)).await.unwrap();
2128 }
2129 journal.sync_all().await.unwrap();
2130
2131 journal.rewind(3, 0).await.unwrap();
2133
2134 for section in 5u64..=7 {
2136 let size = journal.size(section).await.unwrap();
2137 assert_eq!(size, 0, "section {section} should be removed");
2138 }
2139
2140 {
2142 let stream = journal.replay(0, 0, NZUsize!(1024)).await.unwrap();
2143 pin_mut!(stream);
2144 let items: Vec<_> = stream.collect().await;
2145 assert!(items.is_empty());
2146 }
2147
2148 journal.destroy().await.unwrap();
2149 });
2150 }
2151
2152 #[test_traced]
2153 fn test_journal_rewind_persistence() {
2154 let executor = deterministic::Runner::default();
2155 executor.start(|context| async move {
2156 let cfg = Config {
2157 partition: "test-partition".into(),
2158 compression: None,
2159 codec_config: (),
2160 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
2161 write_buffer: NZUsize!(1024),
2162 };
2163
2164 let mut journal = Journal::init(context.child("first"), cfg.clone())
2166 .await
2167 .unwrap();
2168 for section in 1u64..=5 {
2169 journal.append(section, &(section as i32)).await.unwrap();
2170 }
2171 journal.sync_all().await.unwrap();
2172
2173 let size = journal.size(2).await.unwrap();
2175 journal.rewind(2, size).await.unwrap();
2176 journal.sync_all().await.unwrap();
2177 drop(journal);
2178
2179 let journal = Journal::<_, i32>::init(context.child("second"), cfg.clone())
2181 .await
2182 .unwrap();
2183
2184 for section in 1u64..=2 {
2186 let size = journal.size(section).await.unwrap();
2187 assert!(size > 0, "section {section} should have data after restart");
2188 }
2189
2190 for section in 3u64..=5 {
2192 let size = journal.size(section).await.unwrap();
2193 assert_eq!(size, 0, "section {section} should be gone after restart");
2194 }
2195
2196 {
2198 let stream = journal.replay(0, 0, NZUsize!(1024)).await.unwrap();
2199 pin_mut!(stream);
2200 let mut items = Vec::new();
2201 while let Some(result) = stream.next().await {
2202 let (section, _, _, item) = result.unwrap();
2203 items.push((section, item));
2204 }
2205 assert_eq!(items.len(), 2);
2206 assert_eq!(items[0], (1, 1));
2207 assert_eq!(items[1], (2, 2));
2208 }
2209
2210 journal.destroy().await.unwrap();
2211 });
2212 }
2213
2214 #[test_traced]
2215 fn test_journal_rewind_to_zero_removes_all_newer() {
2216 let executor = deterministic::Runner::default();
2217 executor.start(|context| async move {
2218 let cfg = Config {
2219 partition: "test-partition".into(),
2220 compression: None,
2221 codec_config: (),
2222 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
2223 write_buffer: NZUsize!(1024),
2224 };
2225 let mut journal = Journal::init(context.child("storage"), cfg.clone())
2226 .await
2227 .unwrap();
2228
2229 for section in 1u64..=3 {
2231 journal.append(section, &(section as i32)).await.unwrap();
2232 }
2233 journal.sync_all().await.unwrap();
2234
2235 journal.rewind(1, 0).await.unwrap();
2237
2238 let size = journal.size(1).await.unwrap();
2240 assert_eq!(size, 0, "section 1 should be empty");
2241
2242 for section in 2u64..=3 {
2244 let size = journal.size(section).await.unwrap();
2245 assert_eq!(size, 0, "section {section} should be removed");
2246 }
2247
2248 {
2250 let stream = journal.replay(0, 0, NZUsize!(1024)).await.unwrap();
2251 pin_mut!(stream);
2252 let items: Vec<_> = stream.collect().await;
2253 assert!(items.is_empty());
2254 }
2255
2256 journal.destroy().await.unwrap();
2257 });
2258 }
2259
2260 #[test_traced]
2261 fn test_journal_replay_start_offset_with_trailing_bytes() {
2262 let executor = deterministic::Runner::default();
2264 executor.start(|context| async move {
2265 let cfg = Config {
2266 partition: "test-partition".into(),
2267 compression: None,
2268 codec_config: (),
2269 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
2270 write_buffer: NZUsize!(1024),
2271 };
2272 let mut journal = Journal::init(context.child("first"), cfg.clone())
2273 .await
2274 .expect("Failed to initialize journal");
2275
2276 for i in 0..5i32 {
2278 journal.append(1, &i).await.unwrap();
2279 }
2280 journal.sync(1).await.unwrap();
2281 let valid_logical_size = journal.size(1).await.unwrap();
2282 drop(journal);
2283
2284 let (blob, physical_size_before) = context
2286 .open(&cfg.partition, &1u64.to_be_bytes())
2287 .await
2288 .unwrap();
2289
2290 blob.write_at_sync(physical_size_before, vec![0xFF, 0xFF])
2293 .await
2294 .unwrap();
2295
2296 let start_offset = valid_logical_size;
2300 {
2301 let journal = Journal::<_, i32>::init(context.child("second"), cfg.clone())
2302 .await
2303 .unwrap();
2304
2305 let stream = journal
2306 .replay(1, start_offset, NZUsize!(1024))
2307 .await
2308 .unwrap();
2309 pin_mut!(stream);
2310
2311 while let Some(_result) = stream.next().await {}
2313 }
2314
2315 let (_, physical_size_after) = context
2317 .open(&cfg.partition, &1u64.to_be_bytes())
2318 .await
2319 .unwrap();
2320
2321 assert!(
2324 physical_size_after >= physical_size_before,
2325 "Valid data was lost! Physical blob truncated from {physical_size_before} to \
2326 {physical_size_after}. Logical valid size was {valid_logical_size}. \
2327 This indicates valid_offset was incorrectly initialized to 0 instead of start_offset."
2328 );
2329 });
2330 }
2331
2332 #[test_traced]
2333 fn test_journal_large_item_spanning_pages() {
2334 const LARGE_SIZE: usize = 2048;
2336 type LargeItem = [u8; LARGE_SIZE];
2337
2338 let executor = deterministic::Runner::default();
2339 executor.start(|context| async move {
2340 let cfg = Config {
2341 partition: "test-partition".into(),
2342 compression: None,
2343 codec_config: (),
2344 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
2345 write_buffer: NZUsize!(4096),
2346 };
2347 let mut journal = Journal::init(context.child("first"), cfg.clone())
2348 .await
2349 .expect("Failed to initialize journal");
2350
2351 let mut large_data: LargeItem = [0u8; LARGE_SIZE];
2353 for (i, byte) in large_data.iter_mut().enumerate() {
2354 *byte = (i % 256) as u8;
2355 }
2356 assert!(
2357 LARGE_SIZE > PAGE_SIZE.get() as usize,
2358 "Item must be larger than page size"
2359 );
2360
2361 let (offset, size) = journal
2363 .append(1, &large_data)
2364 .await
2365 .expect("Failed to append large item");
2366 assert_eq!(size as usize, LARGE_SIZE);
2367 journal.sync(1).await.expect("Failed to sync");
2368
2369 let retrieved: LargeItem = journal
2371 .get(1, offset)
2372 .await
2373 .expect("Failed to get large item");
2374 assert_eq!(retrieved, large_data, "Random access read mismatch");
2375
2376 drop(journal);
2378 let journal = Journal::<_, LargeItem>::init(context.child("second"), cfg.clone())
2379 .await
2380 .expect("Failed to re-initialize journal");
2381
2382 {
2384 let stream = journal
2385 .replay(0, 0, NZUsize!(1024))
2386 .await
2387 .expect("Failed to setup replay");
2388 pin_mut!(stream);
2389
2390 let mut items = Vec::new();
2391 while let Some(result) = stream.next().await {
2392 let (section, off, sz, item) = result.expect("Failed to replay item");
2393 items.push((section, off, sz, item));
2394 }
2395
2396 assert_eq!(items.len(), 1, "Should have exactly one item");
2397 let (section, off, sz, item) = &items[0];
2398 assert_eq!(*section, 1);
2399 assert_eq!(*off, offset);
2400 assert_eq!(*sz as usize, LARGE_SIZE);
2401 assert_eq!(*item, large_data, "Replay read mismatch");
2402 }
2403
2404 journal.destroy().await.unwrap();
2405 });
2406 }
2407
2408 #[test_traced]
2409 fn test_journal_non_contiguous_sections() {
2410 let executor = deterministic::Runner::default();
2413 executor.start(|context| async move {
2414 let cfg = Config {
2415 partition: "test-partition".into(),
2416 compression: None,
2417 codec_config: (),
2418 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
2419 write_buffer: NZUsize!(1024),
2420 };
2421 let mut journal = Journal::init(context.child("first"), cfg.clone())
2422 .await
2423 .expect("Failed to initialize journal");
2424
2425 let sections_and_data = [(1u64, 100i32), (5u64, 500i32), (10u64, 1000i32)];
2427 let mut offsets = Vec::new();
2428
2429 for (section, data) in §ions_and_data {
2430 let (offset, _) = journal
2431 .append(*section, data)
2432 .await
2433 .expect("Failed to append");
2434 offsets.push(offset);
2435 }
2436 journal.sync_all().await.expect("Failed to sync");
2437
2438 for (i, (section, expected_data)) in sections_and_data.iter().enumerate() {
2440 let retrieved: i32 = journal
2441 .get(*section, offsets[i])
2442 .await
2443 .expect("Failed to get item");
2444 assert_eq!(retrieved, *expected_data);
2445 }
2446
2447 for missing_section in [0u64, 2, 3, 4, 6, 7, 8, 9, 11] {
2449 let result = journal.get(missing_section, 0).await;
2450 assert!(
2451 matches!(result, Err(Error::SectionOutOfRange(_))),
2452 "Expected SectionOutOfRange for section {}, got {:?}",
2453 missing_section,
2454 result
2455 );
2456 }
2457
2458 drop(journal);
2460 let journal = Journal::<_, i32>::init(context.child("second"), cfg.clone())
2461 .await
2462 .expect("Failed to re-initialize journal");
2463
2464 {
2466 let stream = journal
2467 .replay(0, 0, NZUsize!(1024))
2468 .await
2469 .expect("Failed to setup replay");
2470 pin_mut!(stream);
2471
2472 let mut items = Vec::new();
2473 while let Some(result) = stream.next().await {
2474 let (section, _, _, item) = result.expect("Failed to replay item");
2475 items.push((section, item));
2476 }
2477
2478 assert_eq!(items.len(), 3, "Should have 3 items");
2479 assert_eq!(items[0], (1, 100));
2480 assert_eq!(items[1], (5, 500));
2481 assert_eq!(items[2], (10, 1000));
2482 }
2483
2484 {
2486 let stream = journal
2487 .replay(5, 0, NZUsize!(1024))
2488 .await
2489 .expect("Failed to setup replay from section 5");
2490 pin_mut!(stream);
2491
2492 let mut items = Vec::new();
2493 while let Some(result) = stream.next().await {
2494 let (section, _, _, item) = result.expect("Failed to replay item");
2495 items.push((section, item));
2496 }
2497
2498 assert_eq!(items.len(), 2, "Should have 2 items from section 5 onwards");
2499 assert_eq!(items[0], (5, 500));
2500 assert_eq!(items[1], (10, 1000));
2501 }
2502
2503 {
2505 let stream = journal
2506 .replay(3, 0, NZUsize!(1024))
2507 .await
2508 .expect("Failed to setup replay from section 3");
2509 pin_mut!(stream);
2510
2511 let mut items = Vec::new();
2512 while let Some(result) = stream.next().await {
2513 let (section, _, _, item) = result.expect("Failed to replay item");
2514 items.push((section, item));
2515 }
2516
2517 assert_eq!(items.len(), 2);
2519 assert_eq!(items[0], (5, 500));
2520 assert_eq!(items[1], (10, 1000));
2521 }
2522
2523 journal.destroy().await.unwrap();
2524 });
2525 }
2526
2527 #[test_traced]
2528 fn test_journal_empty_section_in_middle() {
2529 let executor = deterministic::Runner::default();
2532 executor.start(|context| async move {
2533 let cfg = Config {
2534 partition: "test-partition".into(),
2535 compression: None,
2536 codec_config: (),
2537 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
2538 write_buffer: NZUsize!(1024),
2539 };
2540 let mut journal = Journal::init(context.child("first"), cfg.clone())
2541 .await
2542 .expect("Failed to initialize journal");
2543
2544 journal.append(1, &100i32).await.expect("Failed to append");
2546
2547 journal.append(2, &200i32).await.expect("Failed to append");
2550 journal.sync(2).await.expect("Failed to sync");
2551 journal
2552 .rewind_section(2, 0)
2553 .await
2554 .expect("Failed to rewind");
2555
2556 journal.append(3, &300i32).await.expect("Failed to append");
2558
2559 journal.sync_all().await.expect("Failed to sync");
2560
2561 assert!(journal.size(1).await.unwrap() > 0);
2563 assert_eq!(journal.size(2).await.unwrap(), 0);
2564 assert!(journal.size(3).await.unwrap() > 0);
2565
2566 drop(journal);
2568 let journal = Journal::<_, i32>::init(context.child("second"), cfg.clone())
2569 .await
2570 .expect("Failed to re-initialize journal");
2571
2572 {
2574 let stream = journal
2575 .replay(0, 0, NZUsize!(1024))
2576 .await
2577 .expect("Failed to setup replay");
2578 pin_mut!(stream);
2579
2580 let mut items = Vec::new();
2581 while let Some(result) = stream.next().await {
2582 let (section, _, _, item) = result.expect("Failed to replay item");
2583 items.push((section, item));
2584 }
2585
2586 assert_eq!(
2587 items.len(),
2588 2,
2589 "Should have 2 items (skipping empty section)"
2590 );
2591 assert_eq!(items[0], (1, 100));
2592 assert_eq!(items[1], (3, 300));
2593 }
2594
2595 {
2597 let stream = journal
2598 .replay(2, 0, NZUsize!(1024))
2599 .await
2600 .expect("Failed to setup replay from section 2");
2601 pin_mut!(stream);
2602
2603 let mut items = Vec::new();
2604 while let Some(result) = stream.next().await {
2605 let (section, _, _, item) = result.expect("Failed to replay item");
2606 items.push((section, item));
2607 }
2608
2609 assert_eq!(items.len(), 1, "Should have 1 item from section 3");
2610 assert_eq!(items[0], (3, 300));
2611 }
2612
2613 journal.destroy().await.unwrap();
2614 });
2615 }
2616
2617 #[test_traced]
2618 fn test_journal_item_exactly_page_size() {
2619 const ITEM_SIZE: usize = PAGE_SIZE.get() as usize;
2622 type ExactItem = [u8; ITEM_SIZE];
2623
2624 let executor = deterministic::Runner::default();
2625 executor.start(|context| async move {
2626 let cfg = Config {
2627 partition: "test-partition".into(),
2628 compression: None,
2629 codec_config: (),
2630 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
2631 write_buffer: NZUsize!(4096),
2632 };
2633 let mut journal = Journal::init(context.child("first"), cfg.clone())
2634 .await
2635 .expect("Failed to initialize journal");
2636
2637 let mut exact_data: ExactItem = [0u8; ITEM_SIZE];
2639 for (i, byte) in exact_data.iter_mut().enumerate() {
2640 *byte = (i % 256) as u8;
2641 }
2642
2643 let (offset, size) = journal
2645 .append(1, &exact_data)
2646 .await
2647 .expect("Failed to append exact item");
2648 assert_eq!(size as usize, ITEM_SIZE);
2649 journal.sync(1).await.expect("Failed to sync");
2650
2651 let retrieved: ExactItem = journal
2653 .get(1, offset)
2654 .await
2655 .expect("Failed to get exact item");
2656 assert_eq!(retrieved, exact_data, "Random access read mismatch");
2657
2658 drop(journal);
2660 let journal = Journal::<_, ExactItem>::init(context.child("second"), cfg.clone())
2661 .await
2662 .expect("Failed to re-initialize journal");
2663
2664 {
2666 let stream = journal
2667 .replay(0, 0, NZUsize!(1024))
2668 .await
2669 .expect("Failed to setup replay");
2670 pin_mut!(stream);
2671
2672 let mut items = Vec::new();
2673 while let Some(result) = stream.next().await {
2674 let (section, off, sz, item) = result.expect("Failed to replay item");
2675 items.push((section, off, sz, item));
2676 }
2677
2678 assert_eq!(items.len(), 1, "Should have exactly one item");
2679 let (section, off, sz, item) = &items[0];
2680 assert_eq!(*section, 1);
2681 assert_eq!(*off, offset);
2682 assert_eq!(*sz as usize, ITEM_SIZE);
2683 assert_eq!(*item, exact_data, "Replay read mismatch");
2684 }
2685
2686 journal.destroy().await.unwrap();
2687 });
2688 }
2689
2690 #[test_traced]
2691 fn test_journal_varint_spanning_page_boundary() {
2692 const SMALL_PAGE: NonZeroU16 = NZU16!(16);
2700
2701 let executor = deterministic::Runner::default();
2702 executor.start(|context| async move {
2703 let cfg = Config {
2704 partition: "test-partition".into(),
2705 compression: None,
2706 codec_config: (),
2707 page_cache: CacheRef::from_pooler(&context, SMALL_PAGE, PAGE_CACHE_SIZE),
2708 write_buffer: NZUsize!(1024),
2709 };
2710 let mut journal: Journal<_, [u8; 128]> =
2711 Journal::init(context.child("first"), cfg.clone())
2712 .await
2713 .expect("Failed to initialize journal");
2714
2715 let item1: [u8; 128] = [1u8; 128];
2717 let item2: [u8; 128] = [2u8; 128];
2718 let item3: [u8; 128] = [3u8; 128];
2719
2720 let (offset1, _) = journal.append(1, &item1).await.expect("Failed to append");
2723 let (offset2, _) = journal.append(1, &item2).await.expect("Failed to append");
2724 let (offset3, _) = journal.append(1, &item3).await.expect("Failed to append");
2725
2726 journal.sync(1).await.expect("Failed to sync");
2727
2728 let retrieved1: [u8; 128] = journal.get(1, offset1).await.expect("Failed to get");
2730 let retrieved2: [u8; 128] = journal.get(1, offset2).await.expect("Failed to get");
2731 let retrieved3: [u8; 128] = journal.get(1, offset3).await.expect("Failed to get");
2732 assert_eq!(retrieved1, item1);
2733 assert_eq!(retrieved2, item2);
2734 assert_eq!(retrieved3, item3);
2735
2736 drop(journal);
2738 let journal: Journal<_, [u8; 128]> =
2739 Journal::init(context.child("second"), cfg.clone())
2740 .await
2741 .expect("Failed to re-initialize journal");
2742
2743 {
2745 let stream = journal
2746 .replay(0, 0, NZUsize!(64))
2747 .await
2748 .expect("Failed to setup replay");
2749 pin_mut!(stream);
2750
2751 let mut items = Vec::new();
2752 while let Some(result) = stream.next().await {
2753 let (section, off, _, item) = result.expect("Failed to replay item");
2754 items.push((section, off, item));
2755 }
2756
2757 assert_eq!(items.len(), 3, "Should have 3 items");
2758 assert_eq!(items[0], (1, offset1, item1));
2759 assert_eq!(items[1], (1, offset2, item2));
2760 assert_eq!(items[2], (1, offset3, item3));
2761 }
2762
2763 journal.destroy().await.unwrap();
2764 });
2765 }
2766
2767 #[test_traced]
2768 fn test_journal_clear() {
2769 let executor = deterministic::Runner::default();
2770 executor.start(|context| async move {
2771 let cfg = Config {
2772 partition: "clear-test".into(),
2773 compression: None,
2774 codec_config: (),
2775 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
2776 write_buffer: NZUsize!(1024),
2777 };
2778
2779 let mut journal: Journal<_, u64> = Journal::init(context.child("journal"), cfg.clone())
2780 .await
2781 .expect("Failed to initialize journal");
2782
2783 for section in 0..5u64 {
2785 for i in 0..10u64 {
2786 journal
2787 .append(section, &(section * 1000 + i))
2788 .await
2789 .expect("Failed to append");
2790 }
2791 journal.sync(section).await.expect("Failed to sync");
2792 }
2793
2794 assert_eq!(journal.get(0, 0).await.unwrap(), 0);
2796 assert_eq!(journal.get(4, 0).await.unwrap(), 4000);
2797
2798 journal.clear().await.expect("Failed to clear");
2800
2801 for section in 0..5u64 {
2803 assert!(matches!(
2804 journal.get(section, 0).await,
2805 Err(Error::SectionOutOfRange(s)) if s == section
2806 ));
2807 }
2808
2809 for i in 0..5u64 {
2811 journal
2812 .append(10, &(i * 100))
2813 .await
2814 .expect("Failed to append after clear");
2815 }
2816 journal.sync(10).await.expect("Failed to sync after clear");
2817
2818 assert_eq!(journal.get(10, 0).await.unwrap(), 0);
2820
2821 assert!(matches!(
2823 journal.get(0, 0).await,
2824 Err(Error::SectionOutOfRange(0))
2825 ));
2826
2827 journal.destroy().await.unwrap();
2828 });
2829 }
2830}