1use super::manager::{AppendFactory, Config as ManagerConfig, Manager};
84use crate::journal::Error;
85use commonware_codec::{
86 varint::{UInt, MAX_U32_VARINT_SIZE},
87 Codec, CodecShared, EncodeSize, ReadExt, Write as CodecWrite,
88};
89use commonware_runtime::{
90 buffer::paged::{Append, CacheRef, Replay},
91 Blob, Buf, BufMut, IoBuf, IoBufMut, Metrics, Storage,
92};
93use futures::stream::{self, Stream, StreamExt};
94use std::{io::Cursor, num::NonZeroUsize};
95use tracing::{trace, warn};
96use zstd::{bulk::compress, decode_all};
97
/// Configuration for initializing a [Journal].
#[derive(Clone)]
pub struct Config<C> {
    /// Storage partition that holds the journal's section blobs.
    pub partition: String,

    /// zstd compression level applied to items; `None` disables compression.
    pub compression: Option<u8>,

    /// Configuration forwarded to the item codec when decoding.
    pub codec_config: C,

    /// Shared page cache used for buffered blob reads.
    pub page_cache: CacheRef,

    /// Size of the per-blob write buffer.
    pub write_buffer: NonZeroUsize,
}
117
118#[inline]
121fn decode_length_prefix(buf: &mut impl Buf) -> Result<(usize, usize), Error> {
122 let initial = buf.remaining();
123 let size = UInt::<u32>::read(buf)?.0 as usize;
124 let varint_len = initial - buf.remaining();
125 Ok((size, varint_len))
126}
127
/// Result of locating a length-prefixed item within a buffered window.
enum ItemInfo {
    /// The entire item payload is available in the buffer.
    Complete {
        /// Bytes consumed by the varint length prefix.
        varint_len: usize,
        /// Length of the item payload in bytes.
        data_len: usize,
    },
    /// Only a prefix of the payload is buffered; the rest must be read
    /// separately from the blob.
    Incomplete {
        /// Bytes consumed by the varint length prefix.
        varint_len: usize,
        /// Number of payload bytes already buffered.
        prefix_len: usize,
        /// Total payload length in bytes.
        total_len: usize,
    },
}
147
148fn find_item(buf: &mut impl Buf, offset: u64) -> Result<(u64, ItemInfo), Error> {
152 let available = buf.remaining();
153 let (size, varint_len) = decode_length_prefix(buf)?;
154 let next_offset = offset
155 .checked_add(varint_len as u64)
156 .ok_or(Error::OffsetOverflow)?
157 .checked_add(size as u64)
158 .ok_or(Error::OffsetOverflow)?;
159 let buffered = available.saturating_sub(varint_len);
160
161 let item = if buffered >= size {
162 ItemInfo::Complete {
163 varint_len,
164 data_len: size,
165 }
166 } else {
167 ItemInfo::Incomplete {
168 varint_len,
169 prefix_len: buffered,
170 total_len: size,
171 }
172 };
173
174 Ok((next_offset, item))
175}
176
/// Per-section state threaded through the replay `unfold` loop.
struct ReplayState<B: Blob, C> {
    /// Section index being replayed.
    section: u64,
    /// Handle used to truncate the blob when trailing/partial data is found.
    blob: Append<B>,
    /// Buffered reader over the blob contents.
    replay: Replay<B>,
    /// Bytes still to skip before the first item (start offset in section).
    skip_bytes: u64,
    /// Current read offset within the blob.
    offset: u64,
    /// Offset just past the last successfully decoded item; used as the
    /// truncation target on corruption.
    valid_offset: u64,
    /// Codec configuration for decoding items.
    codec_config: C,
    /// Whether items are zstd-compressed.
    compressed: bool,
    /// Set once the stream for this section is finished.
    done: bool,
}
189
190fn decode_item<V: Codec>(item_data: impl Buf, cfg: &V::Cfg, compressed: bool) -> Result<V, Error> {
192 if compressed {
193 let decompressed =
194 decode_all(item_data.reader()).map_err(|_| Error::DecompressionFailed)?;
195 V::decode_cfg(decompressed.as_ref(), cfg).map_err(Error::Codec)
196 } else {
197 V::decode_cfg(item_data, cfg).map_err(Error::Codec)
198 }
199}
200
/// An append-only journal of length-prefixed, optionally zstd-compressed
/// items, organized into numbered sections each backed by its own blob.
pub struct Journal<E: Storage + Metrics, V: Codec> {
    /// Section/blob manager that owns the underlying storage.
    manager: Manager<E, AppendFactory>,

    /// zstd compression level; `None` disables compression.
    compression: Option<u8>,

