1use super::manager::{AppendFactory, Config as ManagerConfig, Manager};
84use crate::journal::Error;
85use commonware_codec::{
86 varint::{UInt, MAX_U32_VARINT_SIZE},
87 Codec, CodecShared, EncodeSize, ReadExt, Write as CodecWrite,
88};
89use commonware_runtime::{
90 buffer::paged::{Append, CacheRef, Replay},
91 Blob, Buf, BufMut, IoBuf, IoBufMut, Metrics, Storage,
92};
93use futures::stream::{self, Stream, StreamExt};
94use std::{io::Cursor, num::NonZeroUsize};
95use tracing::{trace, warn};
96use zstd::{bulk::compress, decode_all};
97
/// Configuration for initializing a [`Journal`].
#[derive(Clone)]
pub struct Config<C> {
    /// Storage partition that holds this journal's section blobs.
    pub partition: String,

    /// zstd compression level applied to appended items (`None` = store
    /// items uncompressed).
    pub compression: Option<u8>,

    /// Codec configuration forwarded to `V::decode_cfg` when reading items.
    pub codec_config: C,

    /// Page cache handle shared by the journal's blob buffers.
    // NOTE(review): presumably shared across journals built from the same
    // pooler — confirm against `CacheRef::from_pooler` semantics.
    pub page_cache: CacheRef,

    /// Size of the write buffer used by each section's append blob.
    pub write_buffer: NonZeroUsize,
}
117
118#[inline]
121fn decode_length_prefix(buf: &mut impl Buf) -> Result<(usize, usize), Error> {
122 let initial = buf.remaining();
123 let size = UInt::<u32>::read(buf)?.0 as usize;
124 let varint_len = initial - buf.remaining();
125 Ok((size, varint_len))
126}
127
/// Describes how much of an item's payload was already buffered when its
/// length prefix was decoded (see [`find_item`]).
enum ItemInfo {
    /// The entire payload is present in the buffered bytes.
    Complete {
        /// Length of the varint size prefix, in bytes.
        varint_len: usize,
        /// Length of the item payload, in bytes.
        data_len: usize,
    },
    /// Only the leading `prefix_len` bytes of the payload are buffered; the
    /// remainder must be fetched from the blob separately.
    Incomplete {
        /// Length of the varint size prefix, in bytes.
        varint_len: usize,
        /// Number of payload bytes already buffered (after the prefix).
        prefix_len: usize,
        /// Total length of the payload, in bytes.
        total_len: usize,
    },
}
147
148fn find_item(buf: &mut impl Buf, offset: u64) -> Result<(u64, ItemInfo), Error> {
152 let available = buf.remaining();
153 let (size, varint_len) = decode_length_prefix(buf)?;
154 let next_offset = offset
155 .checked_add(varint_len as u64)
156 .ok_or(Error::OffsetOverflow)?
157 .checked_add(size as u64)
158 .ok_or(Error::OffsetOverflow)?;
159 let buffered = available.saturating_sub(varint_len);
160
161 let item = if buffered >= size {
162 ItemInfo::Complete {
163 varint_len,
164 data_len: size,
165 }
166 } else {
167 ItemInfo::Incomplete {
168 varint_len,
169 prefix_len: buffered,
170 total_len: size,
171 }
172 };
173
174 Ok((next_offset, item))
175}
176
/// Mutable state threaded through `stream::unfold` while replaying one
/// section's blob.
struct ReplayState<B: Blob, C> {
    // Section number being replayed (reported alongside each item).
    section: u64,
    // Append handle, retained so a corrupt tail can be truncated in place.
    blob: Append<B>,
    // Buffered reader over the blob's contents.
    replay: Replay<B>,
    // Bytes still to skip before decoding resumes (the caller's start offset).
    skip_bytes: u64,
    // Current absolute read offset within the blob.
    offset: u64,
    // Offset just past the last fully decoded item; truncation target on
    // tail corruption.
    valid_offset: u64,
    // Codec configuration for decoding items.
    codec_config: C,
    // Whether stored items are zstd-compressed.
    compressed: bool,
    // Set once this section is finished, cleanly or after an error.
    done: bool,
}
189
190fn decode_item<V: Codec>(item_data: impl Buf, cfg: &V::Cfg, compressed: bool) -> Result<V, Error> {
192 if compressed {
193 let decompressed =
194 decode_all(item_data.reader()).map_err(|_| Error::DecompressionFailed)?;
195 V::decode_cfg(decompressed.as_ref(), cfg).map_err(Error::Codec)
196 } else {
197 V::decode_cfg(item_data, cfg).map_err(Error::Codec)
198 }
199}
200
/// A section-based journal of length-prefixed, optionally zstd-compressed
/// items, stored as one append-only blob per section via a [`Manager`].
pub struct Journal<E: Storage + Metrics, V: Codec> {
    // Owns one append blob per section; mediates pruning and rewinding.
    manager: Manager<E, AppendFactory>,

    // zstd compression level; `None` disables compression entirely.
    compression: Option<u8>,

    // Configuration forwarded to `V::decode_cfg` when reading items.
    codec_config: V::Cfg,
}
223
impl<E: Storage + Metrics, V: CodecShared> Journal<E, V> {
    /// Initializes a journal over `cfg.partition`, opening any existing
    /// section blobs through the [`Manager`].
    pub async fn init(context: E, cfg: Config<V::Cfg>) -> Result<Self, Error> {
        let manager_cfg = ManagerConfig {
            partition: cfg.partition,
            factory: AppendFactory {
                write_buffer: cfg.write_buffer,
                page_cache_ref: cfg.page_cache,
            },
        };
        let manager = Manager::init(context, manager_cfg).await?;

        Ok(Self {
            manager,
            compression: cfg.compression,
            codec_config: cfg.codec_config,
        })
    }

    /// Reads and decodes the single item whose length prefix starts at
    /// `offset` in `blob`.
    ///
    /// Returns `(next_offset, item_size, item)` where `next_offset` is the
    /// offset of the entry that follows and `item_size` is the stored
    /// (possibly compressed) payload length.
    async fn read(
        compressed: bool,
        cfg: &V::Cfg,
        blob: &Append<E::Blob>,
        offset: u64,
    ) -> Result<(u64, u32, V), Error> {
        // Speculatively read enough bytes for a maximal u32 varint; the read
        // may also capture some (or all) of the payload.
        let (buf, available) = blob
            .read_up_to(
                offset,
                MAX_U32_VARINT_SIZE,
                IoBufMut::with_capacity(MAX_U32_VARINT_SIZE),
            )
            .await?;
        let buf = buf.freeze();
        let mut cursor = Cursor::new(buf.slice(..available));
        let (next_offset, item_info) = find_item(&mut cursor, offset)?;

        let (item_size, decoded) = match item_info {
            // Entire payload landed in the speculative read.
            ItemInfo::Complete {
                varint_len,
                data_len,
            } => {
                let data = buf.slice(varint_len..varint_len + data_len);
                let decoded = decode_item::<V>(data, cfg, compressed)?;
                (data_len as u32, decoded)
            }
            // Only a prefix was captured: fetch the remainder from the blob
            // and chain the two pieces for decoding.
            ItemInfo::Incomplete {
                varint_len,
                prefix_len,
                total_len,
            } => {
                let prefix = buf.slice(varint_len..varint_len + prefix_len);
                let read_offset = offset + varint_len as u64 + prefix_len as u64;
                let remainder_len = total_len - prefix_len;
                let mut remainder = vec![0u8; remainder_len];
                blob.read_into(&mut remainder, read_offset).await?;
                let chained = prefix.chain(IoBuf::from(remainder));
                let decoded = decode_item::<V>(chained, cfg, compressed)?;
                (total_len as u32, decoded)
            }
        };

        Ok((next_offset, item_size, decoded))
    }

    /// Replays all items from `(start_section, start_offset)` onward as a
    /// stream of `(section, offset, item_size, item)` tuples.
    ///
    /// A corrupt or incomplete tail in a blob is repaired by resizing the
    /// blob down to the end of the last fully decoded item; corruption that
    /// cannot be attributed to the tail surfaces as an `Err` item and ends
    /// the stream for that section.
    pub async fn replay(
        &self,
        start_section: u64,
        mut start_offset: u64,
        buffer: NonZeroUsize,
    ) -> Result<impl Stream<Item = Result<(u64, u64, u32, V), Error>> + Send + '_, Error> {
        // Snapshot the replay readers for every section up front so the
        // stream borrows only immutable state.
        let codec_config = self.codec_config.clone();
        let compressed = self.compression.is_some();
        let mut blobs = Vec::new();
        for (&section, blob) in self.manager.sections_from(start_section) {
            blobs.push((
                section,
                blob.clone(),
                blob.replay(buffer).await?,
                codec_config.clone(),
                compressed,
            ));
        }