    /// Codec configuration used when decoding items.
    codec_config: V::Cfg,
}
223
224impl<E: Storage + Metrics, V: CodecShared> Journal<E, V> {
225 pub async fn init(context: E, cfg: Config<V::Cfg>) -> Result<Self, Error> {
231 let manager_cfg = ManagerConfig {
232 partition: cfg.partition,
233 factory: AppendFactory {
234 write_buffer: cfg.write_buffer,
235 page_cache_ref: cfg.page_cache,
236 },
237 };
238 let manager = Manager::init(context, manager_cfg).await?;
239
240 Ok(Self {
241 manager,
242 compression: cfg.compression,
243 codec_config: cfg.codec_config,
244 })
245 }
246
    /// Read and decode the item that begins at `offset` in `blob`.
    ///
    /// Returns `(next_offset, item_size, item)` where `next_offset` is the
    /// offset immediately after the entry and `item_size` is the payload
    /// length in bytes.
    async fn read(
        compressed: bool,
        cfg: &V::Cfg,
        blob: &Append<E::Blob>,
        offset: u64,
    ) -> Result<(u64, u32, V), Error> {
        // Read up to the maximum varint prefix length; `available` is the
        // number of bytes actually returned.
        let (buf, available) = blob
            .read_up_to(
                offset,
                MAX_U32_VARINT_SIZE,
                IoBufMut::with_capacity(MAX_U32_VARINT_SIZE),
            )
            .await?;
        let buf = buf.freeze();
        let mut cursor = Cursor::new(buf.slice(..available));
        let (next_offset, item_info) = find_item(&mut cursor, offset)?;

        let (item_size, decoded) = match item_info {
            ItemInfo::Complete {
                varint_len,
                data_len,
            } => {
                // Whole payload arrived with the initial read: decode in place.
                let data = buf.slice(varint_len..varint_len + data_len);
                let decoded = decode_item::<V>(data, cfg, compressed)?;
                (data_len as u32, decoded)
            }
            ItemInfo::Incomplete {
                varint_len,
                prefix_len,
                total_len,
            } => {
                // Only a prefix was buffered: fetch the remainder and chain
                // the two pieces together for decoding.
                let prefix = buf.slice(varint_len..varint_len + prefix_len);
                let read_offset = offset + varint_len as u64 + prefix_len as u64;
                let remainder_len = total_len - prefix_len;
                let mut remainder = vec![0u8; remainder_len];
                blob.read_into(&mut remainder, read_offset).await?;
                let chained = prefix.chain(IoBuf::from(remainder));
                let decoded = decode_item::<V>(chained, cfg, compressed)?;
                (total_len as u32, decoded)
            }
        };

        Ok((next_offset, item_size, decoded))
    }
296
    /// Replay all items from `start_section`/`start_offset` onward as a
    /// stream of `(section, offset, item_size, item)` tuples.
    ///
    /// Corruption at the tail of a blob (trailing garbage or a cut-off final
    /// item) is repaired during replay by resizing the blob back to the end
    /// of the last successfully decoded item; mid-blob errors are surfaced as
    /// `Err` stream elements instead.
    ///
    /// `buffer` sizes the per-section read buffer.
    pub async fn replay(
        &self,
        start_section: u64,
        mut start_offset: u64,
        buffer: NonZeroUsize,
    ) -> Result<impl Stream<Item = Result<(u64, u64, u32, V), Error>> + Send + '_, Error> {
        // Snapshot everything each per-section stream needs so the combined
        // stream owns its inputs.
        let codec_config = self.codec_config.clone();
        let compressed = self.compression.is_some();
        let mut blobs = Vec::new();
        for (&section, blob) in self.manager.sections_from(start_section) {
            blobs.push((
                section,
                blob.clone(),
                blob.replay(buffer).await?,
                codec_config.clone(),
                compressed,
            ));
        }

        Ok(stream::iter(blobs).flat_map(
            move |(section, blob, replay, codec_config, compressed)| {
                // Only the first replayed section honors `start_offset`;
                // every later section starts from the beginning.
                let skip_bytes = if section == start_section {
                    start_offset
                } else {
                    start_offset = 0;
                    0
                };

                stream::unfold(
                    ReplayState {
                        section,
                        blob,
                        replay,
                        skip_bytes,
                        offset: 0,
                        // Bytes before the starting offset are assumed valid.
                        valid_offset: skip_bytes,
                        codec_config,
                        compressed,
                        done: false,
                    },
                    move |mut state| async move {
                        if state.done {
                            return None;
                        }

                        let blob_size = state.replay.blob_size();
                        // Decode as many items as the buffer allows before
                        // yielding them as one batch.
                        let mut batch: Vec<Result<(u64, u64, u32, V), Error>> = Vec::new();
                        loop {
                            // Buffer at least a full varint prefix, or detect
                            // the end of the blob.
                            match state.replay.ensure(MAX_U32_VARINT_SIZE).await {
                                Ok(true) => {}
                                Ok(false) => {
                                    if state.replay.remaining() == 0 {
                                        // Clean end of blob.
                                        state.done = true;
                                        return if batch.is_empty() {
                                            None
                                        } else {
                                            Some((batch, state))
                                        };
                                    }
                                }
                                Err(err) => {
                                    batch.push(Err(err.into()));
                                    state.done = true;
                                    return Some((batch, state));
                                }
                            }

                            // Consume bytes until the requested start offset.
                            if state.skip_bytes > 0 {
                                let to_skip =
                                    state.skip_bytes.min(state.replay.remaining() as u64) as usize;
                                state.replay.advance(to_skip);
                                state.skip_bytes -= to_skip as u64;
                                state.offset += to_skip as u64;
                                continue;
                            }

                            let before_remaining = state.replay.remaining();
                            let (item_size, varint_len) =
                                match decode_length_prefix(&mut state.replay) {
                                    Ok(result) => result,
                                    Err(err) => {
                                        // Prefix decode failed at the very end
                                        // of the blob: treat as trailing
                                        // garbage and truncate back to the
                                        // last valid item.
                                        if state.replay.is_exhausted()
                                            || before_remaining < MAX_U32_VARINT_SIZE
                                        {
                                            if state.valid_offset < blob_size
                                                && state.offset < blob_size
                                            {
                                                warn!(
                                                    blob = state.section,
                                                    bad_offset = state.offset,
                                                    new_size = state.valid_offset,
                                                    "trailing bytes detected: truncating"
                                                );
                                                state.blob.resize(state.valid_offset).await.ok()?;
                                            }
                                            state.done = true;
                                            return if batch.is_empty() {
                                                None
                                            } else {
                                                Some((batch, state))
                                            };
                                        }
                                        // Mid-blob failure: surface the error.
                                        batch.push(Err(err));
                                        state.done = true;
                                        return Some((batch, state));
                                    }
                                };

                            // Buffer the full payload; a short read here means
                            // the final item was cut off, so truncate.
                            match state.replay.ensure(item_size).await {
                                Ok(true) => {}
                                Ok(false) => {
                                    warn!(
                                        blob = state.section,
                                        bad_offset = state.offset,
                                        new_size = state.valid_offset,
                                        "incomplete item at end: truncating"
                                    );
                                    state.blob.resize(state.valid_offset).await.ok()?;
                                    state.done = true;
                                    return if batch.is_empty() {
                                        None
                                    } else {
                                        Some((batch, state))
                                    };
                                }
                                Err(err) => {
                                    batch.push(Err(err.into()));
                                    state.done = true;
                                    return Some((batch, state));
                                }
                            }

                            let item_offset = state.offset;
                            let next_offset = match state
                                .offset
                                .checked_add(varint_len as u64)
                                .and_then(|o| o.checked_add(item_size as u64))
                            {
                                Some(o) => o,
                                None => {
                                    batch.push(Err(Error::OffsetOverflow));
                                    state.done = true;
                                    return Some((batch, state));
                                }
                            };
                            match decode_item::<V>(
                                (&mut state.replay).take(item_size),
                                &state.codec_config,
                                state.compressed,
                            ) {
                                Ok(decoded) => {
                                    batch.push(Ok((
                                        state.section,
                                        item_offset,
                                        item_size as u32,
                                        decoded,
                                    )));
                                    // Successful decode: advance the
                                    // truncation high-water mark.
                                    state.valid_offset = next_offset;
                                    state.offset = next_offset;
                                }
                                Err(err) => {
                                    batch.push(Err(err));
                                    state.done = true;
                                    return Some((batch, state));
                                }
                            }

                            // Yield the batch before refilling the buffer to
                            // bound memory usage.
                            if !batch.is_empty() && state.replay.remaining() < MAX_U32_VARINT_SIZE {
                                return Some((batch, state));
                            }
                        }
                    },
                )
                .flat_map(stream::iter)
            },
        ))
    }
493
494 pub(crate) fn encode_item(compression: Option<u8>, item: &V) -> Result<(Vec<u8>, u32), Error> {
499 if let Some(compression) = compression {
500 let encoded = item.encode();
502 let compressed =
503 compress(&encoded, compression as i32).map_err(|_| Error::CompressionFailed)?;
504 let item_len = compressed.len();
505 let item_len_u32: u32 = match item_len.try_into() {
506 Ok(len) => len,
507 Err(_) => return Err(Error::ItemTooLarge(item_len)),
508 };
509 let size_len = UInt(item_len_u32).encode_size();
510 let entry_len = size_len
511 .checked_add(item_len)
512 .ok_or(Error::OffsetOverflow)?;
513
514 let mut buf = Vec::with_capacity(entry_len);
515 UInt(item_len_u32).write(&mut buf);
516 buf.put_slice(&compressed);
517
518 Ok((buf, item_len_u32))
519 } else {
520 let item_len = item.encode_size();
522 let item_len_u32: u32 = match item_len.try_into() {
523 Ok(len) => len,
524 Err(_) => return Err(Error::ItemTooLarge(item_len)),
525 };
526 let size_len = UInt(item_len_u32).encode_size();
527 let entry_len = size_len
528 .checked_add(item_len)
529 .ok_or(Error::OffsetOverflow)?;
530
531 let mut buf = Vec::with_capacity(entry_len);
532 UInt(item_len_u32).write(&mut buf);
533 item.write(&mut buf);
534
535 Ok((buf, item_len_u32))
536 }
537 }
538
539 pub async fn append(&mut self, section: u64, item: &V) -> Result<(u64, u32), Error> {
543 let (buf, item_len) = Self::encode_item(self.compression, item)?;
544 self.append_raw(section, &buf)
545 .await
546 .map(|offset| (offset, item_len))
547 }
548
    /// Append pre-encoded entry bytes to `section`'s blob, creating the blob
    /// if it does not yet exist. Returns the offset at which `buf` was
    /// written.
    pub(crate) async fn append_raw(&mut self, section: u64, buf: &[u8]) -> Result<u64, Error> {
        let blob = self.manager.get_or_create(section).await?;
        // The entry starts at the current end of the blob.
        let offset = blob.size().await;
        blob.append(buf).await?;
        trace!(blob = section, offset, "appended item");
        Ok(offset)
    }
560
561 pub async fn get(&self, section: u64, offset: u64) -> Result<V, Error> {
572 let blob = self
573 .manager
574 .get(section)?
575 .ok_or(Error::SectionOutOfRange(section))?;
576
577 let (_, _, item) =
579 Self::read(self.compression.is_some(), &self.codec_config, blob, offset).await?;
580 Ok(item)
581 }
582
    /// Best-effort synchronous fetch of the item at `offset` in `section`.
    ///
    /// Returns `None` whenever the item cannot be served without awaiting:
    /// missing section, bytes not synchronously readable, length prefix
    /// undecodable, entry extending past the end of the blob, or decode
    /// failure.
    pub fn try_get_sync(&self, section: u64, offset: u64) -> Option<V> {
        let blob = self.manager.get(section).ok()??;
        // Bytes from `offset` to the end of the blob.
        let remaining = blob.try_size()?.checked_sub(offset)?;
        let header_len = usize::try_from(remaining.min(MAX_U32_VARINT_SIZE as u64)).ok()?;
        if header_len == 0 {
            return None;
        }

        // Read just enough to decode the varint length prefix.
        let mut header = [0u8; MAX_U32_VARINT_SIZE];
        if !blob.try_read_sync(offset, &mut header[..header_len]) {
            return None;
        }
        let mut cursor = Cursor::new(&header[..header_len]);
        let (_, item_info) = find_item(&mut cursor, offset).ok()?;

        // Only the total payload length matters here; whether the header read
        // already covers it is decided below.
        let (varint_len, data_len) = match item_info {
            ItemInfo::Complete {
                varint_len,
                data_len,
            } => (varint_len, data_len),
            ItemInfo::Incomplete {
                varint_len,
                total_len,
                ..
            } => (varint_len, total_len),
        };
        let item_len = varint_len.checked_add(data_len)?;
        // Entry claims more bytes than the blob holds: unavailable.
        if item_len > usize::try_from(remaining).ok()? {
            return None;
        }

        // Fast path: the whole entry fit inside the header read.
        if item_len <= header_len {
            return decode_item::<V>(
                &header[varint_len..varint_len + data_len],
                &self.codec_config,
                self.compression.is_some(),
            )
            .ok();
        }

        // Slow path: re-read the full entry (prefix + payload) in one shot.
        let mut full = vec![0u8; item_len];
        if !blob.try_read_sync(offset, &mut full) {
            return None;
        }
        decode_item::<V>(
            &full[varint_len..varint_len + data_len],
            &self.codec_config,
            self.compression.is_some(),
        )
        .ok()
    }