        Ok(stream::iter(blobs).flat_map(
            move |(section, blob, replay, codec_config, compressed)| {
                // Only the starting section honors `start_offset`; every
                // later section replays from its beginning.
                let skip_bytes = if section == start_section {
                    start_offset
                } else {
                    start_offset = 0;
                    0
                };

                stream::unfold(
                    ReplayState {
                        section,
                        blob,
                        replay,
                        skip_bytes,
                        offset: 0,
                        valid_offset: skip_bytes,
                        codec_config,
                        compressed,
                        done: false,
                    },
                    move |mut state| async move {
                        if state.done {
                            return None;
                        }

                        let blob_size = state.replay.blob_size();
                        // Items decoded in this poll; yielded as one batch.
                        let mut batch: Vec<Result<(u64, u64, u32, V), Error>> = Vec::new();
                        loop {
                            // Buffer enough bytes for a maximal varint
                            // prefix (may be fewer near end-of-blob).
                            match state.replay.ensure(MAX_U32_VARINT_SIZE).await {
                                Ok(true) => {}
                                Ok(false) => {
                                    // Nothing buffered at all: clean end of
                                    // this section.
                                    if state.replay.remaining() == 0 {
                                        state.done = true;
                                        return if batch.is_empty() {
                                            None
                                        } else {
                                            Some((batch, state))
                                        };
                                    }
                                }
                                Err(err) => {
                                    batch.push(Err(err.into()));
                                    state.done = true;
                                    return Some((batch, state));
                                }
                            }

                            // Consume any remaining portion of the caller's
                            // start offset before decoding.
                            if state.skip_bytes > 0 {
                                let to_skip =
                                    state.skip_bytes.min(state.replay.remaining() as u64) as usize;
                                state.replay.advance(to_skip);
                                state.skip_bytes -= to_skip as u64;
                                state.offset += to_skip as u64;
                                continue;
                            }

                            let before_remaining = state.replay.remaining();
                            let (item_size, varint_len) =
                                match decode_length_prefix(&mut state.replay) {
                                    Ok(result) => result,
                                    Err(err) => {
                                        // A prefix decode failure at the very
                                        // end of the blob means trailing
                                        // garbage: truncate back to the last
                                        // valid item instead of erroring.
                                        if state.replay.is_exhausted()
                                            || before_remaining < MAX_U32_VARINT_SIZE
                                        {
                                            if state.valid_offset < blob_size
                                                && state.offset < blob_size
                                            {
                                                warn!(
                                                    blob = state.section,
                                                    bad_offset = state.offset,
                                                    new_size = state.valid_offset,
                                                    "trailing bytes detected: truncating"
                                                );
                                                // If the resize itself fails,
                                                // `.ok()?` silently ends the
                                                // stream for this section.
                                                state.blob.resize(state.valid_offset).await.ok()?;
                                            }
                                            state.done = true;
                                            return if batch.is_empty() {
                                                None
                                            } else {
                                                Some((batch, state))
                                            };
                                        }
                                        // Mid-blob prefix corruption: report
                                        // it and stop.
                                        batch.push(Err(err));
                                        state.done = true;
                                        return Some((batch, state));
                                    }
                                };

                            // Buffer the full payload; a short read here
                            // means the last item was cut off mid-write.
                            match state.replay.ensure(item_size).await {
                                Ok(true) => {}
                                Ok(false) => {
                                    warn!(
                                        blob = state.section,
                                        bad_offset = state.offset,
                                        new_size = state.valid_offset,
                                        "incomplete item at end: truncating"
                                    );
                                    state.blob.resize(state.valid_offset).await.ok()?;
                                    state.done = true;
                                    return if batch.is_empty() {
                                        None
                                    } else {
                                        Some((batch, state))
                                    };
                                }
                                Err(err) => {
                                    batch.push(Err(err.into()));
                                    state.done = true;
                                    return Some((batch, state));
                                }
                            }

                            let item_offset = state.offset;
                            let next_offset = match state
                                .offset
                                .checked_add(varint_len as u64)
                                .and_then(|o| o.checked_add(item_size as u64))
                            {
                                Some(o) => o,
                                None => {
                                    batch.push(Err(Error::OffsetOverflow));
                                    state.done = true;
                                    return Some((batch, state));
                                }
                            };
                            match decode_item::<V>(
                                (&mut state.replay).take(item_size),
                                &state.codec_config,
                                state.compressed,
                            ) {
                                Ok(decoded) => {
                                    batch.push(Ok((
                                        state.section,
                                        item_offset,
                                        item_size as u32,
                                        decoded,
                                    )));
                                    // Advance the high-water mark only after
                                    // a fully successful decode.
                                    state.valid_offset = next_offset;
                                    state.offset = next_offset;
                                }
                                Err(err) => {
                                    batch.push(Err(err));
                                    state.done = true;
                                    return Some((batch, state));
                                }
                            }

                            // Yield the batch once too little is buffered for
                            // another prefix, so consumers see progress
                            // before the next refill.
                            if !batch.is_empty() && state.replay.remaining() < MAX_U32_VARINT_SIZE {
                                return Some((batch, state));
                            }
                        }
                    },
                )
                .flat_map(stream::iter)
            },
        ))
    }

    /// Appends `item` to `section`, returning the byte offset it was written
    /// at and the stored (possibly compressed) item length.
    ///
    /// Fails with [`Error::ItemTooLarge`] if the encoded length does not fit
    /// in a `u32`.
    pub async fn append(&mut self, section: u64, item: &V) -> Result<(u64, u32), Error> {
        // Build the full entry (varint length prefix + payload) in one
        // contiguous buffer so it is appended atomically.
        let (buf, item_len) = if let Some(compression) = self.compression {
            let encoded = item.encode();
            let compressed =
                compress(&encoded, compression as i32).map_err(|_| Error::CompressionFailed)?;
            let item_len = compressed.len();
            let item_len_u32: u32 = match item_len.try_into() {
                Ok(len) => len,
                Err(_) => return Err(Error::ItemTooLarge(item_len)),
            };
            let size_len = UInt(item_len_u32).encode_size();
            let entry_len = size_len
                .checked_add(item_len)
                .ok_or(Error::OffsetOverflow)?;

            let mut buf = Vec::with_capacity(entry_len);
            UInt(item_len_u32).write(&mut buf);
            buf.put_slice(&compressed);

            (buf, item_len)
        } else {
            // Uncompressed path: encode the item directly after the prefix.
            let item_len = item.encode_size();
            let item_len_u32: u32 = match item_len.try_into() {
                Ok(len) => len,
                Err(_) => return Err(Error::ItemTooLarge(item_len)),
            };
            let size_len = UInt(item_len_u32).encode_size();
            let entry_len = size_len
                .checked_add(item_len)
                .ok_or(Error::OffsetOverflow)?;

            let mut buf = Vec::with_capacity(entry_len);
            UInt(item_len_u32).write(&mut buf);
            item.write(&mut buf);

            (buf, item_len)
        };

        let blob = self.manager.get_or_create(section).await?;

        // The item begins wherever the blob currently ends.
        let offset = blob.size().await;

        blob.append(&buf).await?;
        trace!(blob = section, offset, "appended item");
        Ok((offset, item_len as u32))
    }

    /// Reads the item whose entry starts at `offset` within `section`.
    pub async fn get(&self, section: u64, offset: u64) -> Result<V, Error> {
        let blob = self
            .manager
            .get(section)?
            .ok_or(Error::SectionOutOfRange(section))?;

        let (_, _, item) =
            Self::read(self.compression.is_some(), &self.codec_config, blob, offset).await?;
        Ok(item)
    }

    /// Returns the current size, in bytes, of `section`'s blob.
    pub async fn size(&self, section: u64) -> Result<u64, Error> {
        self.manager.size(section).await
    }

    /// Rewinds `section` so that it ends at `offset`.
    // NOTE(review): identical delegation to [`Self::rewind`]; only the
    // parameter name differs — confirm the intended distinction.
    pub async fn rewind_to_offset(&mut self, section: u64, offset: u64) -> Result<(), Error> {
        self.manager.rewind(section, offset).await
    }

    /// Rewinds `section` to the given `size` via the manager.
    pub async fn rewind(&mut self, section: u64, size: u64) -> Result<(), Error> {
        self.manager.rewind(section, size).await
    }

    /// Rewinds only `section` (leaving other sections untouched) to `size`.
    // NOTE(review): semantics inferred from `Manager::rewind_section`'s
    // name — confirm against the manager implementation.
    pub async fn rewind_section(&mut self, section: u64, size: u64) -> Result<(), Error> {
        self.manager.rewind_section(section, size).await
    }

    /// Flushes buffered writes for `section` to durable storage.
    pub async fn sync(&self, section: u64) -> Result<(), Error> {
        self.manager.sync(section).await
    }

    /// Flushes buffered writes for every tracked section.
    pub async fn sync_all(&self) -> Result<(), Error> {
        self.manager.sync_all().await
    }

    /// Prunes sections below `min`; returns whether anything was removed.
    // NOTE(review): inclusive/exclusive bound and the bool's meaning are
    // delegated to `Manager::prune` — confirm there.
    pub async fn prune(&mut self, min: u64) -> Result<bool, Error> {
        self.manager.prune(min).await
    }

    /// Returns the lowest section still tracked, if any.
    pub fn oldest_section(&self) -> Option<u64> {
        self.manager.oldest_section()
    }

    /// Returns the highest section currently tracked, if any.
    pub fn newest_section(&self) -> Option<u64> {
        self.manager.newest_section()
    }

    /// Returns true when no sections are tracked.
    pub fn is_empty(&self) -> bool {
        self.manager.is_empty()
    }

    /// Returns the number of tracked sections.
    pub fn num_sections(&self) -> usize {
        self.manager.num_sections()
    }

    /// Consumes the journal and removes all of its underlying blobs.
    pub async fn destroy(self) -> Result<(), Error> {
        self.manager.destroy().await
    }

    /// Removes all data while keeping the journal usable for new appends.
    pub async fn clear(&mut self) -> Result<(), Error> {
        self.manager.clear().await
    }
}
663
664#[cfg(test)]
665mod tests {
666 use super::*;
667 use commonware_macros::test_traced;
668 use commonware_runtime::{deterministic, Blob, BufMut, Metrics, Runner, Storage};
669 use commonware_utils::{NZUsize, NZU16};
670 use futures::{pin_mut, StreamExt};
671 use std::num::NonZeroU16;
672
673 const PAGE_SIZE: NonZeroU16 = NZU16!(1024);
674 const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);
675
676 #[test_traced]
677 fn test_journal_append_and_read() {
678 let executor = deterministic::Runner::default();
680
681 executor.start(|context| async move {
683 let cfg = Config {
685 partition: "test-partition".into(),
686 compression: None,
687 codec_config: (),
688 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
689 write_buffer: NZUsize!(1024),
690 };
691 let index = 1u64;
692 let data = 10;
693 let mut journal = Journal::init(context.with_label("first"), cfg.clone())
694 .await
695 .expect("Failed to initialize journal");
696
697 journal
699 .append(index, &data)
700 .await
701 .expect("Failed to append data");
702
703 let buffer = context.encode();
705 assert!(buffer.contains("first_tracked 1"));
706
707 journal.sync(index).await.expect("Failed to sync journal");
709 drop(journal);
710 let journal = Journal::<_, i32>::init(context.with_label("second"), cfg)
711 .await
712 .expect("Failed to re-initialize journal");
713
714 let mut items = Vec::new();
716 let stream = journal
717 .replay(0, 0, NZUsize!(1024))
718 .await
719 .expect("unable to setup replay");
720 pin_mut!(stream);
721 while let Some(result) = stream.next().await {
722 match result {
723 Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
724 Err(err) => panic!("Failed to read item: {err}"),
725 }
726 }
727
728 assert_eq!(items.len(), 1);
730 assert_eq!(items[0].0, index);
731 assert_eq!(items[0].1, data);
732
733 let buffer = context.encode();
735 assert!(buffer.contains("second_tracked 1"));
736 });
737 }
738
739 #[test_traced]
740 fn test_journal_multiple_appends_and_reads() {
741 let executor = deterministic::Runner::default();
743
744 executor.start(|context| async move {
746 let cfg = Config {
748 partition: "test-partition".into(),
749 compression: None,
750 codec_config: (),
751 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
752 write_buffer: NZUsize!(1024),
753 };
754
755 let mut journal = Journal::init(context.with_label("first"), cfg.clone())
757 .await
758 .expect("Failed to initialize journal");
759
760 let data_items = vec![(1u64, 1), (1u64, 2), (2u64, 3), (3u64, 4)];
762 for (index, data) in &data_items {
763 journal
764 .append(*index, data)
765 .await
766 .expect("Failed to append data");
767 journal.sync(*index).await.expect("Failed to sync blob");
768 }
769
770 let buffer = context.encode();
772 assert!(buffer.contains("first_tracked 3"));
773 assert!(buffer.contains("first_synced_total 4"));
774
775 drop(journal);
777 let journal = Journal::init(context.with_label("second"), cfg)
778 .await
779 .expect("Failed to re-initialize journal");
780
781 let mut items = Vec::<(u64, u32)>::new();
783 {
784 let stream = journal
785 .replay(0, 0, NZUsize!(1024))
786 .await
787 .expect("unable to setup replay");
788 pin_mut!(stream);
789 while let Some(result) = stream.next().await {
790 match result {
791 Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
792 Err(err) => panic!("Failed to read item: {err}"),
793 }
794 }
795 }
796
797 assert_eq!(items.len(), data_items.len());
799 for ((expected_index, expected_data), (actual_index, actual_data)) in
800 data_items.iter().zip(items.iter())
801 {
802 assert_eq!(actual_index, expected_index);
803 assert_eq!(actual_data, expected_data);
804 }
805
806 journal.destroy().await.expect("Failed to destroy journal");
808 });
809 }
810
811 #[test_traced]
812 fn test_journal_prune_blobs() {
813 let executor = deterministic::Runner::default();
815
816 executor.start(|context| async move {
818 let cfg = Config {
820 partition: "test-partition".into(),
821 compression: None,
822 codec_config: (),
823 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
824 write_buffer: NZUsize!(1024),
825 };
826
827 let mut journal = Journal::init(context.with_label("first"), cfg.clone())
829 .await
830 .expect("Failed to initialize journal");
831
832 for index in 1u64..=5u64 {
834 journal
835 .append(index, &index)
836 .await
837 .expect("Failed to append data");
838 journal.sync(index).await.expect("Failed to sync blob");
839 }
840
841 let data = 99;
843 journal
844 .append(2u64, &data)
845 .await
846 .expect("Failed to append data");
847 journal.sync(2u64).await.expect("Failed to sync blob");
848
849 journal.prune(3).await.expect("Failed to prune blobs");
851
852 let buffer = context.encode();
854 assert!(buffer.contains("first_pruned_total 2"));
855
856 journal.prune(2).await.expect("Failed to no-op prune");
858 let buffer = context.encode();
859 assert!(buffer.contains("first_pruned_total 2"));
860
861 drop(journal);
863 let mut journal = Journal::init(context.with_label("second"), cfg.clone())
864 .await
865 .expect("Failed to re-initialize journal");
866
867 let mut items = Vec::<(u64, u64)>::new();
869 {
870 let stream = journal
871 .replay(0, 0, NZUsize!(1024))
872 .await
873 .expect("unable to setup replay");
874 pin_mut!(stream);
875 while let Some(result) = stream.next().await {
876 match result {
877 Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
878 Err(err) => panic!("Failed to read item: {err}"),
879 }
880 }
881 }
882
883 assert_eq!(items.len(), 3);
885 let expected_indices = [3u64, 4u64, 5u64];
886 for (item, expected_index) in items.iter().zip(expected_indices.iter()) {
887 assert_eq!(item.0, *expected_index);
888 }
889
890 journal.prune(6).await.expect("Failed to prune blobs");
892
893 drop(journal);
895
896 assert!(context
901 .scan(&cfg.partition)
902 .await
903 .expect("Failed to list blobs")
904 .is_empty());
905 });
906 }
907
908 #[test_traced]
909 fn test_journal_prune_guard() {
910 let executor = deterministic::Runner::default();
911
912 executor.start(|context| async move {
913 let cfg = Config {
914 partition: "test-partition".into(),
915 compression: None,
916 codec_config: (),
917 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
918 write_buffer: NZUsize!(1024),
919 };
920
921 let mut journal = Journal::init(context.clone(), cfg.clone())
922 .await
923 .expect("Failed to initialize journal");
924
925 for section in 1u64..=5u64 {
927 journal
928 .append(section, &(section as i32))
929 .await
930 .expect("Failed to append data");
931 journal.sync(section).await.expect("Failed to sync");
932 }
933
934 journal.prune(3).await.expect("Failed to prune");
936
937 match journal.append(1, &100).await {
941 Err(Error::AlreadyPrunedToSection(3)) => {}
942 other => panic!("Expected AlreadyPrunedToSection(3), got {other:?}"),
943 }
944
945 match journal.append(2, &100).await {
946 Err(Error::AlreadyPrunedToSection(3)) => {}
947 other => panic!("Expected AlreadyPrunedToSection(3), got {other:?}"),
948 }
949
950 match journal.get(1, 0).await {
952 Err(Error::AlreadyPrunedToSection(3)) => {}
953 other => panic!("Expected AlreadyPrunedToSection(3), got {other:?}"),
954 }
955
956 match journal.size(1).await {
958 Err(Error::AlreadyPrunedToSection(3)) => {}
959 other => panic!("Expected AlreadyPrunedToSection(3), got {other:?}"),
960 }
961
962 match journal.rewind(2, 0).await {
964 Err(Error::AlreadyPrunedToSection(3)) => {}
965 other => panic!("Expected AlreadyPrunedToSection(3), got {other:?}"),
966 }
967
968 match journal.rewind_section(1, 0).await {
970 Err(Error::AlreadyPrunedToSection(3)) => {}
971 other => panic!("Expected AlreadyPrunedToSection(3), got {other:?}"),
972 }
973
974 match journal.sync(2).await {
976 Err(Error::AlreadyPrunedToSection(3)) => {}
977 other => panic!("Expected AlreadyPrunedToSection(3), got {other:?}"),
978 }
979
980 assert!(journal.get(3, 0).await.is_ok());
982 assert!(journal.get(4, 0).await.is_ok());
983 assert!(journal.get(5, 0).await.is_ok());
984 assert!(journal.size(3).await.is_ok());
985 assert!(journal.sync(4).await.is_ok());
986
987 journal
989 .append(3, &999)
990 .await
991 .expect("Should be able to append to section 3");
992
993 journal.prune(5).await.expect("Failed to prune");
995
996 match journal.get(3, 0).await {
998 Err(Error::AlreadyPrunedToSection(5)) => {}
999 other => panic!("Expected AlreadyPrunedToSection(5), got {other:?}"),
1000 }
1001
1002 match journal.get(4, 0).await {
1003 Err(Error::AlreadyPrunedToSection(5)) => {}
1004 other => panic!("Expected AlreadyPrunedToSection(5), got {other:?}"),
1005 }
1006
1007 assert!(journal.get(5, 0).await.is_ok());
1009 });
1010 }
1011
1012 #[test_traced]
1013 fn test_journal_prune_guard_across_restart() {
1014 let executor = deterministic::Runner::default();
1015
1016 executor.start(|context| async move {
1017 let cfg = Config {
1018 partition: "test-partition".into(),
1019 compression: None,
1020 codec_config: (),
1021 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1022 write_buffer: NZUsize!(1024),
1023 };
1024
1025 {
1027 let mut journal = Journal::init(context.with_label("first"), cfg.clone())
1028 .await
1029 .expect("Failed to initialize journal");
1030
1031 for section in 1u64..=5u64 {
1032 journal
1033 .append(section, &(section as i32))
1034 .await
1035 .expect("Failed to append data");
1036 journal.sync(section).await.expect("Failed to sync");
1037 }
1038
1039 journal.prune(3).await.expect("Failed to prune");
1040 }
1041
1042 {
1044 let journal = Journal::<_, i32>::init(context.with_label("second"), cfg.clone())
1045 .await
1046 .expect("Failed to re-initialize journal");
1047
1048 match journal.get(1, 0).await {
1051 Err(Error::SectionOutOfRange(1)) => {}
1052 other => panic!("Expected SectionOutOfRange(1), got {other:?}"),
1053 }
1054
1055 match journal.get(2, 0).await {
1056 Err(Error::SectionOutOfRange(2)) => {}
1057 other => panic!("Expected SectionOutOfRange(2), got {other:?}"),
1058 }
1059
1060 assert!(journal.get(3, 0).await.is_ok());
1062 assert!(journal.get(4, 0).await.is_ok());
1063 assert!(journal.get(5, 0).await.is_ok());
1064 }
1065 });
1066 }
1067
1068 #[test_traced]
1069 fn test_journal_with_invalid_blob_name() {
1070 let executor = deterministic::Runner::default();
1072
1073 executor.start(|context| async move {
1075 let cfg = Config {
1077 partition: "test-partition".into(),
1078 compression: None,
1079 codec_config: (),
1080 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1081 write_buffer: NZUsize!(1024),
1082 };
1083
1084 let invalid_blob_name = b"invalid"; let (blob, _) = context
1087 .open(&cfg.partition, invalid_blob_name)
1088 .await
1089 .expect("Failed to create blob with invalid name");
1090 blob.sync().await.expect("Failed to sync blob");
1091
1092 let result = Journal::<_, u64>::init(context, cfg).await;
1094
1095 assert!(matches!(result, Err(Error::InvalidBlobName(_))));
1097 });
1098 }
1099
1100 #[test_traced]
1101 fn test_journal_read_size_missing() {
1102 let executor = deterministic::Runner::default();
1104
1105 executor.start(|context| async move {
1107 let cfg = Config {
1109 partition: "test-partition".into(),
1110 compression: None,
1111 codec_config: (),
1112 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1113 write_buffer: NZUsize!(1024),
1114 };
1115
1116 let section = 1u64;
1118 let blob_name = section.to_be_bytes();
1119 let (blob, _) = context
1120 .open(&cfg.partition, &blob_name)
1121 .await
1122 .expect("Failed to create blob");
1123
1124 let mut incomplete_data = Vec::new();
1126 UInt(u32::MAX).write(&mut incomplete_data);
1127 incomplete_data.truncate(1);
1128 blob.write_at(0, incomplete_data)
1129 .await
1130 .expect("Failed to write incomplete data");
1131 blob.sync().await.expect("Failed to sync blob");
1132
1133 let journal = Journal::init(context, cfg)
1135 .await
1136 .expect("Failed to initialize journal");
1137
1138 let stream = journal
1140 .replay(0, 0, NZUsize!(1024))
1141 .await
1142 .expect("unable to setup replay");
1143 pin_mut!(stream);
1144 let mut items = Vec::<(u64, u64)>::new();
1145 while let Some(result) = stream.next().await {
1146 match result {
1147 Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
1148 Err(err) => panic!("Failed to read item: {err}"),
1149 }
1150 }
1151 assert!(items.is_empty());
1152 });
1153 }
1154
1155 #[test_traced]
1156 fn test_journal_read_item_missing() {
1157 let executor = deterministic::Runner::default();
1159
1160 executor.start(|context| async move {
1162 let cfg = Config {
1164 partition: "test-partition".into(),
1165 compression: None,
1166 codec_config: (),
1167 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1168 write_buffer: NZUsize!(1024),
1169 };
1170
1171 let section = 1u64;
1173 let blob_name = section.to_be_bytes();
1174 let (blob, _) = context
1175 .open(&cfg.partition, &blob_name)
1176 .await
1177 .expect("Failed to create blob");
1178
1179 let item_size: u32 = 10; let mut buf = Vec::new();
1182 UInt(item_size).write(&mut buf); let data = [2u8; 5];
1184 BufMut::put_slice(&mut buf, &data);
1185 blob.write_at(0, buf)
1186 .await
1187 .expect("Failed to write incomplete item");
1188 blob.sync().await.expect("Failed to sync blob");
1189
1190 let journal = Journal::init(context, cfg)
1192 .await
1193 .expect("Failed to initialize journal");
1194
1195 let stream = journal
1197 .replay(0, 0, NZUsize!(1024))
1198 .await
1199 .expect("unable to setup replay");
1200 pin_mut!(stream);
1201 let mut items = Vec::<(u64, u64)>::new();
1202 while let Some(result) = stream.next().await {
1203 match result {
1204 Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
1205 Err(err) => panic!("Failed to read item: {err}"),
1206 }
1207 }
1208 assert!(items.is_empty());
1209 });
1210 }
1211
1212 #[test_traced]
1213 fn test_journal_read_checksum_missing() {
1214 let executor = deterministic::Runner::default();
1216
1217 executor.start(|context| async move {
1219 let cfg = Config {
1221 partition: "test-partition".into(),
1222 compression: None,
1223 codec_config: (),
1224 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1225 write_buffer: NZUsize!(1024),
1226 };
1227
1228 let section = 1u64;
1230 let blob_name = section.to_be_bytes();
1231 let (blob, _) = context
1232 .open(&cfg.partition, &blob_name)
1233 .await
1234 .expect("Failed to create blob");
1235
1236 let item_data = b"Test data";
1238 let item_size = item_data.len() as u32;
1239
1240 let mut buf = Vec::new();
1242 UInt(item_size).write(&mut buf);
1243 BufMut::put_slice(&mut buf, item_data);
1244 blob.write_at(0, buf)
1245 .await
1246 .expect("Failed to write item without checksum");
1247
1248 blob.sync().await.expect("Failed to sync blob");
1249
1250 let journal = Journal::init(context, cfg)
1252 .await
1253 .expect("Failed to initialize journal");
1254
1255 let stream = journal
1259 .replay(0, 0, NZUsize!(1024))
1260 .await
1261 .expect("unable to setup replay");
1262 pin_mut!(stream);
1263 let mut items = Vec::<(u64, u64)>::new();
1264 while let Some(result) = stream.next().await {
1265 match result {
1266 Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
1267 Err(err) => panic!("Failed to read item: {err}"),
1268 }
1269 }
1270 assert!(items.is_empty());
1271 });
1272 }
1273
1274 #[test_traced]
1275 fn test_journal_read_checksum_mismatch() {
1276 let executor = deterministic::Runner::default();
1278
1279 executor.start(|context| async move {
1281 let cfg = Config {
1283 partition: "test-partition".into(),
1284 compression: None,
1285 codec_config: (),
1286 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1287 write_buffer: NZUsize!(1024),
1288 };
1289
1290 let section = 1u64;
1292 let blob_name = section.to_be_bytes();
1293 let (blob, _) = context
1294 .open(&cfg.partition, &blob_name)
1295 .await
1296 .expect("Failed to create blob");
1297
1298 let item_data = b"Test data";
1300 let item_size = item_data.len() as u32;
1301 let incorrect_checksum: u32 = 0xDEADBEEF;
1302
1303 let mut buf = Vec::new();
1305 UInt(item_size).write(&mut buf);
1306 BufMut::put_slice(&mut buf, item_data);
1307 buf.put_u32(incorrect_checksum);
1308 blob.write_at(0, buf)
1309 .await
1310 .expect("Failed to write item with bad checksum");
1311
1312 blob.sync().await.expect("Failed to sync blob");
1313
1314 let journal = Journal::init(context.clone(), cfg.clone())
1316 .await
1317 .expect("Failed to initialize journal");
1318
1319 {
1321 let stream = journal
1322 .replay(0, 0, NZUsize!(1024))
1323 .await
1324 .expect("unable to setup replay");
1325 pin_mut!(stream);
1326 let mut items = Vec::<(u64, u64)>::new();
1327 while let Some(result) = stream.next().await {
1328 match result {
1329 Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
1330 Err(err) => panic!("Failed to read item: {err}"),
1331 }
1332 }
1333 assert!(items.is_empty());
1334 }
1335 drop(journal);
1336
1337 let (_, blob_size) = context
1339 .open(&cfg.partition, §ion.to_be_bytes())
1340 .await
1341 .expect("Failed to open blob");
1342 assert_eq!(blob_size, 0);
1343 });
1344 }
1345
1346 #[test_traced]
1347 fn test_journal_truncation_recovery() {
1348 let executor = deterministic::Runner::default();
1350
1351 executor.start(|context| async move {
1353 let cfg = Config {
1355 partition: "test-partition".into(),
1356 compression: None,
1357 codec_config: (),
1358 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1359 write_buffer: NZUsize!(1024),
1360 };
1361
1362 let mut journal = Journal::init(context.with_label("first"), cfg.clone())
1364 .await
1365 .expect("Failed to initialize journal");
1366
1367 journal.append(1, &1).await.expect("Failed to append data");
1369
1370 let data_items = vec![(2u64, 2), (2u64, 3), (2u64, 4)];
1372 for (index, data) in &data_items {
1373 journal
1374 .append(*index, data)
1375 .await
1376 .expect("Failed to append data");
1377 journal.sync(*index).await.expect("Failed to sync blob");
1378 }
1379
1380 journal.sync_all().await.expect("Failed to sync");
1382 drop(journal);
1383
1384 let (blob, blob_size) = context
1386 .open(&cfg.partition, &2u64.to_be_bytes())
1387 .await
1388 .expect("Failed to open blob");
1389 blob.resize(blob_size - 4)
1390 .await
1391 .expect("Failed to corrupt blob");
1392 blob.sync().await.expect("Failed to sync blob");
1393
1394 let journal = Journal::init(context.with_label("second"), cfg.clone())
1396 .await
1397 .expect("Failed to re-initialize journal");
1398
1399 let mut items = Vec::<(u64, u32)>::new();
1401 {
1402 let stream = journal
1403 .replay(0, 0, NZUsize!(1024))
1404 .await
1405 .expect("unable to setup replay");
1406 pin_mut!(stream);
1407 while let Some(result) = stream.next().await {
1408 match result {
1409 Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
1410 Err(err) => panic!("Failed to read item: {err}"),
1411 }
1412 }
1413 }
1414 drop(journal);
1415
1416 assert_eq!(items.len(), 1);
1418 assert_eq!(items[0].0, 1);
1419 assert_eq!(items[0].1, 1);
1420
1421 let (_, blob_size) = context
1423 .open(&cfg.partition, &2u64.to_be_bytes())
1424 .await
1425 .expect("Failed to open blob");
1426 assert_eq!(blob_size, 0);
1427
1428 let mut journal = Journal::init(context.with_label("third"), cfg.clone())
1430 .await
1431 .expect("Failed to re-initialize journal");
1432
1433 let mut items = Vec::<(u64, u32)>::new();
1435 {
1436 let stream = journal
1437 .replay(0, 0, NZUsize!(1024))
1438 .await
1439 .expect("unable to setup replay");
1440 pin_mut!(stream);
1441 while let Some(result) = stream.next().await {
1442 match result {
1443 Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
1444 Err(err) => panic!("Failed to read item: {err}"),
1445 }
1446 }
1447 }
1448
1449 assert_eq!(items.len(), 1);
1451 assert_eq!(items[0].0, 1);
1452 assert_eq!(items[0].1, 1);
1453
1454 let (_offset, _) = journal.append(2, &5).await.expect("Failed to append data");
1456 journal.sync(2).await.expect("Failed to sync blob");
1457
1458 let item = journal.get(2, 0).await.expect("Failed to get item");
1460 assert_eq!(item, 5);
1461
1462 drop(journal);
1464
1465 let journal = Journal::init(context.clone(), cfg.clone())
1467 .await
1468 .expect("Failed to re-initialize journal");
1469
1470 let mut items = Vec::<(u64, u32)>::new();
1472 {
1473 let stream = journal
1474 .replay(0, 0, NZUsize!(1024))
1475 .await
1476 .expect("unable to setup replay");
1477 pin_mut!(stream);
1478 while let Some(result) = stream.next().await {
1479 match result {
1480 Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
1481 Err(err) => panic!("Failed to read item: {err}"),
1482 }
1483 }
1484 }
1485
1486 assert_eq!(items.len(), 2);
1488 assert_eq!(items[0].0, 1);
1489 assert_eq!(items[0].1, 1);
1490 assert_eq!(items[1].0, 2);
1491 assert_eq!(items[1].1, 5);
1492 });
1493 }
1494
1495 #[test_traced]
1496 fn test_journal_handling_extra_data() {
1497 let executor = deterministic::Runner::default();
1499
1500 executor.start(|context| async move {
1502 let cfg = Config {
1504 partition: "test-partition".into(),
1505 compression: None,
1506 codec_config: (),
1507 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1508 write_buffer: NZUsize!(1024),
1509 };
1510
1511 let mut journal = Journal::init(context.with_label("first"), cfg.clone())
1513 .await
1514 .expect("Failed to initialize journal");
1515
1516 journal.append(1, &1).await.expect("Failed to append data");
1518
1519 let data_items = vec![(2u64, 2), (2u64, 3), (2u64, 4)];
1521 for (index, data) in &data_items {
1522 journal
1523 .append(*index, data)
1524 .await
1525 .expect("Failed to append data");
1526 journal.sync(*index).await.expect("Failed to sync blob");
1527 }
1528
1529 journal.sync_all().await.expect("Failed to sync");
1531 drop(journal);
1532
1533 let (blob, blob_size) = context
1535 .open(&cfg.partition, &2u64.to_be_bytes())
1536 .await
1537 .expect("Failed to open blob");
1538 blob.write_at(blob_size, vec![0u8; 16])
1539 .await
1540 .expect("Failed to add extra data");
1541 blob.sync().await.expect("Failed to sync blob");
1542
1543 let journal = Journal::init(context.with_label("second"), cfg)
1545 .await
1546 .expect("Failed to re-initialize journal");
1547
1548 let mut items = Vec::<(u64, i32)>::new();
1550 let stream = journal
1551 .replay(0, 0, NZUsize!(1024))
1552 .await
1553 .expect("unable to setup replay");
1554 pin_mut!(stream);
1555 while let Some(result) = stream.next().await {
1556 match result {
1557 Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
1558 Err(err) => panic!("Failed to read item: {err}"),
1559 }
1560 }
1561 });
1562 }
1563
1564 #[test_traced]
1565 fn test_journal_rewind() {
1566 let executor = deterministic::Runner::default();
1568 executor.start(|context| async move {
1569 let cfg = Config {
1571 partition: "test-partition".into(),
1572 compression: None,
1573 codec_config: (),
1574 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1575 write_buffer: NZUsize!(1024),
1576 };
1577 let mut journal = Journal::init(context, cfg).await.unwrap();
1578
1579 let size = journal.size(1).await.unwrap();
1581 assert_eq!(size, 0);
1582
1583 journal.append(1, &42i32).await.unwrap();
1585
1586 let size = journal.size(1).await.unwrap();
1588 assert!(size > 0);
1589
1590 journal.append(1, &43i32).await.unwrap();
1592 let new_size = journal.size(1).await.unwrap();
1593 assert!(new_size > size);
1594
1595 let size = journal.size(2).await.unwrap();
1597 assert_eq!(size, 0);
1598
1599 journal.append(2, &44i32).await.unwrap();
1601
1602 let size = journal.size(2).await.unwrap();
1604 assert!(size > 0);
1605
1606 journal.rewind(1, 0).await.unwrap();
1608
1609 let size = journal.size(1).await.unwrap();
1611 assert_eq!(size, 0);
1612
1613 let size = journal.size(2).await.unwrap();
1615 assert_eq!(size, 0);
1616 });
1617 }
1618
1619 #[test_traced]
1620 fn test_journal_rewind_section() {
1621 let executor = deterministic::Runner::default();
1623 executor.start(|context| async move {
1624 let cfg = Config {
1626 partition: "test-partition".into(),
1627 compression: None,
1628 codec_config: (),
1629 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1630 write_buffer: NZUsize!(1024),
1631 };
1632 let mut journal = Journal::init(context, cfg).await.unwrap();
1633
1634 let size = journal.size(1).await.unwrap();
1636 assert_eq!(size, 0);
1637
1638 journal.append(1, &42i32).await.unwrap();
1640
1641 let size = journal.size(1).await.unwrap();
1643 assert!(size > 0);
1644
1645 journal.append(1, &43i32).await.unwrap();
1647 let new_size = journal.size(1).await.unwrap();
1648 assert!(new_size > size);
1649
1650 let size = journal.size(2).await.unwrap();
1652 assert_eq!(size, 0);
1653
1654 journal.append(2, &44i32).await.unwrap();
1656
1657 let size = journal.size(2).await.unwrap();
1659 assert!(size > 0);
1660
1661 journal.rewind_section(1, 0).await.unwrap();
1663
1664 let size = journal.size(1).await.unwrap();
1666 assert_eq!(size, 0);
1667
1668 let size = journal.size(2).await.unwrap();
1670 assert!(size > 0);
1671 });
1672 }
1673
    #[test_traced]
    fn test_journal_small_items() {
        // Many one-byte items: exercises offset bookkeeping for minimal-size
        // entries via random access (`get`) and sequential replay.
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "test-partition".into(),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            // Append 100 u8 items, remembering the offset of each.
            let num_items = 100;
            let mut offsets = Vec::new();
            for i in 0..num_items {
                let (offset, size) = journal
                    .append(1, &(i as u8))
                    .await
                    .expect("Failed to append data");
                assert_eq!(size, 1, "u8 should encode to 1 byte");
                offsets.push(offset);
            }
            journal.sync(1).await.expect("Failed to sync");

            // Random access: every recorded offset must yield its item.
            for (i, &offset) in offsets.iter().enumerate() {
                let item: u8 = journal.get(1, offset).await.expect("Failed to get item");
                assert_eq!(item, i as u8, "Item mismatch at offset {offset}");
            }

            // Restart, then replay and check section/offset/size/value of
            // every item in order.
            drop(journal);
            let journal = Journal::<_, u8>::init(context.with_label("second"), cfg)
                .await
                .expect("Failed to re-initialize journal");

            let stream = journal
                .replay(0, 0, NZUsize!(1024))
                .await
                .expect("Failed to setup replay");
            pin_mut!(stream);

            let mut count = 0;
            while let Some(result) = stream.next().await {
                let (section, offset, size, item) = result.expect("Failed to replay item");
                assert_eq!(section, 1);
                assert_eq!(offset, offsets[count]);
                assert_eq!(size, 1);
                assert_eq!(item, count as u8);
                count += 1;
            }
            assert_eq!(count, num_items, "Should replay all items");
        });
    }