638
    /// Return the current size in bytes of `section`'s blob.
    pub async fn size(&self, section: u64) -> Result<u64, Error> {
        self.manager.size(section).await
    }
645
    /// Rewind so that `section` ends at `offset`. Delegates to the manager's
    /// `rewind` — the same call as [Self::rewind] with `size = offset`.
    pub async fn rewind_to_offset(&mut self, section: u64, offset: u64) -> Result<(), Error> {
        self.manager.rewind(section, offset).await
    }
656
    /// Rewind `section` to `size` bytes via the manager's `rewind`.
    pub async fn rewind(&mut self, section: u64, size: u64) -> Result<(), Error> {
        self.manager.rewind(section, size).await
    }
669
    /// Rewind `section` to `size` bytes via the manager's `rewind_section`
    /// (presumably scoped to this single section — see the manager docs).
    pub async fn rewind_section(&mut self, section: u64, size: u64) -> Result<(), Error> {
        self.manager.rewind_section(section, size).await
    }
680
    /// Sync `section`'s blob to storage via the manager.
    pub async fn sync(&self, section: u64) -> Result<(), Error> {
        self.manager.sync(section).await
    }
687
    /// Sync every tracked section's blob to storage via the manager.
    pub async fn sync_all(&self) -> Result<(), Error> {
        self.manager.sync_all().await
    }
692
    /// Prune sections below `min` via the manager. The returned `bool` is
    /// forwarded from [Manager::prune] (NOTE(review): presumably whether
    /// anything was pruned — confirm against the manager).
    pub async fn prune(&mut self, min: u64) -> Result<bool, Error> {
        self.manager.prune(min).await
    }
697
    /// Return the oldest (lowest) tracked section, or `None` if empty.
    pub fn oldest_section(&self) -> Option<u64> {
        self.manager.oldest_section()
    }
702
    /// Return the newest (highest) tracked section, or `None` if empty.
    pub fn newest_section(&self) -> Option<u64> {
        self.manager.newest_section()
    }
707
    /// Whether the journal currently tracks no sections.
    pub fn is_empty(&self) -> bool {
        self.manager.is_empty()
    }
712
    /// Number of sections currently tracked by the journal.
    pub fn num_sections(&self) -> usize {
        self.manager.num_sections()
    }
717
    /// Consume the journal and remove its underlying storage.
    pub async fn destroy(self) -> Result<(), Error> {
        self.manager.destroy().await
    }
722
    /// Remove all sections, leaving the journal usable but empty.
    pub async fn clear(&mut self) -> Result<(), Error> {
        self.manager.clear().await
    }
729}
730
731#[cfg(test)]
732mod tests {
733 use super::*;
734 use commonware_macros::test_traced;
735 use commonware_runtime::{deterministic, Blob, BufMut, Metrics, Runner, Storage};
736 use commonware_utils::{NZUsize, NZU16};
737 use futures::{pin_mut, StreamExt};
738 use std::num::NonZeroU16;
739
    // Page geometry shared by every test's page cache.
    const PAGE_SIZE: NonZeroU16 = NZU16!(1024);
    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);
742
    // Append one item, restart the journal, and verify replay returns it and
    // that the tracked-blob metric is emitted under each label.
    #[test_traced]
    fn test_journal_append_and_read() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = Config {
                partition: "test-partition".into(),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };
            let index = 1u64;
            let data = 10;
            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            journal
                .append(index, &data)
                .await
                .expect("Failed to append data");

            let buffer = context.encode();
            assert!(buffer.contains("first_tracked 1"));

            journal.sync(index).await.expect("Failed to sync journal");
            drop(journal);
            let journal = Journal::<_, i32>::init(context.with_label("second"), cfg)
                .await
                .expect("Failed to re-initialize journal");

            // Replay everything and collect (section, item) pairs.
            let mut items = Vec::new();
            let stream = journal
                .replay(0, 0, NZUsize!(1024))
                .await
                .expect("unable to setup replay");
            pin_mut!(stream);
            while let Some(result) = stream.next().await {
                match result {
                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                    Err(err) => panic!("Failed to read item: {err}"),
                }
            }

            assert_eq!(items.len(), 1);
            assert_eq!(items[0].0, index);
            assert_eq!(items[0].1, data);

            let buffer = context.encode();
            assert!(buffer.contains("second_tracked 1"));
        });
    }
805
    // Append items across three sections, restart, and verify replay returns
    // them all in order; also checks tracked/synced metrics.
    #[test_traced]
    fn test_journal_multiple_appends_and_reads() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = Config {
                partition: "test-partition".into(),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            let data_items = vec![(1u64, 1), (1u64, 2), (2u64, 3), (3u64, 4)];
            for (index, data) in &data_items {
                journal
                    .append(*index, data)
                    .await
                    .expect("Failed to append data");
                journal.sync(*index).await.expect("Failed to sync blob");
            }

            // 3 distinct sections tracked; 4 syncs performed.
            let buffer = context.encode();
            assert!(buffer.contains("first_tracked 3"));
            assert!(buffer.contains("first_synced_total 4"));

            drop(journal);
            let journal = Journal::init(context.with_label("second"), cfg)
                .await
                .expect("Failed to re-initialize journal");

            let mut items = Vec::<(u64, u32)>::new();
            {
                let stream = journal
                    .replay(0, 0, NZUsize!(1024))
                    .await
                    .expect("unable to setup replay");
                pin_mut!(stream);
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                        Err(err) => panic!("Failed to read item: {err}"),
                    }
                }
            }

            assert_eq!(items.len(), data_items.len());
            for ((expected_index, expected_data), (actual_index, actual_data)) in
                data_items.iter().zip(items.iter())
            {
                assert_eq!(actual_index, expected_index);
                assert_eq!(actual_data, expected_data);
            }

            journal.destroy().await.expect("Failed to destroy journal");
        });
    }
877
    // Prune removes sections below the requested minimum, a lower `min` is a
    // no-op, replay skips pruned sections, and pruning everything leaves the
    // partition empty.
    #[test_traced]
    fn test_journal_prune_blobs() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = Config {
                partition: "test-partition".into(),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            for index in 1u64..=5u64 {
                journal
                    .append(index, &index)
                    .await
                    .expect("Failed to append data");
                journal.sync(index).await.expect("Failed to sync blob");
            }

            // Second item in section 2 (pruned below).
            let data = 99;
            journal
                .append(2u64, &data)
                .await
                .expect("Failed to append data");
            journal.sync(2u64).await.expect("Failed to sync blob");

            journal.prune(3).await.expect("Failed to prune blobs");

            let buffer = context.encode();
            assert!(buffer.contains("first_pruned_total 2"));

            // Pruning to a lower min must not prune anything further.
            journal.prune(2).await.expect("Failed to no-op prune");
            let buffer = context.encode();
            assert!(buffer.contains("first_pruned_total 2"));

            drop(journal);
            let mut journal = Journal::init(context.with_label("second"), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");

            let mut items = Vec::<(u64, u64)>::new();
            {
                let stream = journal
                    .replay(0, 0, NZUsize!(1024))
                    .await
                    .expect("unable to setup replay");
                pin_mut!(stream);
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                        Err(err) => panic!("Failed to read item: {err}"),
                    }
                }
            }

            // Only sections 3..=5 survive the prune.
            assert_eq!(items.len(), 3);
            let expected_indices = [3u64, 4u64, 5u64];
            for (item, expected_index) in items.iter().zip(expected_indices.iter()) {
                assert_eq!(item.0, *expected_index);
            }

            journal.prune(6).await.expect("Failed to prune blobs");

            drop(journal);

            // All blobs gone once every section is pruned.
            assert!(context
                .scan(&cfg.partition)
                .await
                .expect("Failed to list blobs")
                .is_empty());
        });
    }