1734
    #[test_traced]
    fn test_journal_rewind_many_sections() {
        // Rewinding into the middle of a populated range keeps every section
        // up to and including the target and removes all later sections.
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "test-partition".into(),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };
            let mut journal = Journal::init(context.clone(), cfg.clone()).await.unwrap();

            // One item per section across sections 1..=10.
            for section in 1u64..=10 {
                journal.append(section, &(section as i32)).await.unwrap();
            }
            journal.sync_all().await.unwrap();

            for section in 1u64..=10 {
                let size = journal.size(section).await.unwrap();
                assert!(size > 0, "section {section} should have data");
            }

            // Rewind to the full current size of section 5: section 5 itself
            // is kept intact, everything after it is dropped.
            journal
                .rewind(5, journal.size(5).await.unwrap())
                .await
                .unwrap();

            for section in 1u64..=5 {
                let size = journal.size(section).await.unwrap();
                assert!(size > 0, "section {section} should still have data");
            }

            for section in 6u64..=10 {
                let size = journal.size(section).await.unwrap();
                assert_eq!(size, 0, "section {section} should be removed");
            }

            // Replay surfaces exactly the five surviving items, in order.
            {
                let stream = journal.replay(0, 0, NZUsize!(1024)).await.unwrap();
                pin_mut!(stream);
                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    let (section, _, _, item) = result.unwrap();
                    items.push((section, item));
                }
                assert_eq!(items.len(), 5);
                for (i, (section, item)) in items.iter().enumerate() {
                    assert_eq!(*section, (i + 1) as u64);
                    assert_eq!(*item, (i + 1) as i32);
                }
            }

            journal.destroy().await.unwrap();
        });
    }
1797
1798 #[test_traced]
1799 fn test_journal_rewind_partial_truncation() {
1800 let executor = deterministic::Runner::default();
1801 executor.start(|context| async move {
1802 let cfg = Config {
1803 partition: "test-partition".into(),
1804 compression: None,
1805 codec_config: (),
1806 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1807 write_buffer: NZUsize!(1024),
1808 };
1809 let mut journal = Journal::init(context.clone(), cfg.clone()).await.unwrap();
1810
1811 let mut sizes = Vec::new();
1813 for i in 0..5 {
1814 journal.append(1, &i).await.unwrap();
1815 journal.sync(1).await.unwrap();
1816 sizes.push(journal.size(1).await.unwrap());
1817 }
1818
1819 let target_size = sizes[2];
1821 journal.rewind(1, target_size).await.unwrap();
1822
1823 let new_size = journal.size(1).await.unwrap();
1825 assert_eq!(new_size, target_size);
1826
1827 {
1829 let stream = journal.replay(0, 0, NZUsize!(1024)).await.unwrap();
1830 pin_mut!(stream);
1831 let mut items = Vec::new();
1832 while let Some(result) = stream.next().await {
1833 let (_, _, _, item) = result.unwrap();
1834 items.push(item);
1835 }
1836 assert_eq!(items.len(), 3);
1837 for (i, item) in items.iter().enumerate() {
1838 assert_eq!(*item, i as i32);
1839 }
1840 }
1841
1842 journal.destroy().await.unwrap();
1843 });
1844 }
1845
1846 #[test_traced]
1847 fn test_journal_rewind_nonexistent_target() {
1848 let executor = deterministic::Runner::default();
1849 executor.start(|context| async move {
1850 let cfg = Config {
1851 partition: "test-partition".into(),
1852 compression: None,
1853 codec_config: (),
1854 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1855 write_buffer: NZUsize!(1024),
1856 };
1857 let mut journal = Journal::init(context.clone(), cfg.clone()).await.unwrap();
1858
1859 for section in 5u64..=7 {
1861 journal.append(section, &(section as i32)).await.unwrap();
1862 }
1863 journal.sync_all().await.unwrap();
1864
1865 journal.rewind(3, 0).await.unwrap();
1867
1868 for section in 5u64..=7 {
1870 let size = journal.size(section).await.unwrap();
1871 assert_eq!(size, 0, "section {section} should be removed");
1872 }
1873
1874 {
1876 let stream = journal.replay(0, 0, NZUsize!(1024)).await.unwrap();
1877 pin_mut!(stream);
1878 let items: Vec<_> = stream.collect().await;
1879 assert!(items.is_empty());
1880 }
1881
1882 journal.destroy().await.unwrap();
1883 });
1884 }
1885
    #[test_traced]
    fn test_journal_rewind_persistence() {
        // A synced rewind must survive a restart: sections removed by the
        // rewind stay gone and surviving sections stay intact.
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "test-partition".into(),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            // Populate sections 1..=5 with one item each.
            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
                .await
                .unwrap();
            for section in 1u64..=5 {
                journal.append(section, &(section as i32)).await.unwrap();
            }
            journal.sync_all().await.unwrap();

            // Rewind to the full size of section 2 (keeps 1..=2, drops 3..=5),
            // then sync so the truncation is durable before restart.
            let size = journal.size(2).await.unwrap();
            journal.rewind(2, size).await.unwrap();
            journal.sync_all().await.unwrap();
            drop(journal);

            // Restart the journal.
            let journal = Journal::<_, i32>::init(context.with_label("second"), cfg.clone())
                .await
                .unwrap();

            for section in 1u64..=2 {
                let size = journal.size(section).await.unwrap();
                assert!(size > 0, "section {section} should have data after restart");
            }

            for section in 3u64..=5 {
                let size = journal.size(section).await.unwrap();
                assert_eq!(size, 0, "section {section} should be gone after restart");
            }

            // Replay shows exactly the two surviving items.
            {
                let stream = journal.replay(0, 0, NZUsize!(1024)).await.unwrap();
                pin_mut!(stream);
                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    let (section, _, _, item) = result.unwrap();
                    items.push((section, item));
                }
                assert_eq!(items.len(), 2);
                assert_eq!(items[0], (1, 1));
                assert_eq!(items[1], (2, 2));
            }

            journal.destroy().await.unwrap();
        });
    }