974
    // Every operation targeting a pruned section must fail with
    // AlreadyPrunedToSection, while sections at/above the prune point keep
    // working.
    #[test_traced]
    fn test_journal_prune_guard() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = Config {
                partition: "test-partition".into(),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            for section in 1u64..=5u64 {
                journal
                    .append(section, &(section as i32))
                    .await
                    .expect("Failed to append data");
                journal.sync(section).await.expect("Failed to sync");
            }

            journal.prune(3).await.expect("Failed to prune");

            // Sections 1 and 2 are gone: all operations must be rejected.
            match journal.append(1, &100).await {
                Err(Error::AlreadyPrunedToSection(3)) => {}
                other => panic!("Expected AlreadyPrunedToSection(3), got {other:?}"),
            }

            match journal.append(2, &100).await {
                Err(Error::AlreadyPrunedToSection(3)) => {}
                other => panic!("Expected AlreadyPrunedToSection(3), got {other:?}"),
            }

            match journal.get(1, 0).await {
                Err(Error::AlreadyPrunedToSection(3)) => {}
                other => panic!("Expected AlreadyPrunedToSection(3), got {other:?}"),
            }

            match journal.size(1).await {
                Err(Error::AlreadyPrunedToSection(3)) => {}
                other => panic!("Expected AlreadyPrunedToSection(3), got {other:?}"),
            }

            match journal.rewind(2, 0).await {
                Err(Error::AlreadyPrunedToSection(3)) => {}
                other => panic!("Expected AlreadyPrunedToSection(3), got {other:?}"),
            }

            match journal.rewind_section(1, 0).await {
                Err(Error::AlreadyPrunedToSection(3)) => {}
                other => panic!("Expected AlreadyPrunedToSection(3), got {other:?}"),
            }

            match journal.sync(2).await {
                Err(Error::AlreadyPrunedToSection(3)) => {}
                other => panic!("Expected AlreadyPrunedToSection(3), got {other:?}"),
            }

            // Sections at/above the prune point still function normally.
            assert!(journal.get(3, 0).await.is_ok());
            assert!(journal.get(4, 0).await.is_ok());
            assert!(journal.get(5, 0).await.is_ok());
            assert!(journal.size(3).await.is_ok());
            assert!(journal.sync(4).await.is_ok());

            journal
                .append(3, &999)
                .await
                .expect("Should be able to append to section 3");

            journal.prune(5).await.expect("Failed to prune");

            match journal.get(3, 0).await {
                Err(Error::AlreadyPrunedToSection(5)) => {}
                other => panic!("Expected AlreadyPrunedToSection(5), got {other:?}"),
            }

            match journal.get(4, 0).await {
                Err(Error::AlreadyPrunedToSection(5)) => {}
                other => panic!("Expected AlreadyPrunedToSection(5), got {other:?}"),
            }

            assert!(journal.get(5, 0).await.is_ok());
        });
    }
1078
    // After a restart, pruned sections report SectionOutOfRange (the blobs no
    // longer exist) rather than AlreadyPrunedToSection.
    #[test_traced]
    fn test_journal_prune_guard_across_restart() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = Config {
                partition: "test-partition".into(),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            {
                let mut journal = Journal::init(context.with_label("first"), cfg.clone())
                    .await
                    .expect("Failed to initialize journal");

                for section in 1u64..=5u64 {
                    journal
                        .append(section, &(section as i32))
                        .await
                        .expect("Failed to append data");
                    journal.sync(section).await.expect("Failed to sync");
                }

                journal.prune(3).await.expect("Failed to prune");
            }

            {
                let journal = Journal::<_, i32>::init(context.with_label("second"), cfg.clone())
                    .await
                    .expect("Failed to re-initialize journal");

                match journal.get(1, 0).await {
                    Err(Error::SectionOutOfRange(1)) => {}
                    other => panic!("Expected SectionOutOfRange(1), got {other:?}"),
                }

                match journal.get(2, 0).await {
                    Err(Error::SectionOutOfRange(2)) => {}
                    other => panic!("Expected SectionOutOfRange(2), got {other:?}"),
                }

                assert!(journal.get(3, 0).await.is_ok());
                assert!(journal.get(4, 0).await.is_ok());
                assert!(journal.get(5, 0).await.is_ok());
            }
        });
    }
1134
    // A blob whose name is not a valid section encoding must make
    // initialization fail with InvalidBlobName.
    #[test_traced]
    fn test_journal_with_invalid_blob_name() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = Config {
                partition: "test-partition".into(),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            // Plant a blob with a non-numeric name in the partition.
            let invalid_blob_name = b"invalid"; let (blob, _) = context
                .open(&cfg.partition, invalid_blob_name)
                .await
                .expect("Failed to create blob with invalid name");
            blob.sync().await.expect("Failed to sync blob");

            let result = Journal::<_, u64>::init(context, cfg).await;

            assert!(matches!(result, Err(Error::InvalidBlobName(_))));
        });
    }
1166
    // A blob containing only a truncated varint length prefix must replay as
    // empty (the trailing bytes are treated as corruption, not an error).
    #[test_traced]
    fn test_journal_read_size_missing() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = Config {
                partition: "test-partition".into(),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            let section = 1u64;
            let blob_name = section.to_be_bytes();
            let (blob, _) = context
                .open(&cfg.partition, &blob_name)
                .await
                .expect("Failed to create blob");

            // Write only the first byte of a multi-byte varint.
            let mut incomplete_data = Vec::new();
            UInt(u32::MAX).write(&mut incomplete_data);
            incomplete_data.truncate(1);
            blob.write_at(0, incomplete_data)
                .await
                .expect("Failed to write incomplete data");
            blob.sync().await.expect("Failed to sync blob");

            let journal = Journal::init(context, cfg)
                .await
                .expect("Failed to initialize journal");

            let stream = journal
                .replay(0, 0, NZUsize!(1024))
                .await
                .expect("unable to setup replay");
            pin_mut!(stream);
            let mut items = Vec::<(u64, u64)>::new();
            while let Some(result) = stream.next().await {
                match result {
                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                    Err(err) => panic!("Failed to read item: {err}"),
                }
            }
            assert!(items.is_empty());
        });
    }
1221
    // A prefix that promises more payload bytes than the blob contains must
    // replay as empty (incomplete final item is truncated, not surfaced).
    #[test_traced]
    fn test_journal_read_item_missing() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = Config {
                partition: "test-partition".into(),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            let section = 1u64;
            let blob_name = section.to_be_bytes();
            let (blob, _) = context
                .open(&cfg.partition, &blob_name)
                .await
                .expect("Failed to create blob");

            // Length prefix claims 10 bytes but only 5 are written.
            let item_size: u32 = 10; let mut buf = Vec::new();
            UInt(item_size).write(&mut buf); let data = [2u8; 5];
            BufMut::put_slice(&mut buf, &data);
            blob.write_at(0, buf)
                .await
                .expect("Failed to write incomplete item");
            blob.sync().await.expect("Failed to sync blob");

            let journal = Journal::init(context, cfg)
                .await
                .expect("Failed to initialize journal");

            let stream = journal
                .replay(0, 0, NZUsize!(1024))
                .await
                .expect("unable to setup replay");
            pin_mut!(stream);
            let mut items = Vec::<(u64, u64)>::new();
            while let Some(result) = stream.next().await {
                match result {
                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                    Err(err) => panic!("Failed to read item: {err}"),
                }
            }
            assert!(items.is_empty());
        });
    }
1278
    // An entry whose payload does not decode as the expected item type must
    // not surface during replay; the corrupt tail is dropped silently.
    #[test_traced]
    fn test_journal_read_checksum_missing() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = Config {
                partition: "test-partition".into(),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            let section = 1u64;
            let blob_name = section.to_be_bytes();
            let (blob, _) = context
                .open(&cfg.partition, &blob_name)
                .await
                .expect("Failed to create blob");

            let item_data = b"Test data";
            let item_size = item_data.len() as u32;

            // Frame raw bytes that do not form a valid u64 item.
            let mut buf = Vec::new();
            UInt(item_size).write(&mut buf);
            BufMut::put_slice(&mut buf, item_data);
            blob.write_at(0, buf)
                .await
                .expect("Failed to write item without checksum");

            blob.sync().await.expect("Failed to sync blob");

            let journal = Journal::init(context, cfg)
                .await
                .expect("Failed to initialize journal");

            let stream = journal
                .replay(0, 0, NZUsize!(1024))
                .await
                .expect("unable to setup replay");
            pin_mut!(stream);
            let mut items = Vec::<(u64, u64)>::new();
            while let Some(result) = stream.next().await {
                match result {
                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                    Err(err) => panic!("Failed to read item: {err}"),
                }
            }
            assert!(items.is_empty());
        });
    }