1947
1948 #[test_traced]
1949 fn test_journal_rewind_to_zero_removes_all_newer() {
1950 let executor = deterministic::Runner::default();
1951 executor.start(|context| async move {
1952 let cfg = Config {
1953 partition: "test-partition".into(),
1954 compression: None,
1955 codec_config: (),
1956 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1957 write_buffer: NZUsize!(1024),
1958 };
1959 let mut journal = Journal::init(context.clone(), cfg.clone()).await.unwrap();
1960
1961 for section in 1u64..=3 {
1963 journal.append(section, &(section as i32)).await.unwrap();
1964 }
1965 journal.sync_all().await.unwrap();
1966
1967 journal.rewind(1, 0).await.unwrap();
1969
1970 let size = journal.size(1).await.unwrap();
1972 assert_eq!(size, 0, "section 1 should be empty");
1973
1974 for section in 2u64..=3 {
1976 let size = journal.size(section).await.unwrap();
1977 assert_eq!(size, 0, "section {section} should be removed");
1978 }
1979
1980 {
1982 let stream = journal.replay(0, 0, NZUsize!(1024)).await.unwrap();
1983 pin_mut!(stream);
1984 let items: Vec<_> = stream.collect().await;
1985 assert!(items.is_empty());
1986 }
1987
1988 journal.destroy().await.unwrap();
1989 });
1990 }
1991
    #[test_traced]
    fn test_journal_replay_start_offset_with_trailing_bytes() {
        // Regression test: replaying from a non-zero start offset, with
        // garbage bytes trailing the valid data, must not cause the journal
        // to truncate the valid region. The assertion at the end pins the
        // original bug: valid_offset being initialized to 0 instead of
        // start_offset would have truncated good data.
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "test-partition".into(),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };
            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            // Write five valid items to section 1 and note the logical size.
            for i in 0..5i32 {
                journal.append(1, &i).await.unwrap();
            }
            journal.sync(1).await.unwrap();
            let valid_logical_size = journal.size(1).await.unwrap();
            drop(journal);

            // Append two garbage bytes directly past the valid data.
            let (blob, physical_size_before) = context
                .open(&cfg.partition, &1u64.to_be_bytes())
                .await
                .unwrap();

            blob.write_at(physical_size_before, vec![0xFF, 0xFF])
                .await
                .unwrap();
            blob.sync().await.unwrap();

            // Replay starting exactly at the end of the valid region; the
            // stream is drained without inspecting items (there are none).
            let start_offset = valid_logical_size;
            {
                let journal = Journal::<_, i32>::init(context.with_label("second"), cfg.clone())
                    .await
                    .unwrap();

                let stream = journal
                    .replay(1, start_offset, NZUsize!(1024))
                    .await
                    .unwrap();
                pin_mut!(stream);

                while let Some(_result) = stream.next().await {}
            }

            // The blob must not have shrunk below its pre-replay physical
            // size, i.e. no valid data was discarded.
            let (_, physical_size_after) = context
                .open(&cfg.partition, &1u64.to_be_bytes())
                .await
                .unwrap();

            assert!(
                physical_size_after >= physical_size_before,
                "Valid data was lost! Physical blob truncated from {physical_size_before} to \
                {physical_size_after}. Logical valid size was {valid_logical_size}. \
                This indicates valid_offset was incorrectly initialized to 0 instead of start_offset."
            );
        });
    }
2064
    #[test_traced]
    fn test_journal_large_item_spanning_pages() {
        // A single item larger than one page must round-trip correctly both
        // through random access (`get`) and through replay.
        const LARGE_SIZE: usize = 2048;
        type LargeItem = [u8; LARGE_SIZE];

        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "test-partition".into(),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(4096),
            };
            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            // Fill the item with a recognizable byte pattern (i mod 256).
            let mut large_data: LargeItem = [0u8; LARGE_SIZE];
            for (i, byte) in large_data.iter_mut().enumerate() {
                *byte = (i % 256) as u8;
            }
            // Sanity-check the premise of the test.
            assert!(
                LARGE_SIZE > PAGE_SIZE.get() as usize,
                "Item must be larger than page size"
            );

            let (offset, size) = journal
                .append(1, &large_data)
                .await
                .expect("Failed to append large item");
            assert_eq!(size as usize, LARGE_SIZE);
            journal.sync(1).await.expect("Failed to sync");

            // Random access read must reproduce the item exactly.
            let retrieved: LargeItem = journal
                .get(1, offset)
                .await
                .expect("Failed to get large item");
            assert_eq!(retrieved, large_data, "Random access read mismatch");

            // Restart and replay: the one item comes back at the same
            // offset with the same size and payload.
            drop(journal);
            let journal = Journal::<_, LargeItem>::init(context.with_label("second"), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");

            {
                let stream = journal
                    .replay(0, 0, NZUsize!(1024))
                    .await
                    .expect("Failed to setup replay");
                pin_mut!(stream);

                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    let (section, off, sz, item) = result.expect("Failed to replay item");
                    items.push((section, off, sz, item));
                }

                assert_eq!(items.len(), 1, "Should have exactly one item");
                let (section, off, sz, item) = &items[0];
                assert_eq!(*section, 1);
                assert_eq!(*off, offset);
                assert_eq!(*sz as usize, LARGE_SIZE);
                assert_eq!(*item, large_data, "Replay read mismatch");
            }

            journal.destroy().await.unwrap();
        });
    }