1340
    // A corrupt entry (bad trailing checksum bytes) must yield no items on
    // replay, and the blob must be truncated to zero afterwards.
    #[test_traced]
    fn test_journal_read_checksum_mismatch() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = Config {
                partition: "test-partition".into(),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            let section = 1u64;
            let blob_name = section.to_be_bytes();
            let (blob, _) = context
                .open(&cfg.partition, &blob_name)
                .await
                .expect("Failed to create blob");

            let item_data = b"Test data";
            let item_size = item_data.len() as u32;
            let incorrect_checksum: u32 = 0xDEADBEEF;

            // Frame the payload and append bogus checksum bytes after it.
            let mut buf = Vec::new();
            UInt(item_size).write(&mut buf);
            BufMut::put_slice(&mut buf, item_data);
            buf.put_u32(incorrect_checksum);
            blob.write_at(0, buf)
                .await
                .expect("Failed to write item with bad checksum");

            blob.sync().await.expect("Failed to sync blob");

            let journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            {
                let stream = journal
                    .replay(0, 0, NZUsize!(1024))
                    .await
                    .expect("unable to setup replay");
                pin_mut!(stream);
                let mut items = Vec::<(u64, u64)>::new();
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                        Err(err) => panic!("Failed to read item: {err}"),
                    }
                }
                assert!(items.is_empty());
            }
            drop(journal);

            // Replay should have truncated the corrupt blob to empty.
            let (_, blob_size) = context
                .open(&cfg.partition, &section.to_be_bytes())
                .await
                .expect("Failed to open blob");
            assert_eq!(blob_size, 0);
        });
    }
1412
    /// Recovery after a physical truncation of the newest blob.
    ///
    /// Writes one item to section 1 and three to section 2, chops 4 bytes off
    /// the end of section 2's blob, then verifies across three restarts that
    /// (a) the corrupted section is rolled back to a valid (here: empty)
    /// state, (b) the item in section 1 still replays, and (c) the recovered
    /// section accepts and persists new appends.
    #[test_traced]
    fn test_journal_truncation_recovery() {
        // Initialize the deterministic runtime.
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            // Journal configuration (no compression).
            let cfg = Config {
                partition: "test-partition".into(),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            // One item in section 1; this one must survive the corruption below.
            journal.append(1, &1).await.expect("Failed to append data");

            // Three items in section 2, syncing after each append.
            let data_items = vec![(2u64, 2), (2u64, 3), (2u64, 4)];
            for (index, data) in &data_items {
                journal
                    .append(*index, data)
                    .await
                    .expect("Failed to append data");
                journal.sync(*index).await.expect("Failed to sync blob");
            }

            journal.sync_all().await.expect("Failed to sync");
            drop(journal);

            // Corrupt section 2 by cutting 4 bytes off the end of its blob.
            let (blob, blob_size) = context
                .open(&cfg.partition, &2u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            blob.resize(blob_size - 4)
                .await
                .expect("Failed to corrupt blob");
            blob.sync().await.expect("Failed to sync blob");

            // Re-initialization should detect and truncate the corrupted tail.
            let journal = Journal::init(context.with_label("second"), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");

            let mut items = Vec::<(u64, u32)>::new();
            {
                let stream = journal
                    .replay(0, 0, NZUsize!(1024))
                    .await
                    .expect("unable to setup replay");
                pin_mut!(stream);
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                        Err(err) => panic!("Failed to read item: {err}"),
                    }
                }
            }
            drop(journal);

            // Only the item in section 1 survives; section 2 was rolled back.
            assert_eq!(items.len(), 1);
            assert_eq!(items[0].0, 1);
            assert_eq!(items[0].1, 1);

            // The corrupted blob has been truncated to empty on disk.
            let (_, blob_size) = context
                .open(&cfg.partition, &2u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            assert_eq!(blob_size, 0);

            // A further restart must be stable: same single surviving item.
            let mut journal = Journal::init(context.with_label("third"), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");

            let mut items = Vec::<(u64, u32)>::new();
            {
                let stream = journal
                    .replay(0, 0, NZUsize!(1024))
                    .await
                    .expect("unable to setup replay");
                pin_mut!(stream);
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                        Err(err) => panic!("Failed to read item: {err}"),
                    }
                }
            }

            assert_eq!(items.len(), 1);
            assert_eq!(items[0].0, 1);
            assert_eq!(items[0].1, 1);

            // The recovered section must accept new appends again.
            let (_offset, _) = journal.append(2, &5).await.expect("Failed to append data");
            journal.sync(2).await.expect("Failed to sync blob");

            // The new item is readable at offset 0 of the recovered section.
            let item = journal.get(2, 0).await.expect("Failed to get item");
            assert_eq!(item, 5);

            drop(journal);

            // Final restart: both the old and the new item replay.
            let journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");

            let mut items = Vec::<(u64, u32)>::new();
            {
                let stream = journal
                    .replay(0, 0, NZUsize!(1024))
                    .await
                    .expect("unable to setup replay");
                pin_mut!(stream);
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                        Err(err) => panic!("Failed to read item: {err}"),
                    }
                }
            }

            assert_eq!(items.len(), 2);
            assert_eq!(items[0].0, 1);
            assert_eq!(items[0].1, 1);
            assert_eq!(items[1].0, 2);
            assert_eq!(items[1].1, 5);
        });
    }
1561
    /// Re-initialization when extra (garbage) bytes were appended past the
    /// end of a fully synced blob: init and a full replay must both succeed
    /// (any replay error panics in the match below).
    ///
    /// NOTE(review): the replay result `items` is never asserted on — only
    /// the absence of errors is checked. Consider asserting the expected
    /// surviving items explicitly to strengthen this test.
    #[test_traced]
    fn test_journal_handling_extra_data() {
        // Initialize the deterministic runtime.
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            // Journal configuration (no compression).
            let cfg = Config {
                partition: "test-partition".into(),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            // One item in section 1, three in section 2 (synced after each).
            journal.append(1, &1).await.expect("Failed to append data");

            let data_items = vec![(2u64, 2), (2u64, 3), (2u64, 4)];
            for (index, data) in &data_items {
                journal
                    .append(*index, data)
                    .await
                    .expect("Failed to append data");
                journal.sync(*index).await.expect("Failed to sync blob");
            }

            journal.sync_all().await.expect("Failed to sync");
            drop(journal);

            // Append 16 zero bytes past the end of section 2's blob.
            let (blob, blob_size) = context
                .open(&cfg.partition, &2u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            blob.write_at(blob_size, vec![0u8; 16])
                .await
                .expect("Failed to add extra data");
            blob.sync().await.expect("Failed to sync blob");

            // Re-initialization must tolerate the trailing garbage...
            let journal = Journal::init(context.with_label("second"), cfg)
                .await
                .expect("Failed to re-initialize journal");

            // ...and a full replay must complete without error.
            let mut items = Vec::<(u64, i32)>::new();
            let stream = journal
                .replay(0, 0, NZUsize!(1024))
                .await
                .expect("unable to setup replay");
            pin_mut!(stream);
            while let Some(result) = stream.next().await {
                match result {
                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                    Err(err) => panic!("Failed to read item: {err}"),
                }
            }
        });
    }