2140
    #[test_traced]
    fn test_journal_non_contiguous_sections() {
        // Sections need not be contiguous: data in sections 1, 5, and 10 must
        // be retrievable, missing sections must error, and replay must work
        // when starting at existing and non-existing sections.
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "test-partition".into(),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };
            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            // One item each in the widely-spaced sections 1, 5, and 10.
            let sections_and_data = [(1u64, 100i32), (5u64, 500i32), (10u64, 1000i32)];
            let mut offsets = Vec::new();

            for (section, data) in &sections_and_data {
                let (offset, _) = journal
                    .append(*section, data)
                    .await
                    .expect("Failed to append");
                offsets.push(offset);
            }
            journal.sync_all().await.expect("Failed to sync");

            // Random access works on every populated section.
            for (i, (section, expected_data)) in sections_and_data.iter().enumerate() {
                let retrieved: i32 = journal
                    .get(*section, offsets[i])
                    .await
                    .expect("Failed to get item");
                assert_eq!(retrieved, *expected_data);
            }

            // Every gap section reports SectionOutOfRange rather than some
            // other error or a bogus value.
            for missing_section in [0u64, 2, 3, 4, 6, 7, 8, 9, 11] {
                let result = journal.get(missing_section, 0).await;
                assert!(
                    matches!(result, Err(Error::SectionOutOfRange(_))),
                    "Expected SectionOutOfRange for section {}, got {:?}",
                    missing_section,
                    result
                );
            }

            // Restart before replaying.
            drop(journal);
            let journal = Journal::<_, i32>::init(context.with_label("second"), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");

            // Full replay from section 0: all three items, in section order.
            {
                let stream = journal
                    .replay(0, 0, NZUsize!(1024))
                    .await
                    .expect("Failed to setup replay");
                pin_mut!(stream);

                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    let (section, _, _, item) = result.expect("Failed to replay item");
                    items.push((section, item));
                }

                assert_eq!(items.len(), 3, "Should have 3 items");
                assert_eq!(items[0], (1, 100));
                assert_eq!(items[1], (5, 500));
                assert_eq!(items[2], (10, 1000));
            }

            // Replay from an existing section (5): earlier sections skipped.
            {
                let stream = journal
                    .replay(5, 0, NZUsize!(1024))
                    .await
                    .expect("Failed to setup replay from section 5");
                pin_mut!(stream);

                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    let (section, _, _, item) = result.expect("Failed to replay item");
                    items.push((section, item));
                }

                assert_eq!(items.len(), 2, "Should have 2 items from section 5 onwards");
                assert_eq!(items[0], (5, 500));
                assert_eq!(items[1], (10, 1000));
            }

            // Replay from a gap section (3): starts at the next populated one.
            {
                let stream = journal
                    .replay(3, 0, NZUsize!(1024))
                    .await
                    .expect("Failed to setup replay from section 3");
                pin_mut!(stream);

                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    let (section, _, _, item) = result.expect("Failed to replay item");
                    items.push((section, item));
                }

                assert_eq!(items.len(), 2);
                assert_eq!(items[0], (5, 500));
                assert_eq!(items[1], (10, 1000));
            }

            journal.destroy().await.unwrap();
        });
    }