1630
1631 #[test_traced]
1632 fn test_journal_rewind() {
1633 let executor = deterministic::Runner::default();
1635 executor.start(|context| async move {
1636 let cfg = Config {
1638 partition: "test-partition".into(),
1639 compression: None,
1640 codec_config: (),
1641 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1642 write_buffer: NZUsize!(1024),
1643 };
1644 let mut journal = Journal::init(context, cfg).await.unwrap();
1645
1646 let size = journal.size(1).await.unwrap();
1648 assert_eq!(size, 0);
1649
1650 journal.append(1, &42i32).await.unwrap();
1652
1653 let size = journal.size(1).await.unwrap();
1655 assert!(size > 0);
1656
1657 journal.append(1, &43i32).await.unwrap();
1659 let new_size = journal.size(1).await.unwrap();
1660 assert!(new_size > size);
1661
1662 let size = journal.size(2).await.unwrap();
1664 assert_eq!(size, 0);
1665
1666 journal.append(2, &44i32).await.unwrap();
1668
1669 let size = journal.size(2).await.unwrap();
1671 assert!(size > 0);
1672
1673 journal.rewind(1, 0).await.unwrap();
1675
1676 let size = journal.size(1).await.unwrap();
1678 assert_eq!(size, 0);
1679
1680 let size = journal.size(2).await.unwrap();
1682 assert_eq!(size, 0);
1683 });
1684 }
1685
1686 #[test_traced]
1687 fn test_journal_rewind_section() {
1688 let executor = deterministic::Runner::default();
1690 executor.start(|context| async move {
1691 let cfg = Config {
1693 partition: "test-partition".into(),
1694 compression: None,
1695 codec_config: (),
1696 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1697 write_buffer: NZUsize!(1024),
1698 };
1699 let mut journal = Journal::init(context, cfg).await.unwrap();
1700
1701 let size = journal.size(1).await.unwrap();
1703 assert_eq!(size, 0);
1704
1705 journal.append(1, &42i32).await.unwrap();
1707
1708 let size = journal.size(1).await.unwrap();
1710 assert!(size > 0);
1711
1712 journal.append(1, &43i32).await.unwrap();
1714 let new_size = journal.size(1).await.unwrap();
1715 assert!(new_size > size);
1716
1717 let size = journal.size(2).await.unwrap();
1719 assert_eq!(size, 0);
1720
1721 journal.append(2, &44i32).await.unwrap();
1723
1724 let size = journal.size(2).await.unwrap();
1726 assert!(size > 0);
1727
1728 journal.rewind_section(1, 0).await.unwrap();
1730
1731 let size = journal.size(1).await.unwrap();
1733 assert_eq!(size, 0);
1734
1735 let size = journal.size(2).await.unwrap();
1737 assert!(size > 0);
1738 });
1739 }
1740
1741 #[test_traced]
1742 fn test_journal_small_items() {
1743 let executor = deterministic::Runner::default();
1744 executor.start(|context| async move {
1745 let cfg = Config {
1746 partition: "test-partition".into(),
1747 compression: None,
1748 codec_config: (),
1749 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1750 write_buffer: NZUsize!(1024),
1751 };
1752
1753 let mut journal = Journal::init(context.with_label("first"), cfg.clone())
1754 .await
1755 .expect("Failed to initialize journal");
1756
1757 let num_items = 100;
1759 let mut offsets = Vec::new();
1760 for i in 0..num_items {
1761 let (offset, size) = journal
1762 .append(1, &(i as u8))
1763 .await
1764 .expect("Failed to append data");
1765 assert_eq!(size, 1, "u8 should encode to 1 byte");
1766 offsets.push(offset);
1767 }
1768 journal.sync(1).await.expect("Failed to sync");
1769
1770 for (i, &offset) in offsets.iter().enumerate() {
1772 let item: u8 = journal.get(1, offset).await.expect("Failed to get item");
1773 assert_eq!(item, i as u8, "Item mismatch at offset {offset}");
1774 }
1775
1776 drop(journal);
1778 let journal = Journal::<_, u8>::init(context.with_label("second"), cfg)
1779 .await
1780 .expect("Failed to re-initialize journal");
1781
1782 let stream = journal
1784 .replay(0, 0, NZUsize!(1024))
1785 .await
1786 .expect("Failed to setup replay");
1787 pin_mut!(stream);
1788
1789 let mut count = 0;
1790 while let Some(result) = stream.next().await {
1791 let (section, offset, size, item) = result.expect("Failed to replay item");
1792 assert_eq!(section, 1);
1793 assert_eq!(offset, offsets[count]);
1794 assert_eq!(size, 1);
1795 assert_eq!(item, count as u8);
1796 count += 1;
1797 }
1798 assert_eq!(count, num_items, "Should replay all items");
1799 });
1800 }
1801
1802 #[test_traced]
1803 fn test_journal_rewind_many_sections() {
1804 let executor = deterministic::Runner::default();
1805 executor.start(|context| async move {
1806 let cfg = Config {
1807 partition: "test-partition".into(),
1808 compression: None,
1809 codec_config: (),
1810 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1811 write_buffer: NZUsize!(1024),
1812 };
1813 let mut journal = Journal::init(context.clone(), cfg.clone()).await.unwrap();
1814
1815 for section in 1u64..=10 {
1817 journal.append(section, &(section as i32)).await.unwrap();
1818 }
1819 journal.sync_all().await.unwrap();
1820
1821 for section in 1u64..=10 {
1823 let size = journal.size(section).await.unwrap();
1824 assert!(size > 0, "section {section} should have data");
1825 }
1826
1827 journal
1829 .rewind(5, journal.size(5).await.unwrap())
1830 .await
1831 .unwrap();
1832
1833 for section in 1u64..=5 {
1835 let size = journal.size(section).await.unwrap();
1836 assert!(size > 0, "section {section} should still have data");
1837 }
1838
1839 for section in 6u64..=10 {
1841 let size = journal.size(section).await.unwrap();
1842 assert_eq!(size, 0, "section {section} should be removed");
1843 }
1844
1845 {
1847 let stream = journal.replay(0, 0, NZUsize!(1024)).await.unwrap();
1848 pin_mut!(stream);
1849 let mut items = Vec::new();
1850 while let Some(result) = stream.next().await {
1851 let (section, _, _, item) = result.unwrap();
1852 items.push((section, item));
1853 }
1854 assert_eq!(items.len(), 5);
1855 for (i, (section, item)) in items.iter().enumerate() {
1856 assert_eq!(*section, (i + 1) as u64);
1857 assert_eq!(*item, (i + 1) as i32);
1858 }
1859 }
1860
1861 journal.destroy().await.unwrap();
1862 });
1863 }
1864
1865 #[test_traced]
1866 fn test_journal_rewind_partial_truncation() {
1867 let executor = deterministic::Runner::default();
1868 executor.start(|context| async move {
1869 let cfg = Config {
1870 partition: "test-partition".into(),
1871 compression: None,
1872 codec_config: (),
1873 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1874 write_buffer: NZUsize!(1024),
1875 };
1876 let mut journal = Journal::init(context.clone(), cfg.clone()).await.unwrap();
1877
1878 let mut sizes = Vec::new();
1880 for i in 0..5 {
1881 journal.append(1, &i).await.unwrap();
1882 journal.sync(1).await.unwrap();
1883 sizes.push(journal.size(1).await.unwrap());
1884 }
1885
1886 let target_size = sizes[2];
1888 journal.rewind(1, target_size).await.unwrap();
1889
1890 let new_size = journal.size(1).await.unwrap();
1892 assert_eq!(new_size, target_size);
1893
1894 {
1896 let stream = journal.replay(0, 0, NZUsize!(1024)).await.unwrap();
1897 pin_mut!(stream);
1898 let mut items = Vec::new();
1899 while let Some(result) = stream.next().await {
1900 let (_, _, _, item) = result.unwrap();
1901 items.push(item);
1902 }
1903 assert_eq!(items.len(), 3);
1904 for (i, item) in items.iter().enumerate() {
1905 assert_eq!(*item, i as i32);
1906 }
1907 }
1908
1909 journal.destroy().await.unwrap();
1910 });
1911 }
1912
1913 #[test_traced]
1914 fn test_journal_rewind_nonexistent_target() {
1915 let executor = deterministic::Runner::default();
1916 executor.start(|context| async move {
1917 let cfg = Config {
1918 partition: "test-partition".into(),
1919 compression: None,
1920 codec_config: (),
1921 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1922 write_buffer: NZUsize!(1024),
1923 };
1924 let mut journal = Journal::init(context.clone(), cfg.clone()).await.unwrap();
1925
1926 for section in 5u64..=7 {
1928 journal.append(section, &(section as i32)).await.unwrap();
1929 }
1930 journal.sync_all().await.unwrap();
1931
1932 journal.rewind(3, 0).await.unwrap();
1934
1935 for section in 5u64..=7 {
1937 let size = journal.size(section).await.unwrap();
1938 assert_eq!(size, 0, "section {section} should be removed");
1939 }
1940
1941 {
1943 let stream = journal.replay(0, 0, NZUsize!(1024)).await.unwrap();
1944 pin_mut!(stream);
1945 let items: Vec<_> = stream.collect().await;
1946 assert!(items.is_empty());
1947 }
1948
1949 journal.destroy().await.unwrap();
1950 });
1951 }
1952
1953 #[test_traced]
1954 fn test_journal_rewind_persistence() {
1955 let executor = deterministic::Runner::default();
1956 executor.start(|context| async move {
1957 let cfg = Config {
1958 partition: "test-partition".into(),
1959 compression: None,
1960 codec_config: (),
1961 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1962 write_buffer: NZUsize!(1024),
1963 };
1964
1965 let mut journal = Journal::init(context.with_label("first"), cfg.clone())
1967 .await
1968 .unwrap();
1969 for section in 1u64..=5 {
1970 journal.append(section, &(section as i32)).await.unwrap();
1971 }
1972 journal.sync_all().await.unwrap();
1973
1974 let size = journal.size(2).await.unwrap();
1976 journal.rewind(2, size).await.unwrap();
1977 journal.sync_all().await.unwrap();
1978 drop(journal);
1979
1980 let journal = Journal::<_, i32>::init(context.with_label("second"), cfg.clone())
1982 .await
1983 .unwrap();
1984
1985 for section in 1u64..=2 {
1987 let size = journal.size(section).await.unwrap();
1988 assert!(size > 0, "section {section} should have data after restart");
1989 }
1990
1991 for section in 3u64..=5 {
1993 let size = journal.size(section).await.unwrap();
1994 assert_eq!(size, 0, "section {section} should be gone after restart");
1995 }
1996
1997 {
1999 let stream = journal.replay(0, 0, NZUsize!(1024)).await.unwrap();
2000 pin_mut!(stream);
2001 let mut items = Vec::new();
2002 while let Some(result) = stream.next().await {
2003 let (section, _, _, item) = result.unwrap();
2004 items.push((section, item));
2005 }
2006 assert_eq!(items.len(), 2);
2007 assert_eq!(items[0], (1, 1));
2008 assert_eq!(items[1], (2, 2));
2009 }
2010
2011 journal.destroy().await.unwrap();
2012 });
2013 }
2014
2015 #[test_traced]
2016 fn test_journal_rewind_to_zero_removes_all_newer() {
2017 let executor = deterministic::Runner::default();
2018 executor.start(|context| async move {
2019 let cfg = Config {
2020 partition: "test-partition".into(),
2021 compression: None,
2022 codec_config: (),
2023 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
2024 write_buffer: NZUsize!(1024),
2025 };
2026 let mut journal = Journal::init(context.clone(), cfg.clone()).await.unwrap();
2027
2028 for section in 1u64..=3 {
2030 journal.append(section, &(section as i32)).await.unwrap();
2031 }
2032 journal.sync_all().await.unwrap();
2033
2034 journal.rewind(1, 0).await.unwrap();
2036
2037 let size = journal.size(1).await.unwrap();
2039 assert_eq!(size, 0, "section 1 should be empty");
2040
2041 for section in 2u64..=3 {
2043 let size = journal.size(section).await.unwrap();
2044 assert_eq!(size, 0, "section {section} should be removed");
2045 }
2046
2047 {
2049 let stream = journal.replay(0, 0, NZUsize!(1024)).await.unwrap();
2050 pin_mut!(stream);
2051 let items: Vec<_> = stream.collect().await;
2052 assert!(items.is_empty());
2053 }
2054
2055 journal.destroy().await.unwrap();
2056 });
2057 }
2058
    /// Regression test: replaying from a non-zero `start_offset` while the
    /// blob carries trailing junk must not truncate away valid data.
    ///
    /// If the replay's internal `valid_offset` were (incorrectly)
    /// initialized to 0 instead of `start_offset`, the post-replay cleanup
    /// would truncate the blob back toward 0 and destroy the five valid
    /// items (see the assertion message below).
    #[test_traced]
    fn test_journal_replay_start_offset_with_trailing_bytes() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = Config {
                partition: "test-partition".into(),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };
            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            // Write five valid items and remember the logical section size.
            for i in 0..5i32 {
                journal.append(1, &i).await.unwrap();
            }
            journal.sync(1).await.unwrap();
            let valid_logical_size = journal.size(1).await.unwrap();
            drop(journal);

            // Append two junk bytes past the end of the physical blob.
            let (blob, physical_size_before) = context
                .open(&cfg.partition, &1u64.to_be_bytes())
                .await
                .unwrap();

            blob.write_at(physical_size_before, vec![0xFF, 0xFF])
                .await
                .unwrap();
            blob.sync().await.unwrap();

            // Replay starting AT the end of the valid data, so the stream
            // itself only ever sees the junk tail.
            let start_offset = valid_logical_size;
            {
                let journal = Journal::<_, i32>::init(context.with_label("second"), cfg.clone())
                    .await
                    .unwrap();

                let stream = journal
                    .replay(1, start_offset, NZUsize!(1024))
                    .await
                    .unwrap();
                pin_mut!(stream);

                // Drain the stream; only the truncation side effect matters.
                while let Some(_result) = stream.next().await {}
            }

            // The blob must not have shrunk below its pre-replay size.
            let (_, physical_size_after) = context
                .open(&cfg.partition, &1u64.to_be_bytes())
                .await
                .unwrap();

            assert!(
                physical_size_after >= physical_size_before,
                "Valid data was lost! Physical blob truncated from {physical_size_before} to \
                 {physical_size_after}. Logical valid size was {valid_logical_size}. \
                 This indicates valid_offset was incorrectly initialized to 0 instead of start_offset."
            );
        });
    }
2131
    /// A single item (2048 bytes) larger than one cache page must survive
    /// append, random access, restart, and replay unchanged.
    #[test_traced]
    fn test_journal_large_item_spanning_pages() {
        const LARGE_SIZE: usize = 2048;
        type LargeItem = [u8; LARGE_SIZE];

        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "test-partition".into(),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(4096),
            };
            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            // Deterministic, non-uniform payload so corruption would show.
            let mut large_data: LargeItem = [0u8; LARGE_SIZE];
            for (i, byte) in large_data.iter_mut().enumerate() {
                *byte = (i % 256) as u8;
            }
            // Guard the test premise: the item must span page boundaries.
            assert!(
                LARGE_SIZE > PAGE_SIZE.get() as usize,
                "Item must be larger than page size"
            );

            // Append and sync the oversized item.
            let (offset, size) = journal
                .append(1, &large_data)
                .await
                .expect("Failed to append large item");
            assert_eq!(size as usize, LARGE_SIZE);
            journal.sync(1).await.expect("Failed to sync");

            // Random access read must return the payload intact.
            let retrieved: LargeItem = journal
                .get(1, offset)
                .await
                .expect("Failed to get large item");
            assert_eq!(retrieved, large_data, "Random access read mismatch");

            // Restart, then verify the item also replays correctly.
            drop(journal);
            let journal = Journal::<_, LargeItem>::init(context.with_label("second"), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");

            {
                let stream = journal
                    .replay(0, 0, NZUsize!(1024))
                    .await
                    .expect("Failed to setup replay");
                pin_mut!(stream);

                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    let (section, off, sz, item) = result.expect("Failed to replay item");
                    items.push((section, off, sz, item));
                }

                assert_eq!(items.len(), 1, "Should have exactly one item");
                let (section, off, sz, item) = &items[0];
                assert_eq!(*section, 1);
                assert_eq!(*off, offset);
                assert_eq!(*sz as usize, LARGE_SIZE);
                assert_eq!(*item, large_data, "Replay read mismatch");
            }

            journal.destroy().await.unwrap();
        });
    }
2207
2208 #[test_traced]
2209 fn test_journal_non_contiguous_sections() {
2210 let executor = deterministic::Runner::default();
2213 executor.start(|context| async move {
2214 let cfg = Config {
2215 partition: "test-partition".into(),
2216 compression: None,
2217 codec_config: (),
2218 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
2219 write_buffer: NZUsize!(1024),
2220 };
2221 let mut journal = Journal::init(context.with_label("first"), cfg.clone())
2222 .await
2223 .expect("Failed to initialize journal");
2224
2225 let sections_and_data = [(1u64, 100i32), (5u64, 500i32), (10u64, 1000i32)];
2227 let mut offsets = Vec::new();
2228
2229 for (section, data) in §ions_and_data {
2230 let (offset, _) = journal
2231 .append(*section, data)
2232 .await
2233 .expect("Failed to append");
2234 offsets.push(offset);
2235 }
2236 journal.sync_all().await.expect("Failed to sync");
2237
2238 for (i, (section, expected_data)) in sections_and_data.iter().enumerate() {
2240 let retrieved: i32 = journal
2241 .get(*section, offsets[i])
2242 .await
2243 .expect("Failed to get item");
2244 assert_eq!(retrieved, *expected_data);
2245 }
2246
2247 for missing_section in [0u64, 2, 3, 4, 6, 7, 8, 9, 11] {
2249 let result = journal.get(missing_section, 0).await;
2250 assert!(
2251 matches!(result, Err(Error::SectionOutOfRange(_))),
2252 "Expected SectionOutOfRange for section {}, got {:?}",
2253 missing_section,
2254 result
2255 );
2256 }
2257
2258 drop(journal);
2260 let journal = Journal::<_, i32>::init(context.with_label("second"), cfg.clone())
2261 .await
2262 .expect("Failed to re-initialize journal");
2263
2264 {
2266 let stream = journal
2267 .replay(0, 0, NZUsize!(1024))
2268 .await
2269 .expect("Failed to setup replay");
2270 pin_mut!(stream);
2271
2272 let mut items = Vec::new();
2273 while let Some(result) = stream.next().await {
2274 let (section, _, _, item) = result.expect("Failed to replay item");
2275 items.push((section, item));
2276 }
2277
2278 assert_eq!(items.len(), 3, "Should have 3 items");
2279 assert_eq!(items[0], (1, 100));
2280 assert_eq!(items[1], (5, 500));
2281 assert_eq!(items[2], (10, 1000));
2282 }
2283
2284 {
2286 let stream = journal
2287 .replay(5, 0, NZUsize!(1024))
2288 .await
2289 .expect("Failed to setup replay from section 5");
2290 pin_mut!(stream);
2291
2292 let mut items = Vec::new();
2293 while let Some(result) = stream.next().await {
2294 let (section, _, _, item) = result.expect("Failed to replay item");
2295 items.push((section, item));
2296 }
2297
2298 assert_eq!(items.len(), 2, "Should have 2 items from section 5 onwards");
2299 assert_eq!(items[0], (5, 500));
2300 assert_eq!(items[1], (10, 1000));
2301 }
2302
2303 {
2305 let stream = journal
2306 .replay(3, 0, NZUsize!(1024))
2307 .await
2308 .expect("Failed to setup replay from section 3");
2309 pin_mut!(stream);
2310
2311 let mut items = Vec::new();
2312 while let Some(result) = stream.next().await {
2313 let (section, _, _, item) = result.expect("Failed to replay item");
2314 items.push((section, item));
2315 }
2316
2317 assert_eq!(items.len(), 2);
2319 assert_eq!(items[0], (5, 500));
2320 assert_eq!(items[1], (10, 1000));
2321 }
2322
2323 journal.destroy().await.unwrap();
2324 });
2325 }
2326
    /// A section that was written and then rewound to empty sits between two
    /// populated sections; replay must skip it cleanly (also after a
    /// restart), and `replay(2, ..)` must start with section 3's item.
    #[test_traced]
    fn test_journal_empty_section_in_middle() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "test-partition".into(),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };
            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            // Section 1 keeps its item.
            journal.append(1, &100i32).await.expect("Failed to append");

            // Section 2 is written, synced, then rewound back to empty.
            journal.append(2, &200i32).await.expect("Failed to append");
            journal.sync(2).await.expect("Failed to sync");
            journal
                .rewind_section(2, 0)
                .await
                .expect("Failed to rewind");

            // Section 3 keeps its item.
            journal.append(3, &300i32).await.expect("Failed to append");

            journal.sync_all().await.expect("Failed to sync");

            // Sizes reflect the hole in the middle.
            assert!(journal.size(1).await.unwrap() > 0);
            assert_eq!(journal.size(2).await.unwrap(), 0);
            assert!(journal.size(3).await.unwrap() > 0);

            // Restart to make sure the hole also survives recovery.
            drop(journal);
            let journal = Journal::<_, i32>::init(context.with_label("second"), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");

            // Full replay: the empty section contributes nothing.
            {
                let stream = journal
                    .replay(0, 0, NZUsize!(1024))
                    .await
                    .expect("Failed to setup replay");
                pin_mut!(stream);

                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    let (section, _, _, item) = result.expect("Failed to replay item");
                    items.push((section, item));
                }

                assert_eq!(
                    items.len(),
                    2,
                    "Should have 2 items (skipping empty section)"
                );
                assert_eq!(items[0], (1, 100));
                assert_eq!(items[1], (3, 300));
            }

            // Replay starting at the empty section yields only section 3.
            {
                let stream = journal
                    .replay(2, 0, NZUsize!(1024))
                    .await
                    .expect("Failed to setup replay from section 2");
                pin_mut!(stream);

                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    let (section, _, _, item) = result.expect("Failed to replay item");
                    items.push((section, item));
                }

                assert_eq!(items.len(), 1, "Should have 1 item from section 3");
                assert_eq!(items[0], (3, 300));
            }

            journal.destroy().await.unwrap();
        });
    }