2259
    #[test_traced]
    fn test_journal_empty_section_in_middle() {
        // A section emptied via rewind_section, sitting between two populated
        // sections, must be skipped by replay and survive a restart as empty.
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "test-partition".into(),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };
            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            journal.append(1, &100i32).await.expect("Failed to append");

            // Write to section 2, then truncate it back to empty so the blob
            // exists but holds no data.
            journal.append(2, &200i32).await.expect("Failed to append");
            journal.sync(2).await.expect("Failed to sync");
            journal
                .rewind_section(2, 0)
                .await
                .expect("Failed to rewind");

            journal.append(3, &300i32).await.expect("Failed to append");

            journal.sync_all().await.expect("Failed to sync");

            // Sections 1 and 3 are populated; section 2 is empty.
            assert!(journal.size(1).await.unwrap() > 0);
            assert_eq!(journal.size(2).await.unwrap(), 0);
            assert!(journal.size(3).await.unwrap() > 0);

            // Restart before replaying.
            drop(journal);
            let journal = Journal::<_, i32>::init(context.with_label("second"), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");

            // Full replay: the empty middle section yields nothing.
            {
                let stream = journal
                    .replay(0, 0, NZUsize!(1024))
                    .await
                    .expect("Failed to setup replay");
                pin_mut!(stream);

                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    let (section, _, _, item) = result.expect("Failed to replay item");
                    items.push((section, item));
                }

                assert_eq!(
                    items.len(),
                    2,
                    "Should have 2 items (skipping empty section)"
                );
                assert_eq!(items[0], (1, 100));
                assert_eq!(items[1], (3, 300));
            }

            // Replay starting at the empty section 2: only section 3's item.
            {
                let stream = journal
                    .replay(2, 0, NZUsize!(1024))
                    .await
                    .expect("Failed to setup replay from section 2");
                pin_mut!(stream);

                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    let (section, _, _, item) = result.expect("Failed to replay item");
                    items.push((section, item));
                }

                assert_eq!(items.len(), 1, "Should have 1 item from section 3");
                assert_eq!(items[0], (3, 300));
            }

            journal.destroy().await.unwrap();
        });
    }
2349
    #[test_traced]
    fn test_journal_item_exactly_page_size() {
        // Boundary case: an item whose payload is exactly one page. (The
        // length-prefix varint pushes the record past the page boundary.)
        const ITEM_SIZE: usize = PAGE_SIZE.get() as usize;
        type ExactItem = [u8; ITEM_SIZE];

        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "test-partition".into(),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(4096),
            };
            let mut journal = Journal::init(context.with_label("first"), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            // Recognizable byte pattern (i mod 256) to catch any corruption.
            let mut exact_data: ExactItem = [0u8; ITEM_SIZE];
            for (i, byte) in exact_data.iter_mut().enumerate() {
                *byte = (i % 256) as u8;
            }

            let (offset, size) = journal
                .append(1, &exact_data)
                .await
                .expect("Failed to append exact item");
            assert_eq!(size as usize, ITEM_SIZE);
            journal.sync(1).await.expect("Failed to sync");

            // Random access read must reproduce the item exactly.
            let retrieved: ExactItem = journal
                .get(1, offset)
                .await
                .expect("Failed to get exact item");
            assert_eq!(retrieved, exact_data, "Random access read mismatch");

            // Restart and replay: one item at the same offset/size/payload.
            drop(journal);
            let journal = Journal::<_, ExactItem>::init(context.with_label("second"), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");

            {
                let stream = journal
                    .replay(0, 0, NZUsize!(1024))
                    .await
                    .expect("Failed to setup replay");
                pin_mut!(stream);

                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    let (section, off, sz, item) = result.expect("Failed to replay item");
                    items.push((section, off, sz, item));
                }

                assert_eq!(items.len(), 1, "Should have exactly one item");
                let (section, off, sz, item) = &items[0];
                assert_eq!(*section, 1);
                assert_eq!(*off, offset);
                assert_eq!(*sz as usize, ITEM_SIZE);
                assert_eq!(*item, exact_data, "Replay read mismatch");
            }

            journal.destroy().await.unwrap();
        });
    }
2422
    #[test_traced]
    fn test_journal_varint_spanning_page_boundary() {
        // Use a tiny 16-byte page so that 128-byte items force records (and
        // eventually a length-prefix varint) to straddle page boundaries.
        const SMALL_PAGE: NonZeroU16 = NZU16!(16);

        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "test-partition".into(),
                compression: None,
                codec_config: (),
                page_cache: CacheRef::from_pooler(&context, SMALL_PAGE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };
            let mut journal: Journal<_, [u8; 128]> =
                Journal::init(context.with_label("first"), cfg.clone())
                    .await
                    .expect("Failed to initialize journal");

            // Three distinct items, each spanning many 16-byte pages.
            let item1: [u8; 128] = [1u8; 128];
            let item2: [u8; 128] = [2u8; 128];
            let item3: [u8; 128] = [3u8; 128];

            let (offset1, _) = journal.append(1, &item1).await.expect("Failed to append");
            let (offset2, _) = journal.append(1, &item2).await.expect("Failed to append");
            let (offset3, _) = journal.append(1, &item3).await.expect("Failed to append");

            journal.sync(1).await.expect("Failed to sync");

            // Each item is retrievable at its recorded offset.
            let retrieved1: [u8; 128] = journal.get(1, offset1).await.expect("Failed to get");
            let retrieved2: [u8; 128] = journal.get(1, offset2).await.expect("Failed to get");
            let retrieved3: [u8; 128] = journal.get(1, offset3).await.expect("Failed to get");
            assert_eq!(retrieved1, item1);
            assert_eq!(retrieved2, item2);
            assert_eq!(retrieved3, item3);

            // Restart, then replay with a small (64-byte) read buffer to
            // force incremental reads across page boundaries.
            drop(journal);
            let journal: Journal<_, [u8; 128]> =
                Journal::init(context.with_label("second"), cfg.clone())
                    .await
                    .expect("Failed to re-initialize journal");

            {
                let stream = journal
                    .replay(0, 0, NZUsize!(64))
                    .await
                    .expect("Failed to setup replay");
                pin_mut!(stream);

                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    let (section, off, _, item) = result.expect("Failed to replay item");
                    items.push((section, off, item));
                }

                assert_eq!(items.len(), 3, "Should have 3 items");
                assert_eq!(items[0], (1, offset1, item1));
                assert_eq!(items[1], (1, offset2, item2));
                assert_eq!(items[2], (1, offset3, item3));
            }

            journal.destroy().await.unwrap();
        });
    }
2499
2500 #[test_traced]
2501 fn test_journal_clear() {
2502 let executor = deterministic::Runner::default();
2503 executor.start(|context| async move {
2504 let cfg = Config {
2505 partition: "clear-test".into(),
2506 compression: None,
2507 codec_config: (),
2508 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
2509 write_buffer: NZUsize!(1024),
2510 };
2511
2512 let mut journal: Journal<_, u64> =
2513 Journal::init(context.with_label("journal"), cfg.clone())
2514 .await
2515 .expect("Failed to initialize journal");
2516
2517 for section in 0..5u64 {
2519 for i in 0..10u64 {
2520 journal
2521 .append(section, &(section * 1000 + i))
2522 .await
2523 .expect("Failed to append");
2524 }
2525 journal.sync(section).await.expect("Failed to sync");
2526 }
2527
2528 assert_eq!(journal.get(0, 0).await.unwrap(), 0);
2530 assert_eq!(journal.get(4, 0).await.unwrap(), 4000);
2531
2532 journal.clear().await.expect("Failed to clear");
2534
2535 for section in 0..5u64 {
2537 assert!(matches!(
2538 journal.get(section, 0).await,
2539 Err(Error::SectionOutOfRange(s)) if s == section
2540 ));
2541 }
2542
2543 for i in 0..5u64 {
2545 journal
2546 .append(10, &(i * 100))
2547 .await
2548 .expect("Failed to append after clear");
2549 }
2550 journal.sync(10).await.expect("Failed to sync after clear");
2551
2552 assert_eq!(journal.get(10, 0).await.unwrap(), 0);
2554
2555 assert!(matches!(
2557 journal.get(0, 0).await,
2558 Err(Error::SectionOutOfRange(0))
2559 ));
2560
2561 journal.destroy().await.unwrap();
2562 });
2563 }
2564}