2416
    /// An item whose payload is exactly one page (`PAGE_SIZE` bytes):
    /// append, random access, restart, and replay must all round-trip it.
    #[test_traced]
    fn test_journal_item_exactly_page_size() {
        // Payload sized to exactly one cache page.
        const ITEM_SIZE: usize = PAGE_SIZE.get() as usize;
        type ExactItem = [u8; ITEM_SIZE];

        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "test-partition".into(),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(4096),
            };
            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            // Deterministic, non-uniform payload.
            let mut exact_data: ExactItem = [0u8; ITEM_SIZE];
            for (i, byte) in exact_data.iter_mut().enumerate() {
                *byte = (i % 256) as u8;
            }

            // Append and sync; encoded size must equal the payload size.
            let (offset, size) = journal
                .append(1, &exact_data)
                .await
                .expect("Failed to append exact item");
            assert_eq!(size as usize, ITEM_SIZE);
            journal.sync(1).await.expect("Failed to sync");

            // Random access read round-trips the payload.
            let retrieved: ExactItem = journal
                .get(1, offset)
                .await
                .expect("Failed to get exact item");
            assert_eq!(retrieved, exact_data, "Random access read mismatch");

            // Restart, then verify replay as well.
            drop(journal);
            let journal = Journal::<_, ExactItem>::init(context.with_label("second"), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");

            {
                let stream = journal
                    .replay(0, 0, NZUsize!(1024))
                    .await
                    .expect("Failed to setup replay");
                pin_mut!(stream);

                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    let (section, off, sz, item) = result.expect("Failed to replay item");
                    items.push((section, off, sz, item));
                }

                assert_eq!(items.len(), 1, "Should have exactly one item");
                let (section, off, sz, item) = &items[0];
                assert_eq!(*section, 1);
                assert_eq!(*off, offset);
                assert_eq!(*sz as usize, ITEM_SIZE);
                assert_eq!(*item, exact_data, "Replay read mismatch");
            }

            journal.destroy().await.unwrap();
        });
    }
2489
    /// With a tiny 16-byte page and 128-byte items, records (and, per the
    /// test's intent, their length-prefix varints) land across page
    /// boundaries; append, get, and a replay with a deliberately small
    /// (64-byte) read buffer must all still work.
    #[test_traced]
    fn test_journal_varint_spanning_page_boundary() {
        // Much smaller than the default PAGE_SIZE used by the other tests.
        const SMALL_PAGE: NonZeroU16 = NZU16!(16);

        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "test-partition".into(),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, SMALL_PAGE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };
            let mut journal: Journal<_, [u8; 128]> =
                Journal::init(context.with_label("first"), cfg.clone())
                    .await
                    .expect("Failed to initialize journal");

            // Three distinguishable 128-byte items (8 small pages each).
            let item1: [u8; 128] = [1u8; 128];
            let item2: [u8; 128] = [2u8; 128];
            let item3: [u8; 128] = [3u8; 128];

            let (offset1, _) = journal.append(1, &item1).await.expect("Failed to append");
            let (offset2, _) = journal.append(1, &item2).await.expect("Failed to append");
            let (offset3, _) = journal.append(1, &item3).await.expect("Failed to append");

            journal.sync(1).await.expect("Failed to sync");

            // Random access across the page boundaries.
            let retrieved1: [u8; 128] = journal.get(1, offset1).await.expect("Failed to get");
            let retrieved2: [u8; 128] = journal.get(1, offset2).await.expect("Failed to get");
            let retrieved3: [u8; 128] = journal.get(1, offset3).await.expect("Failed to get");
            assert_eq!(retrieved1, item1);
            assert_eq!(retrieved2, item2);
            assert_eq!(retrieved3, item3);

            // Restart and replay with a small (64-byte) replay buffer.
            drop(journal);
            let journal: Journal<_, [u8; 128]> =
                Journal::init(context.with_label("second"), cfg.clone())
                    .await
                    .expect("Failed to re-initialize journal");

            {
                let stream = journal
                    .replay(0, 0, NZUsize!(64))
                    .await
                    .expect("Failed to setup replay");
                pin_mut!(stream);

                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    let (section, off, _, item) = result.expect("Failed to replay item");
                    items.push((section, off, item));
                }

                assert_eq!(items.len(), 3, "Should have 3 items");
                assert_eq!(items[0], (1, offset1, item1));
                assert_eq!(items[1], (1, offset2, item2));
                assert_eq!(items[2], (1, offset3, item3));
            }

            journal.destroy().await.unwrap();
        });
    }
2566
2567 #[test_traced]
2568 fn test_journal_clear() {
2569 let executor = deterministic::Runner::default();
2570 executor.start(|context| async move {
2571 let cfg = Config {
2572 partition: "clear-test".into(),
2573 compression: None,
2574 codec_config: (),
2575 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
2576 write_buffer: NZUsize!(1024),
2577 };
2578
2579 let mut journal: Journal<_, u64> =
2580 Journal::init(context.with_label("journal"), cfg.clone())
2581 .await
2582 .expect("Failed to initialize journal");
2583
2584 for section in 0..5u64 {
2586 for i in 0..10u64 {
2587 journal
2588 .append(section, &(section * 1000 + i))
2589 .await
2590 .expect("Failed to append");
2591 }
2592 journal.sync(section).await.expect("Failed to sync");
2593 }
2594
2595 assert_eq!(journal.get(0, 0).await.unwrap(), 0);
2597 assert_eq!(journal.get(4, 0).await.unwrap(), 4000);
2598
2599 journal.clear().await.expect("Failed to clear");
2601
2602 for section in 0..5u64 {
2604 assert!(matches!(
2605 journal.get(section, 0).await,
2606 Err(Error::SectionOutOfRange(s)) if s == section
2607 ));
2608 }
2609
2610 for i in 0..5u64 {
2612 journal
2613 .append(10, &(i * 100))
2614 .await
2615 .expect("Failed to append after clear");
2616 }
2617 journal.sync(10).await.expect("Failed to sync after clear");
2618
2619 assert_eq!(journal.get(10, 0).await.unwrap(), 0);
2621
2622 assert!(matches!(
2624 journal.get(0, 0).await,
2625 Err(Error::SectionOutOfRange(0))
2626 ));
2627
2628 journal.destroy().await.unwrap();
2629 });
2630 }
2631}