use super::manager::{AppendFactory, Config as ManagerConfig, Manager};
use crate::journal::Error;
use bytes::{Buf, BufMut, Bytes};
use commonware_codec::{
    varint::UInt, Codec, CodecShared, EncodeSize, ReadExt, Write as CodecWrite,
};
use commonware_runtime::{
    buffer::pool::{Append, PoolRef, Replay},
    Blob, Metrics, Storage,
};
use futures::stream::{self, Stream, StreamExt};
use std::{io::Cursor, num::NonZeroUsize};
use tracing::{trace, warn};
use zstd::{bulk::compress, decode_all};

/// Maximum number of bytes a `u32` varint length prefix can occupy.
const MAX_VARINT_SIZE: usize = 5;

/// Configuration for a variable-length item [Journal].
#[derive(Clone)]
pub struct Config<C> {
    /// The storage partition to use for the journal's blobs.
    pub partition: String,

    /// Optional `zstd` compression level applied to items before they are written.
    pub compression: Option<u8>,

    /// Configuration passed to the codec when decoding items.
    pub codec_config: C,

    /// The buffer pool used to cache recently accessed pages.
    pub buffer_pool: PoolRef,

    /// The size of the write buffer used for each section's blob.
    pub write_buffer: NonZeroUsize,
}

/// Decodes a varint length prefix from `buf`, returning the item length and
/// the number of bytes the varint itself occupies.
#[inline]
fn decode_length_prefix(buf: &mut impl Buf) -> Result<(usize, usize), Error> {
    let initial = buf.remaining();
    let size = UInt::<u32>::read(buf)?.0 as usize;
    let varint_len = initial - buf.remaining();
    Ok((size, varint_len))
}

/// Describes how much of an item is available in a buffer after decoding its
/// length prefix.
enum ItemInfo {
    /// The item's data is fully buffered.
    Complete {
        /// Length of the varint prefix.
        varint_len: usize,
        /// Length of the item data.
        data_len: usize,
    },
    /// Only a prefix of the item's data is buffered.
    Incomplete {
        /// Length of the varint prefix.
        varint_len: usize,
        /// Number of item bytes available in the buffer.
        prefix_len: usize,
        /// Total length of the item data.
        total_len: usize,
    },
}

/// Parses the item at `offset` from `buf`, returning the offset of the next
/// item and an [ItemInfo] describing how much of the current item is buffered.
fn find_item(buf: &mut impl Buf, offset: u64) -> Result<(u64, ItemInfo), Error> {
    let available = buf.remaining();
    let (size, varint_len) = decode_length_prefix(buf)?;
    let next_offset = offset
        .checked_add(varint_len as u64)
        .ok_or(Error::OffsetOverflow)?
        .checked_add(size as u64)
        .ok_or(Error::OffsetOverflow)?;
    let buffered = available.saturating_sub(varint_len);

    let item = if buffered >= size {
        ItemInfo::Complete {
            varint_len,
            data_len: size,
        }
    } else {
        ItemInfo::Incomplete {
            varint_len,
            prefix_len: buffered,
            total_len: size,
        }
    };

    Ok((next_offset, item))
}

/// State threaded through the replay stream for a single section.
struct ReplayState<B: Blob, C> {
    /// The section being replayed.
    section: u64,
    blob: Append<B>,
    replay: Replay<B>,
    /// Bytes remaining to skip before decoding begins (the caller's start offset).
    skip_bytes: u64,
    /// Current offset into the blob.
    offset: u64,
    /// Offset of the end of the last successfully decoded item.
    valid_offset: u64,
    codec_config: C,
    compressed: bool,
    done: bool,
}

/// Decodes an item from `item_data`, decompressing it first when `compressed` is set.
fn decode_item<V: Codec>(item_data: impl Buf, cfg: &V::Cfg, compressed: bool) -> Result<V, Error> {
    if compressed {
        let decompressed =
            decode_all(item_data.reader()).map_err(|_| Error::DecompressionFailed)?;
        V::decode_cfg(decompressed.as_ref(), cfg).map_err(Error::Codec)
    } else {
        V::decode_cfg(item_data, cfg).map_err(Error::Codec)
    }
}

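/// A journal of variable-length items, grouped into numbered sections.
///
/// Each entry is stored as a varint length prefix followed by the encoded
/// (and optionally `zstd`-compressed) item. A minimal usage sketch (names
/// illustrative, error handling elided):
///
/// ```rust,ignore
/// let mut journal = Journal::<_, i32>::init(context, cfg).await?;
/// let (offset, _size) = journal.append(1, 42).await?; // write to section 1
/// journal.sync(1).await?;                             // make it durable
/// assert_eq!(journal.get(1, offset).await?, 42);      // random-access read
/// ```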
pub struct Journal<E: Storage + Metrics, V: Codec> {
    /// Manages the underlying per-section blobs.
    manager: Manager<E, AppendFactory>,

    /// Optional `zstd` compression level applied to items before write.
    compression: Option<u8>,

    /// Configuration used when decoding items.
    codec_config: V::Cfg,
}

impl<E: Storage + Metrics, V: CodecShared> Journal<E, V> {
    /// Initializes a [Journal] in the given partition, opening any section
    /// blobs that already exist there.
    pub async fn init(context: E, cfg: Config<V::Cfg>) -> Result<Self, Error> {
        let manager_cfg = ManagerConfig {
            partition: cfg.partition,
            factory: AppendFactory {
                write_buffer: cfg.write_buffer,
                pool_ref: cfg.buffer_pool,
            },
        };
        let manager = Manager::init(context, manager_cfg).await?;

        Ok(Self {
            manager,
            compression: cfg.compression,
            codec_config: cfg.codec_config,
        })
    }

    /// Reads the item at `offset` from `blob`, returning the offset of the
    /// next item, the item's encoded size, and the decoded item.
    async fn read(
        compressed: bool,
        cfg: &V::Cfg,
        blob: &Append<E::Blob>,
        offset: u64,
    ) -> Result<(u64, u32, V), Error> {
        // Read enough bytes to decode the varint length prefix.
        let buf = vec![0u8; MAX_VARINT_SIZE];
        let (stable_buf, available) = blob.read_up_to(buf, offset).await?;
        let buf = Bytes::from(stable_buf);
        let mut cursor = Cursor::new(buf.slice(..available));
        let (next_offset, item_info) = find_item(&mut cursor, offset)?;

        let (item_size, decoded) = match item_info {
            ItemInfo::Complete {
                varint_len,
                data_len,
            } => {
                // The entire item fit within the initial read.
                let data = buf.slice(varint_len..varint_len + data_len);
                let decoded = decode_item::<V>(data, cfg, compressed)?;
                (data_len as u32, decoded)
            }
            ItemInfo::Incomplete {
                varint_len,
                prefix_len,
                total_len,
            } => {
                // Read the remainder of the item and chain it to the buffered prefix.
                let prefix = buf.slice(varint_len..varint_len + prefix_len);
                let read_offset = offset + varint_len as u64 + prefix_len as u64;
                let remainder_len = total_len - prefix_len;
                let mut remainder = vec![0u8; remainder_len];
                blob.read_into(&mut remainder, read_offset).await?;
                let chained = prefix.chain(Bytes::from(remainder));
                let decoded = decode_item::<V>(chained, cfg, compressed)?;
                (total_len as u32, decoded)
            }
        };

        Ok((next_offset, item_size, decoded))
    }

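    /// Replays all items in the journal, starting from `start_offset` in
    /// `start_section`, yielding `(section, offset, size, item)` tuples.
    ///
    /// If trailing or partially written bytes are found at the end of a blob,
    /// the blob is truncated to the end of the last fully decoded item. A
    /// minimal consumption sketch (names illustrative):
    ///
    /// ```rust,ignore
    /// let stream = journal.replay(0, 0, NZUsize!(1024)).await?;
    /// pin_mut!(stream);
    /// while let Some(result) = stream.next().await {
    ///     let (section, offset, size, item) = result?;
    ///     // ... process item ...
    /// }
    /// ```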
    pub async fn replay(
        &self,
        start_section: u64,
        mut start_offset: u64,
        buffer: NonZeroUsize,
    ) -> Result<impl Stream<Item = Result<(u64, u64, u32, V), Error>> + Send + '_, Error> {
        // Collect all blobs to replay.
        let codec_config = self.codec_config.clone();
        let compressed = self.compression.is_some();
        let mut blobs = Vec::new();
        for (&section, blob) in self.manager.sections_from(start_section) {
            blobs.push((
                section,
                blob.clone(),
                blob.replay(buffer).await?,
                codec_config.clone(),
                compressed,
            ));
        }

        // Replay each blob in order, stopping at the first irrecoverable error.
        Ok(stream::iter(blobs).flat_map(
            move |(section, blob, replay, codec_config, compressed)| {
                // Only skip bytes in the first section replayed.
                let skip_bytes = if section == start_section {
                    start_offset
                } else {
                    start_offset = 0;
                    0
                };

                stream::unfold(
                    ReplayState {
                        section,
                        blob,
                        replay,
                        skip_bytes,
                        offset: 0,
                        valid_offset: skip_bytes,
                        codec_config,
                        compressed,
                        done: false,
                    },
                    move |mut state| async move {
                        if state.done {
                            return None;
                        }

                        let blob_size = state.replay.blob_size();
                        let mut batch: Vec<Result<(u64, u64, u32, V), Error>> = Vec::new();
                        loop {
                            // Ensure enough bytes are buffered to decode a length prefix.
                            match state.replay.ensure(MAX_VARINT_SIZE).await {
                                Ok(true) => {}
                                Ok(false) => {
                                    // Fewer than MAX_VARINT_SIZE bytes remain; if nothing
                                    // remains at all, this section is exhausted.
                                    if state.replay.remaining() == 0 {
                                        state.done = true;
                                        return if batch.is_empty() {
                                            None
                                        } else {
                                            Some((batch, state))
                                        };
                                    }
                                }
                                Err(err) => {
                                    batch.push(Err(err.into()));
                                    state.done = true;
                                    return Some((batch, state));
                                }
                            }

                            // Skip already-processed bytes, if any.
                            if state.skip_bytes > 0 {
                                let to_skip =
                                    state.skip_bytes.min(state.replay.remaining() as u64) as usize;
                                state.replay.advance(to_skip);
                                state.skip_bytes -= to_skip as u64;
                                state.offset += to_skip as u64;
                                continue;
                            }

                            // Decode the length prefix of the next item.
                            let before_remaining = state.replay.remaining();
                            let (item_size, varint_len) =
                                match decode_length_prefix(&mut state.replay) {
                                    Ok(result) => result,
                                    Err(err) => {
                                        // If the prefix could not be decoded because the blob
                                        // ended, treat the remainder as trailing bytes.
                                        if state.replay.is_exhausted()
                                            || before_remaining < MAX_VARINT_SIZE
                                        {
                                            // Truncate any bytes past the last valid item.
                                            if state.valid_offset < blob_size
                                                && state.offset < blob_size
                                            {
                                                warn!(
                                                    blob = state.section,
                                                    bad_offset = state.offset,
                                                    new_size = state.valid_offset,
                                                    "trailing bytes detected: truncating"
                                                );
                                                state.blob.resize(state.valid_offset).await.ok()?;
                                            }
                                            state.done = true;
                                            return if batch.is_empty() {
                                                None
                                            } else {
                                                Some((batch, state))
                                            };
                                        }
                                        batch.push(Err(err));
                                        state.done = true;
                                        return Some((batch, state));
                                    }
                                };

                            // Ensure the full item is buffered.
                            match state.replay.ensure(item_size).await {
                                Ok(true) => {}
                                Ok(false) => {
                                    // The item extends past the end of the blob: truncate.
                                    warn!(
                                        blob = state.section,
                                        bad_offset = state.offset,
                                        new_size = state.valid_offset,
                                        "incomplete item at end: truncating"
                                    );
                                    state.blob.resize(state.valid_offset).await.ok()?;
                                    state.done = true;
                                    return if batch.is_empty() {
                                        None
                                    } else {
                                        Some((batch, state))
                                    };
                                }
                                Err(err) => {
                                    batch.push(Err(err.into()));
                                    state.done = true;
                                    return Some((batch, state));
                                }
                            }

                            // Decode the item and advance past it.
                            let item_offset = state.offset;
                            let next_offset = match state
                                .offset
                                .checked_add(varint_len as u64)
                                .and_then(|o| o.checked_add(item_size as u64))
                            {
                                Some(o) => o,
                                None => {
                                    batch.push(Err(Error::OffsetOverflow));
                                    state.done = true;
                                    return Some((batch, state));
                                }
                            };
                            match decode_item::<V>(
                                (&mut state.replay).take(item_size),
                                &state.codec_config,
                                state.compressed,
                            ) {
                                Ok(decoded) => {
                                    batch.push(Ok((
                                        state.section,
                                        item_offset,
                                        item_size as u32,
                                        decoded,
                                    )));
                                    state.valid_offset = next_offset;
                                    state.offset = next_offset;
                                }
                                Err(err) => {
                                    batch.push(Err(err));
                                    state.done = true;
                                    return Some((batch, state));
                                }
                            }

                            // Yield the batch before refilling the buffer.
                            if !batch.is_empty() && state.replay.remaining() < MAX_VARINT_SIZE {
                                return Some((batch, state));
                            }
                        }
                    },
                )
                .flat_map(stream::iter)
            },
        ))
    }

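    /// Appends an item to `section`, returning the item's starting offset and
    /// its encoded (possibly compressed) size in bytes. Data should only be
    /// considered durable after a subsequent `sync`. A minimal sketch (names
    /// illustrative):
    ///
    /// ```rust,ignore
    /// let (offset, size) = journal.append(section, item).await?;
    /// journal.sync(section).await?;
    /// ```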
    pub async fn append(&mut self, section: u64, item: V) -> Result<(u64, u32), Error> {
        // Encode the item (compressing it if enabled), prefixed with its length.
        let (buf, item_len) = if let Some(compression) = self.compression {
            // Compress the encoded item.
            let encoded = item.encode();
            let compressed =
                compress(&encoded, compression as i32).map_err(|_| Error::CompressionFailed)?;
            let item_len = compressed.len();
            let item_len_u32: u32 = match item_len.try_into() {
                Ok(len) => len,
                Err(_) => return Err(Error::ItemTooLarge(item_len)),
            };
            let size_len = UInt(item_len_u32).encode_size();
            let entry_len = size_len
                .checked_add(item_len)
                .ok_or(Error::OffsetOverflow)?;

            let mut buf = Vec::with_capacity(entry_len);
            UInt(item_len_u32).write(&mut buf);
            buf.put_slice(&compressed);

            (buf, item_len)
        } else {
            // Write the encoded item directly.
            let item_len = item.encode_size();
            let item_len_u32: u32 = match item_len.try_into() {
                Ok(len) => len,
                Err(_) => return Err(Error::ItemTooLarge(item_len)),
            };
            let size_len = UInt(item_len_u32).encode_size();
            let entry_len = size_len
                .checked_add(item_len)
                .ok_or(Error::OffsetOverflow)?;

            let mut buf = Vec::with_capacity(entry_len);
            UInt(item_len_u32).write(&mut buf);
            item.write(&mut buf);

            (buf, item_len)
        };

        // Get or create the section's blob.
        let blob = self.manager.get_or_create(section).await?;

        // The item starts at the current end of the blob.
        let offset = blob.size().await;

        // Append the length-prefixed item.
        blob.append(&buf).await?;
        trace!(blob = section, offset, "appended item");
        Ok((offset, item_len as u32))
    }

    /// Retrieves the item at `offset` in `section`.
    ///
    /// Returns [Error::SectionOutOfRange] if the section does not exist (e.g.
    /// it was pruned), and a decoding error if `offset` does not point at the
    /// start of an item.
    pub async fn get(&self, section: u64, offset: u64) -> Result<V, Error> {
        let blob = self
            .manager
            .get(section)?
            .ok_or(Error::SectionOutOfRange(section))?;

        // Read the item (discarding the next offset and size).
        let (_, _, item) =
            Self::read(self.compression.is_some(), &self.codec_config, blob, offset).await?;
        Ok(item)
    }

    /// Returns the current size of `section` in bytes.
    pub async fn size(&self, section: u64) -> Result<u64, Error> {
        self.manager.size(section).await
    }

    /// Rewinds the journal to `offset` in `section`, removing the item that
    /// starts at `offset`, everything after it, and all newer sections.
    pub async fn rewind_to_offset(&mut self, section: u64, offset: u64) -> Result<(), Error> {
        self.manager.rewind(section, offset).await
    }

    /// Rewinds the journal to `size` bytes in `section`, removing all data
    /// past that point and all newer sections.
    pub async fn rewind(&mut self, section: u64, size: u64) -> Result<(), Error> {
        self.manager.rewind(section, size).await
    }

    /// Rewinds only `section` to `size` bytes, leaving other sections intact.
    pub async fn rewind_section(&mut self, section: u64, size: u64) -> Result<(), Error> {
        self.manager.rewind_section(section, size).await
    }

    /// Flushes pending writes in `section` to durable storage.
    pub async fn sync(&self, section: u64) -> Result<(), Error> {
        self.manager.sync(section).await
    }

    /// Flushes pending writes in all sections to durable storage.
    pub async fn sync_all(&self) -> Result<(), Error> {
        self.manager.sync_all().await
    }

    /// Removes all sections older than `min`, returning whether anything was removed.
    pub async fn prune(&mut self, min: u64) -> Result<bool, Error> {
        self.manager.prune(min).await
    }

    /// Returns the oldest section in the journal, if any.
    pub fn oldest_section(&self) -> Option<u64> {
        self.manager.oldest_section()
    }

    /// Returns the newest section in the journal, if any.
    pub fn newest_section(&self) -> Option<u64> {
        self.manager.newest_section()
    }

    /// Returns whether the journal has no sections.
    pub fn is_empty(&self) -> bool {
        self.manager.is_empty()
    }

    /// Returns the number of sections in the journal.
    pub fn num_sections(&self) -> usize {
        self.manager.num_sections()
    }

    /// Removes all underlying storage for the journal.
    pub async fn destroy(self) -> Result<(), Error> {
        self.manager.destroy().await
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use bytes::BufMut;
    use commonware_macros::test_traced;
    use commonware_runtime::{deterministic, Blob, Runner, Storage};
    use commonware_utils::{NZUsize, NZU16};
    use futures::{pin_mut, StreamExt};
    use std::num::NonZeroU16;

    const PAGE_SIZE: NonZeroU16 = NZU16!(1024);
    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);

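    // A hedged sketch (not from the original suite): a direct unit test of the
    // varint length-prefix helpers. A 300-byte item gets a 2-byte prefix
    // (0xAC, 0x02), so `find_item` reports it as incomplete when fewer than
    // 300 item bytes are buffered.
    #[test]
    fn test_decode_length_prefix_and_find_item() {
        // Build a buffer containing a varint prefix for a 300-byte item,
        // followed by only 10 bytes of item data.
        let mut buf = Vec::new();
        UInt(300u32).write(&mut buf);
        assert_eq!(buf.len(), 2);
        buf.extend_from_slice(&[0u8; 10]);

        // The prefix decodes to (item_size, varint_len) = (300, 2).
        let mut cursor = Cursor::new(&buf[..]);
        let (size, varint_len) = decode_length_prefix(&mut cursor).unwrap();
        assert_eq!((size, varint_len), (300, 2));

        // find_item sees 12 bytes and classifies the item as incomplete.
        let mut cursor = Cursor::new(&buf[..]);
        let (next_offset, info) = find_item(&mut cursor, 0).unwrap();
        assert_eq!(next_offset, 302);
        match info {
            ItemInfo::Incomplete {
                varint_len,
                prefix_len,
                total_len,
            } => {
                assert_eq!(varint_len, 2);
                assert_eq!(prefix_len, 10);
                assert_eq!(total_len, 300);
            }
            _ => panic!("expected incomplete item"),
        }
    }
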
    #[test_traced]
    fn test_journal_append_and_read() {
        // Initialize the deterministic runtime.
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            // Initialize the journal.
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };
            let index = 1u64;
            let data = 10;
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            // Append an item to the journal.
            journal
                .append(index, data)
                .await
                .expect("Failed to append data");

            // Check metrics.
            let buffer = context.encode();
            assert!(buffer.contains("tracked 1"));

            // Sync the blob and reopen the journal.
            journal.sync(index).await.expect("Failed to sync journal");
            drop(journal);
            let journal = Journal::<_, i32>::init(context.clone(), cfg)
                .await
                .expect("Failed to re-initialize journal");

            // Replay the journal and collect items.
            let mut items = Vec::new();
            let stream = journal
                .replay(0, 0, NZUsize!(1024))
                .await
                .expect("unable to setup replay");
            pin_mut!(stream);
            while let Some(result) = stream.next().await {
                match result {
                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                    Err(err) => panic!("Failed to read item: {err}"),
                }
            }

            // Verify the item was replayed correctly.
            assert_eq!(items.len(), 1);
            assert_eq!(items[0].0, index);
            assert_eq!(items[0].1, data);

            // Check metrics.
            let buffer = context.encode();
            assert!(buffer.contains("tracked 1"));
        });
    }

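    // A hedged sketch (not from the original suite): exercises the compression
    // path by appending with `compression: Some(3)` and reading the item back
    // both via `get` and via `replay`. The level value is illustrative.
    #[test_traced]
    fn test_journal_compression_round_trip() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".into(),
                compression: Some(3),
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            // Append and sync a value with compression enabled.
            let (offset, _) = journal.append(1, 42i32).await.expect("Failed to append");
            journal.sync(1).await.expect("Failed to sync");

            // Random-access reads decompress transparently.
            let item: i32 = journal.get(1, offset).await.expect("Failed to get item");
            assert_eq!(item, 42);

            // Replay also decompresses transparently.
            drop(journal);
            let journal = Journal::<_, i32>::init(context, cfg)
                .await
                .expect("Failed to re-initialize journal");
            let stream = journal
                .replay(0, 0, NZUsize!(1024))
                .await
                .expect("unable to setup replay");
            pin_mut!(stream);
            let mut items = Vec::new();
            while let Some(result) = stream.next().await {
                items.push(result.expect("Failed to read item"));
            }
            assert_eq!(items.len(), 1);
            assert_eq!(items[0].3, 42);
        });
    }
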
    #[test_traced]
    fn test_journal_multiple_appends_and_reads() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            let data_items = vec![(1u64, 1), (1u64, 2), (2u64, 3), (3u64, 4)];
            for (index, data) in &data_items {
                journal
                    .append(*index, *data)
                    .await
                    .expect("Failed to append data");
                journal.sync(*index).await.expect("Failed to sync blob");
            }

            let buffer = context.encode();
            assert!(buffer.contains("tracked 3"));
            assert!(buffer.contains("synced_total 4"));

            drop(journal);
            let journal = Journal::init(context, cfg)
                .await
                .expect("Failed to re-initialize journal");

            let mut items = Vec::<(u64, u32)>::new();
            {
                let stream = journal
                    .replay(0, 0, NZUsize!(1024))
                    .await
                    .expect("unable to setup replay");
                pin_mut!(stream);
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                        Err(err) => panic!("Failed to read item: {err}"),
                    }
                }
            }

            assert_eq!(items.len(), data_items.len());
            for ((expected_index, expected_data), (actual_index, actual_data)) in
                data_items.iter().zip(items.iter())
            {
                assert_eq!(actual_index, expected_index);
                assert_eq!(actual_data, expected_data);
            }

            journal.destroy().await.expect("Failed to destroy journal");
        });
    }

    #[test_traced]
    fn test_journal_prune_blobs() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            for index in 1u64..=5u64 {
                journal
                    .append(index, index)
                    .await
                    .expect("Failed to append data");
                journal.sync(index).await.expect("Failed to sync blob");
            }

            // Add one more item to an existing section.
            let data = 99;
            journal
                .append(2u64, data)
                .await
                .expect("Failed to append data");
            journal.sync(2u64).await.expect("Failed to sync blob");

            // Prune sections older than 3.
            journal.prune(3).await.expect("Failed to prune blobs");

            let buffer = context.encode();
            assert!(buffer.contains("pruned_total 2"));

            // Pruning with a lower min is a no-op.
            journal.prune(2).await.expect("Failed to no-op prune");
            let buffer = context.encode();
            assert!(buffer.contains("pruned_total 2"));

            drop(journal);
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");

            let mut items = Vec::<(u64, u64)>::new();
            {
                let stream = journal
                    .replay(0, 0, NZUsize!(1024))
                    .await
                    .expect("unable to setup replay");
                pin_mut!(stream);
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                        Err(err) => panic!("Failed to read item: {err}"),
                    }
                }
            }

            // Only sections 3 through 5 should remain.
            assert_eq!(items.len(), 3);
            let expected_indices = [3u64, 4u64, 5u64];
            for (item, expected_index) in items.iter().zip(expected_indices.iter()) {
                assert_eq!(item.0, *expected_index);
            }

            // Prune everything and ensure no blobs remain in the partition.
            journal.prune(6).await.expect("Failed to prune blobs");

            drop(journal);

            assert!(context
                .scan(&cfg.partition)
                .await
                .expect("Failed to list blobs")
                .is_empty());
        });
    }

    #[test_traced]
    fn test_journal_prune_guard() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            for section in 1u64..=5u64 {
                journal
                    .append(section, section as i32)
                    .await
                    .expect("Failed to append data");
                journal.sync(section).await.expect("Failed to sync");
            }

            journal.prune(3).await.expect("Failed to prune");

            // Operations on pruned sections should fail with
            // AlreadyPrunedToSection.
            match journal.append(1, 100).await {
                Err(Error::AlreadyPrunedToSection(3)) => {}
                other => panic!("Expected AlreadyPrunedToSection(3), got {other:?}"),
            }

            match journal.append(2, 100).await {
                Err(Error::AlreadyPrunedToSection(3)) => {}
                other => panic!("Expected AlreadyPrunedToSection(3), got {other:?}"),
            }

            match journal.get(1, 0).await {
                Err(Error::AlreadyPrunedToSection(3)) => {}
                other => panic!("Expected AlreadyPrunedToSection(3), got {other:?}"),
            }

            match journal.size(1).await {
                Err(Error::AlreadyPrunedToSection(3)) => {}
                other => panic!("Expected AlreadyPrunedToSection(3), got {other:?}"),
            }

            match journal.rewind(2, 0).await {
                Err(Error::AlreadyPrunedToSection(3)) => {}
                other => panic!("Expected AlreadyPrunedToSection(3), got {other:?}"),
            }

            match journal.rewind_section(1, 0).await {
                Err(Error::AlreadyPrunedToSection(3)) => {}
                other => panic!("Expected AlreadyPrunedToSection(3), got {other:?}"),
            }

            match journal.sync(2).await {
                Err(Error::AlreadyPrunedToSection(3)) => {}
                other => panic!("Expected AlreadyPrunedToSection(3), got {other:?}"),
            }

            // Operations on sections at or above the prune point still work.
            assert!(journal.get(3, 0).await.is_ok());
            assert!(journal.get(4, 0).await.is_ok());
            assert!(journal.get(5, 0).await.is_ok());
            assert!(journal.size(3).await.is_ok());
            assert!(journal.sync(4).await.is_ok());

            journal
                .append(3, 999)
                .await
                .expect("Should be able to append to section 3");

            journal.prune(5).await.expect("Failed to prune");

            match journal.get(3, 0).await {
                Err(Error::AlreadyPrunedToSection(5)) => {}
                other => panic!("Expected AlreadyPrunedToSection(5), got {other:?}"),
            }

            match journal.get(4, 0).await {
                Err(Error::AlreadyPrunedToSection(5)) => {}
                other => panic!("Expected AlreadyPrunedToSection(5), got {other:?}"),
            }

            assert!(journal.get(5, 0).await.is_ok());
        });
    }

    #[test_traced]
    fn test_journal_prune_guard_across_restart() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            {
                let mut journal = Journal::init(context.clone(), cfg.clone())
                    .await
                    .expect("Failed to initialize journal");

                for section in 1u64..=5u64 {
                    journal
                        .append(section, section as i32)
                        .await
                        .expect("Failed to append data");
                    journal.sync(section).await.expect("Failed to sync");
                }

                journal.prune(3).await.expect("Failed to prune");
            }

            {
                let journal = Journal::<_, i32>::init(context.clone(), cfg.clone())
                    .await
                    .expect("Failed to re-initialize journal");

                // After restart, pruned sections simply do not exist.
                match journal.get(1, 0).await {
                    Err(Error::SectionOutOfRange(1)) => {}
                    other => panic!("Expected SectionOutOfRange(1), got {other:?}"),
                }

                match journal.get(2, 0).await {
                    Err(Error::SectionOutOfRange(2)) => {}
                    other => panic!("Expected SectionOutOfRange(2), got {other:?}"),
                }

                assert!(journal.get(3, 0).await.is_ok());
                assert!(journal.get(4, 0).await.is_ok());
                assert!(journal.get(5, 0).await.is_ok());
            }
        });
    }

    #[test_traced]
    fn test_journal_with_invalid_blob_name() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            // Manually create a blob whose name is not a valid u64.
            let invalid_blob_name = b"invalid";
            let (blob, _) = context
                .open(&cfg.partition, invalid_blob_name)
                .await
                .expect("Failed to create blob with invalid name");
            blob.sync().await.expect("Failed to sync blob");

            // Initialization should fail on the malformed blob name.
            let result = Journal::<_, u64>::init(context, cfg).await;
            assert!(matches!(result, Err(Error::InvalidBlobName(_))));
        });
    }

    #[test_traced]
    fn test_journal_read_size_missing() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            // Manually create a blob containing a truncated varint length prefix.
            let section = 1u64;
            let blob_name = section.to_be_bytes();
            let (blob, _) = context
                .open(&cfg.partition, &blob_name)
                .await
                .expect("Failed to create blob");

            let mut incomplete_data = Vec::new();
            UInt(u32::MAX).write(&mut incomplete_data);
            incomplete_data.truncate(1);
            blob.write_at(incomplete_data, 0)
                .await
                .expect("Failed to write incomplete data");
            blob.sync().await.expect("Failed to sync blob");

            let journal = Journal::init(context, cfg)
                .await
                .expect("Failed to initialize journal");

            // Replay should surface no items.
            let stream = journal
                .replay(0, 0, NZUsize!(1024))
                .await
                .expect("unable to setup replay");
            pin_mut!(stream);
            let mut items = Vec::<(u64, u64)>::new();
            while let Some(result) = stream.next().await {
                match result {
                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                    Err(err) => panic!("Failed to read item: {err}"),
                }
            }
            assert!(items.is_empty());
        });
    }

    #[test_traced]
    fn test_journal_read_item_missing() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            // Manually create a blob whose length prefix promises more data
            // than is actually present.
            let section = 1u64;
            let blob_name = section.to_be_bytes();
            let (blob, _) = context
                .open(&cfg.partition, &blob_name)
                .await
                .expect("Failed to create blob");

            let item_size: u32 = 10;
            let mut buf = Vec::new();
            UInt(item_size).write(&mut buf);
            let data = [2u8; 5];
            BufMut::put_slice(&mut buf, &data);
            blob.write_at(buf, 0)
                .await
                .expect("Failed to write incomplete item");
            blob.sync().await.expect("Failed to sync blob");

            let journal = Journal::init(context, cfg)
                .await
                .expect("Failed to initialize journal");

            // Replay should surface no items.
            let stream = journal
                .replay(0, 0, NZUsize!(1024))
                .await
                .expect("unable to setup replay");
            pin_mut!(stream);
            let mut items = Vec::<(u64, u64)>::new();
            while let Some(result) = stream.next().await {
                match result {
                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                    Err(err) => panic!("Failed to read item: {err}"),
                }
            }
            assert!(items.is_empty());
        });
    }

    #[test_traced]
    fn test_journal_read_checksum_missing() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            let section = 1u64;
            let blob_name = section.to_be_bytes();
            let (blob, _) = context
                .open(&cfg.partition, &blob_name)
                .await
                .expect("Failed to create blob");

            let item_data = b"Test data";
            let item_size = item_data.len() as u32;

            let mut buf = Vec::new();
            UInt(item_size).write(&mut buf);
            BufMut::put_slice(&mut buf, item_data);
            blob.write_at(buf, 0)
                .await
                .expect("Failed to write item without checksum");

            blob.sync().await.expect("Failed to sync blob");

            let journal = Journal::init(context, cfg)
                .await
                .expect("Failed to initialize journal");

            let stream = journal
                .replay(0, 0, NZUsize!(1024))
                .await
                .expect("unable to setup replay");
            pin_mut!(stream);
            let mut items = Vec::<(u64, u64)>::new();
            while let Some(result) = stream.next().await {
                match result {
                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                    Err(err) => panic!("Failed to read item: {err}"),
                }
            }
            assert!(items.is_empty());
        });
    }

    #[test_traced]
    fn test_journal_read_checksum_mismatch() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            // Manually create a blob with an item followed by a bogus trailing u32.
            let section = 1u64;
            let blob_name = section.to_be_bytes();
            let (blob, _) = context
                .open(&cfg.partition, &blob_name)
                .await
                .expect("Failed to create blob");

            let item_data = b"Test data";
            let item_size = item_data.len() as u32;
            let incorrect_checksum: u32 = 0xDEADBEEF;

            let mut buf = Vec::new();
            UInt(item_size).write(&mut buf);
            BufMut::put_slice(&mut buf, item_data);
            buf.put_u32(incorrect_checksum);
            blob.write_at(buf, 0)
                .await
                .expect("Failed to write item with bad checksum");

            blob.sync().await.expect("Failed to sync blob");

            let journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            // Replay should surface no items.
            {
                let stream = journal
                    .replay(0, 0, NZUsize!(1024))
                    .await
                    .expect("unable to setup replay");
                pin_mut!(stream);
                let mut items = Vec::<(u64, u64)>::new();
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                        Err(err) => panic!("Failed to read item: {err}"),
                    }
                }
                assert!(items.is_empty());
            }
            drop(journal);

            // The corrupt blob should have been truncated to zero bytes.
            let (_, blob_size) = context
                .open(&cfg.partition, &section.to_be_bytes())
                .await
                .expect("Failed to open blob");
            assert_eq!(blob_size, 0);
        });
    }

    #[test_traced]
    fn test_journal_truncation_recovery() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            journal.append(1, 1).await.expect("Failed to append data");

            let data_items = vec![(2u64, 2), (2u64, 3), (2u64, 4)];
            for (index, data) in &data_items {
                journal
                    .append(*index, *data)
                    .await
                    .expect("Failed to append data");
                journal.sync(*index).await.expect("Failed to sync blob");
            }

            journal.sync_all().await.expect("Failed to sync");
            drop(journal);

            // Corrupt section 2 by chopping off its last 4 bytes.
            let (blob, blob_size) = context
                .open(&cfg.partition, &2u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            blob.resize(blob_size - 4)
                .await
                .expect("Failed to corrupt blob");
            blob.sync().await.expect("Failed to sync blob");

            let journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");

            let mut items = Vec::<(u64, u32)>::new();
            {
                let stream = journal
                    .replay(0, 0, NZUsize!(1024))
                    .await
                    .expect("unable to setup replay");
                pin_mut!(stream);
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                        Err(err) => panic!("Failed to read item: {err}"),
                    }
                }
            }
            drop(journal);

            // Only the item in section 1 should survive.
            assert_eq!(items.len(), 1);
            assert_eq!(items[0].0, 1);
            assert_eq!(items[0].1, 1);

            // Section 2 should have been truncated to zero bytes.
            let (_, blob_size) = context
                .open(&cfg.partition, &2u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            assert_eq!(blob_size, 0);

            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");

            let mut items = Vec::<(u64, u32)>::new();
            {
                let stream = journal
                    .replay(0, 0, NZUsize!(1024))
                    .await
                    .expect("unable to setup replay");
                pin_mut!(stream);
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                        Err(err) => panic!("Failed to read item: {err}"),
                    }
                }
            }

            assert_eq!(items.len(), 1);
            assert_eq!(items[0].0, 1);
            assert_eq!(items[0].1, 1);

            // Appending to the truncated section should work again.
            let (_offset, _) = journal.append(2, 5).await.expect("Failed to append data");
            journal.sync(2).await.expect("Failed to sync blob");

            let item = journal.get(2, 0).await.expect("Failed to get item");
            assert_eq!(item, 5);

            drop(journal);

            let journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");

            let mut items = Vec::<(u64, u32)>::new();
            {
                let stream = journal
                    .replay(0, 0, NZUsize!(1024))
                    .await
                    .expect("unable to setup replay");
                pin_mut!(stream);
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                        Err(err) => panic!("Failed to read item: {err}"),
                    }
                }
            }

            // Both the original item and the new item should be present.
            assert_eq!(items.len(), 2);
            assert_eq!(items[0].0, 1);
            assert_eq!(items[0].1, 1);
            assert_eq!(items[1].0, 2);
            assert_eq!(items[1].1, 5);
        });
    }

    #[test_traced]
    fn test_journal_handling_extra_data() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            journal.append(1, 1).await.expect("Failed to append data");

            let data_items = vec![(2u64, 2), (2u64, 3), (2u64, 4)];
            for (index, data) in &data_items {
                journal
                    .append(*index, *data)
                    .await
                    .expect("Failed to append data");
                journal.sync(*index).await.expect("Failed to sync blob");
            }

            journal.sync_all().await.expect("Failed to sync");
            drop(journal);

            // Append extra zero bytes past the end of section 2.
            let (blob, blob_size) = context
                .open(&cfg.partition, &2u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            blob.write_at(vec![0u8; 16], blob_size)
                .await
                .expect("Failed to add extra data");
            blob.sync().await.expect("Failed to sync blob");

            let journal = Journal::init(context, cfg)
                .await
                .expect("Failed to re-initialize journal");

            // Replay should not error; trailing bytes are detected and dropped.
            let mut items = Vec::<(u64, i32)>::new();
            let stream = journal
                .replay(0, 0, NZUsize!(1024))
                .await
                .expect("unable to setup replay");
            pin_mut!(stream);
            while let Some(result) = stream.next().await {
                match result {
                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                    Err(err) => panic!("Failed to read item: {err}"),
                }
            }
        });
    }

    #[test_traced]
    fn test_journal_rewind() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".to_string(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };
            let mut journal = Journal::init(context, cfg).await.unwrap();

            let size = journal.size(1).await.unwrap();
            assert_eq!(size, 0);

            journal.append(1, 42i32).await.unwrap();

            let size = journal.size(1).await.unwrap();
            assert!(size > 0);

            journal.append(1, 43i32).await.unwrap();
            let new_size = journal.size(1).await.unwrap();
            assert!(new_size > size);

            let size = journal.size(2).await.unwrap();
            assert_eq!(size, 0);

            journal.append(2, 44i32).await.unwrap();

            let size = journal.size(2).await.unwrap();
            assert!(size > 0);

            // Rewinding section 1 to zero also removes the newer section 2.
            journal.rewind(1, 0).await.unwrap();

            let size = journal.size(1).await.unwrap();
            assert_eq!(size, 0);

            let size = journal.size(2).await.unwrap();
            assert_eq!(size, 0);
        });
    }

    #[test_traced]
    fn test_journal_rewind_section() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".to_string(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };
            let mut journal = Journal::init(context, cfg).await.unwrap();

            let size = journal.size(1).await.unwrap();
            assert_eq!(size, 0);

            journal.append(1, 42i32).await.unwrap();

            let size = journal.size(1).await.unwrap();
            assert!(size > 0);

            journal.append(1, 43i32).await.unwrap();
            let new_size = journal.size(1).await.unwrap();
            assert!(new_size > size);

            let size = journal.size(2).await.unwrap();
            assert_eq!(size, 0);

            journal.append(2, 44i32).await.unwrap();

            let size = journal.size(2).await.unwrap();
            assert!(size > 0);

            journal.rewind_section(1, 0).await.unwrap();

            let size = journal.size(1).await.unwrap();
            assert_eq!(size, 0);

            let size = journal.size(2).await.unwrap();
            assert!(size > 0);
        });
    }

    #[test_traced]
    fn test_journal_small_items() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            // Append many single-byte items.
            let num_items = 100;
            let mut offsets = Vec::new();
            for i in 0..num_items {
                let (offset, size) = journal
                    .append(1, i as u8)
                    .await
                    .expect("Failed to append data");
                assert_eq!(size, 1, "u8 should encode to 1 byte");
                offsets.push(offset);
            }
            journal.sync(1).await.expect("Failed to sync");

            // Read each item back by offset.
            for (i, &offset) in offsets.iter().enumerate() {
                let item: u8 = journal.get(1, offset).await.expect("Failed to get item");
                assert_eq!(item, i as u8, "Item mismatch at offset {offset}");
            }

            drop(journal);
            let journal = Journal::<_, u8>::init(context, cfg)
                .await
                .expect("Failed to re-initialize journal");

            let stream = journal
                .replay(0, 0, NZUsize!(1024))
                .await
                .expect("Failed to setup replay");
            pin_mut!(stream);

            let mut count = 0;
            while let Some(result) = stream.next().await {
                let (section, offset, size, item) = result.expect("Failed to replay item");
                assert_eq!(section, 1);
                assert_eq!(offset, offsets[count]);
                assert_eq!(size, 1);
                assert_eq!(item, count as u8);
                count += 1;
            }
            assert_eq!(count, num_items, "Should replay all items");
        });
    }

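    // A hedged sketch (not from the original suite): exercises the section
    // accounting helpers (`is_empty`, `num_sections`, `oldest_section`,
    // `newest_section`) against a journal with non-contiguous sections.
    #[test_traced]
    fn test_journal_section_accounting() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            // A fresh journal has no sections.
            assert!(journal.is_empty());
            assert_eq!(journal.num_sections(), 0);
            assert_eq!(journal.oldest_section(), None);
            assert_eq!(journal.newest_section(), None);

            // Appending creates sections on demand.
            journal.append(2, 1i32).await.expect("Failed to append");
            journal.append(7, 2i32).await.expect("Failed to append");
            journal.sync_all().await.expect("Failed to sync");

            assert!(!journal.is_empty());
            assert_eq!(journal.num_sections(), 2);
            assert_eq!(journal.oldest_section(), Some(2));
            assert_eq!(journal.newest_section(), Some(7));

            journal.destroy().await.expect("Failed to destroy journal");
        });
    }
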
    #[test_traced]
    fn test_journal_rewind_many_sections() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".to_string(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };
            let mut journal = Journal::init(context.clone(), cfg.clone()).await.unwrap();

            // Populate sections 1 through 10.
            for section in 1u64..=10 {
                journal.append(section, section as i32).await.unwrap();
            }
            journal.sync_all().await.unwrap();

            for section in 1u64..=10 {
                let size = journal.size(section).await.unwrap();
                assert!(size > 0, "section {section} should have data");
            }

            // Rewind to the end of section 5, removing sections 6 through 10.
            journal
                .rewind(5, journal.size(5).await.unwrap())
                .await
                .unwrap();

            for section in 1u64..=5 {
                let size = journal.size(section).await.unwrap();
                assert!(size > 0, "section {section} should still have data");
            }

            for section in 6u64..=10 {
                let size = journal.size(section).await.unwrap();
                assert_eq!(size, 0, "section {section} should be removed");
            }

            {
                let stream = journal.replay(0, 0, NZUsize!(1024)).await.unwrap();
                pin_mut!(stream);
                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    let (section, _, _, item) = result.unwrap();
                    items.push((section, item));
                }
                assert_eq!(items.len(), 5);
                for (i, (section, item)) in items.iter().enumerate() {
                    assert_eq!(*section, (i + 1) as u64);
                    assert_eq!(*item, (i + 1) as i32);
                }
            }

            journal.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_journal_rewind_partial_truncation() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".to_string(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };
            let mut journal = Journal::init(context.clone(), cfg.clone()).await.unwrap();

            // Record the section size after each append.
            let mut sizes = Vec::new();
            for i in 0..5 {
                journal.append(1, i).await.unwrap();
                journal.sync(1).await.unwrap();
                sizes.push(journal.size(1).await.unwrap());
            }

            // Rewind to the size recorded after the third append.
            let target_size = sizes[2];
            journal.rewind(1, target_size).await.unwrap();

            let new_size = journal.size(1).await.unwrap();
            assert_eq!(new_size, target_size);

            {
                let stream = journal.replay(0, 0, NZUsize!(1024)).await.unwrap();
                pin_mut!(stream);
                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    let (_, _, _, item) = result.unwrap();
                    items.push(item);
                }
                assert_eq!(items.len(), 3);
                for (i, item) in items.iter().enumerate() {
                    assert_eq!(*item, i as i32);
                }
            }

            journal.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_journal_rewind_nonexistent_target() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".to_string(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };
            let mut journal = Journal::init(context.clone(), cfg.clone()).await.unwrap();

            for section in 5u64..=7 {
                journal.append(section, section as i32).await.unwrap();
            }
            journal.sync_all().await.unwrap();

            // Rewinding to a section below all existing ones removes everything.
            journal.rewind(3, 0).await.unwrap();

            for section in 5u64..=7 {
                let size = journal.size(section).await.unwrap();
                assert_eq!(size, 0, "section {section} should be removed");
            }

            {
                let stream = journal.replay(0, 0, NZUsize!(1024)).await.unwrap();
                pin_mut!(stream);
                let items: Vec<_> = stream.collect().await;
                assert!(items.is_empty());
            }

            journal.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_journal_rewind_persistence() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".to_string(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            let mut journal = Journal::init(context.clone(), cfg.clone()).await.unwrap();
            for section in 1u64..=5 {
                journal.append(section, section as i32).await.unwrap();
            }
            journal.sync_all().await.unwrap();

            // Rewind to the end of section 2 and restart.
            let size = journal.size(2).await.unwrap();
            journal.rewind(2, size).await.unwrap();
            journal.sync_all().await.unwrap();
            drop(journal);

            let journal = Journal::<_, i32>::init(context.clone(), cfg.clone())
                .await
                .unwrap();

            for section in 1u64..=2 {
                let size = journal.size(section).await.unwrap();
                assert!(size > 0, "section {section} should have data after restart");
            }

            for section in 3u64..=5 {
                let size = journal.size(section).await.unwrap();
                assert_eq!(size, 0, "section {section} should be gone after restart");
            }

            {
                let stream = journal.replay(0, 0, NZUsize!(1024)).await.unwrap();
                pin_mut!(stream);
                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    let (section, _, _, item) = result.unwrap();
                    items.push((section, item));
                }
                assert_eq!(items.len(), 2);
                assert_eq!(items[0], (1, 1));
                assert_eq!(items[1], (2, 2));
            }

            journal.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_journal_rewind_to_zero_removes_all_newer() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".to_string(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };
            let mut journal = Journal::init(context.clone(), cfg.clone()).await.unwrap();

            for section in 1u64..=3 {
                journal.append(section, section as i32).await.unwrap();
            }
            journal.sync_all().await.unwrap();

            journal.rewind(1, 0).await.unwrap();

            let size = journal.size(1).await.unwrap();
            assert_eq!(size, 0, "section 1 should be empty");

            for section in 2u64..=3 {
                let size = journal.size(section).await.unwrap();
                assert_eq!(size, 0, "section {section} should be removed");
            }

            {
                let stream = journal.replay(0, 0, NZUsize!(1024)).await.unwrap();
                pin_mut!(stream);
                let items: Vec<_> = stream.collect().await;
                assert!(items.is_empty());
            }

            journal.destroy().await.unwrap();
        });
    }

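    // A hedged sketch (not from the original suite): `rewind_to_offset` is
    // expected to behave like `rewind` with an item's offset as the target
    // size, removing that item and everything after it.
    #[test_traced]
    fn test_journal_rewind_to_offset() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".to_string(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };
            let mut journal = Journal::init(context.clone(), cfg.clone()).await.unwrap();

            // Append two items and remember the second item's offset.
            let (_first, _) = journal.append(1, 1i32).await.unwrap();
            let (second, _) = journal.append(1, 2i32).await.unwrap();
            journal.sync(1).await.unwrap();

            // Rewinding to the second item's offset should leave only the first.
            journal.rewind_to_offset(1, second).await.unwrap();
            assert_eq!(journal.size(1).await.unwrap(), second);

            let item: i32 = journal.get(1, 0).await.unwrap();
            assert_eq!(item, 1);

            journal.destroy().await.unwrap();
        });
    }
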
    #[test_traced]
    fn test_journal_replay_start_offset_with_trailing_bytes() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            // Write several valid items and record the logical size.
            for i in 0..5i32 {
                journal.append(1, i).await.unwrap();
            }
            journal.sync(1).await.unwrap();
            let valid_logical_size = journal.size(1).await.unwrap();
            drop(journal);

            let (blob, physical_size_before) = context
                .open(&cfg.partition, &1u64.to_be_bytes())
                .await
                .unwrap();

            // Append garbage bytes past the end of the valid data.
            blob.write_at(vec![0xFF, 0xFF], physical_size_before)
                .await
                .unwrap();
            blob.sync().await.unwrap();

            // Replay from an offset at the end of the valid data; the trailing
            // bytes should be truncated without discarding valid items.
            let start_offset = valid_logical_size;
            {
                let journal = Journal::<_, i32>::init(context.clone(), cfg.clone())
                    .await
                    .unwrap();

                let stream = journal
                    .replay(1, start_offset, NZUsize!(1024))
                    .await
                    .unwrap();
                pin_mut!(stream);

                // Drain the stream (which triggers any truncation).
                while let Some(_result) = stream.next().await {}
            }

            let (_, physical_size_after) = context
                .open(&cfg.partition, &1u64.to_be_bytes())
                .await
                .unwrap();

            assert!(
                physical_size_after >= physical_size_before,
                "Valid data was lost! Physical blob truncated from {physical_size_before} to \
                {physical_size_after}. Logical valid size was {valid_logical_size}. \
                This indicates valid_offset was incorrectly initialized to 0 instead of start_offset."
            );
        });
    }

    #[test_traced]
    fn test_journal_large_item_spanning_pages() {
        // Use an item larger than a single buffer-pool page.
        const LARGE_SIZE: usize = 2048;
        type LargeItem = [u8; LARGE_SIZE];

        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(4096),
            };
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            // Fill the item with a recognizable pattern.
            let mut large_data: LargeItem = [0u8; LARGE_SIZE];
            for (i, byte) in large_data.iter_mut().enumerate() {
                *byte = (i % 256) as u8;
            }
            assert!(
                LARGE_SIZE > PAGE_SIZE.get() as usize,
                "Item must be larger than page size"
            );

            let (offset, size) = journal
                .append(1, large_data)
                .await
                .expect("Failed to append large item");
            assert_eq!(size as usize, LARGE_SIZE);
            journal.sync(1).await.expect("Failed to sync");

            let retrieved: LargeItem = journal
                .get(1, offset)
                .await
                .expect("Failed to get large item");
            assert_eq!(retrieved, large_data, "Random access read mismatch");

            drop(journal);
            let journal = Journal::<_, LargeItem>::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");

            {
                let stream = journal
                    .replay(0, 0, NZUsize!(1024))
                    .await
                    .expect("Failed to setup replay");
                pin_mut!(stream);

                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    let (section, off, sz, item) = result.expect("Failed to replay item");
                    items.push((section, off, sz, item));
                }

                assert_eq!(items.len(), 1, "Should have exactly one item");
                let (section, off, sz, item) = &items[0];
                assert_eq!(*section, 1);
                assert_eq!(*off, offset);
                assert_eq!(*sz as usize, LARGE_SIZE);
                assert_eq!(*item, large_data, "Replay read mismatch");
            }

            journal.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_journal_non_contiguous_sections() {
        // Verify operations on a journal whose sections are not contiguous.
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            // Append to non-contiguous sections.
            let sections_and_data = [(1u64, 100i32), (5u64, 500i32), (10u64, 1000i32)];
            let mut offsets = Vec::new();

            for (section, data) in &sections_and_data {
                let (offset, _) = journal
                    .append(*section, *data)
                    .await
                    .expect("Failed to append");
                offsets.push(offset);
            }
            journal.sync_all().await.expect("Failed to sync");

            // Read each item back.
            for (i, (section, expected_data)) in sections_and_data.iter().enumerate() {
                let retrieved: i32 = journal
                    .get(*section, offsets[i])
                    .await
                    .expect("Failed to get item");
                assert_eq!(retrieved, *expected_data);
            }

            // Sections that were never written should be out of range.
            for missing_section in [0u64, 2, 3, 4, 6, 7, 8, 9, 11] {
                let result = journal.get(missing_section, 0).await;
                assert!(
                    matches!(result, Err(Error::SectionOutOfRange(_))),
                    "Expected SectionOutOfRange for section {}, got {:?}",
                    missing_section,
                    result
                );
            }

            drop(journal);
            let journal = Journal::<_, i32>::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");

            // Replay everything.
            {
                let stream = journal
                    .replay(0, 0, NZUsize!(1024))
                    .await
                    .expect("Failed to setup replay");
                pin_mut!(stream);

                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    let (section, _, _, item) = result.expect("Failed to replay item");
                    items.push((section, item));
                }

                assert_eq!(items.len(), 3, "Should have 3 items");
                assert_eq!(items[0], (1, 100));
                assert_eq!(items[1], (5, 500));
                assert_eq!(items[2], (10, 1000));
            }

            // Replay from an existing middle section.
            {
                let stream = journal
                    .replay(5, 0, NZUsize!(1024))
                    .await
                    .expect("Failed to setup replay from section 5");
                pin_mut!(stream);

                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    let (section, _, _, item) = result.expect("Failed to replay item");
                    items.push((section, item));
                }

                assert_eq!(items.len(), 2, "Should have 2 items from section 5 onwards");
                assert_eq!(items[0], (5, 500));
                assert_eq!(items[1], (10, 1000));
            }

            // Replay from a missing section starts at the next existing one.
            {
                let stream = journal
                    .replay(3, 0, NZUsize!(1024))
                    .await
                    .expect("Failed to setup replay from section 3");
                pin_mut!(stream);

                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    let (section, _, _, item) = result.expect("Failed to replay item");
                    items.push((section, item));
                }

                assert_eq!(items.len(), 2);
                assert_eq!(items[0], (5, 500));
                assert_eq!(items[1], (10, 1000));
            }

            journal.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_journal_empty_section_in_middle() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            journal.append(1, 100i32).await.expect("Failed to append");

            journal.append(2, 200i32).await.expect("Failed to append");
            journal.sync(2).await.expect("Failed to sync");
            journal
                .rewind_section(2, 0)
                .await
                .expect("Failed to rewind");

            journal.append(3, 300i32).await.expect("Failed to append");

            journal.sync_all().await.expect("Failed to sync");

            assert!(journal.size(1).await.unwrap() > 0);
            assert_eq!(journal.size(2).await.unwrap(), 0);
            assert!(journal.size(3).await.unwrap() > 0);

            drop(journal);
            let journal = Journal::<_, i32>::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");

            {
                let stream = journal
                    .replay(0, 0, NZUsize!(1024))
                    .await
                    .expect("Failed to setup replay");
                pin_mut!(stream);

                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    let (section, _, _, item) = result.expect("Failed to replay item");
                    items.push((section, item));
                }

                assert_eq!(
                    items.len(),
                    2,
                    "Should have 2 items (skipping empty section)"
                );
                assert_eq!(items[0], (1, 100));
                assert_eq!(items[1], (3, 300));
            }

            {
                let stream = journal
                    .replay(2, 0, NZUsize!(1024))
                    .await
                    .expect("Failed to setup replay from section 2");
                pin_mut!(stream);

                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    let (section, _, _, item) = result.expect("Failed to replay item");
                    items.push((section, item));
                }

                assert_eq!(items.len(), 1, "Should have 1 item from section 3");
                assert_eq!(items[0], (3, 300));
            }

            journal.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_journal_item_exactly_page_size() {
        // Use an item whose encoded size exactly matches the page size.
        const ITEM_SIZE: usize = PAGE_SIZE.get() as usize;
        type ExactItem = [u8; ITEM_SIZE];

        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(4096),
            };
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            let mut exact_data: ExactItem = [0u8; ITEM_SIZE];
            for (i, byte) in exact_data.iter_mut().enumerate() {
                *byte = (i % 256) as u8;
            }

            let (offset, size) = journal
                .append(1, exact_data)
                .await
                .expect("Failed to append exact item");
            assert_eq!(size as usize, ITEM_SIZE);
            journal.sync(1).await.expect("Failed to sync");

            let retrieved: ExactItem = journal
                .get(1, offset)
                .await
                .expect("Failed to get exact item");
            assert_eq!(retrieved, exact_data, "Random access read mismatch");

            drop(journal);
            let journal = Journal::<_, ExactItem>::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");

            {
                let stream = journal
                    .replay(0, 0, NZUsize!(1024))
                    .await
                    .expect("Failed to setup replay");
                pin_mut!(stream);

                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    let (section, off, sz, item) = result.expect("Failed to replay item");
                    items.push((section, off, sz, item));
                }

                assert_eq!(items.len(), 1, "Should have exactly one item");
                let (section, off, sz, item) = &items[0];
                assert_eq!(*section, 1);
                assert_eq!(*off, offset);
                assert_eq!(*sz as usize, ITEM_SIZE);
                assert_eq!(*item, exact_data, "Replay read mismatch");
            }

            journal.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_journal_varint_spanning_page_boundary() {
        // Use a small page size so that length prefixes and items regularly
        // straddle page boundaries.
        const SMALL_PAGE: NonZeroU16 = NZU16!(16);

        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(SMALL_PAGE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };
            let mut journal: Journal<_, [u8; 128]> = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            let item1: [u8; 128] = [1u8; 128];
            let item2: [u8; 128] = [2u8; 128];
            let item3: [u8; 128] = [3u8; 128];

            // Each entry is a 2-byte varint prefix plus 128 bytes of data, so
            // entries repeatedly cross the 16-byte page boundary.
            let (offset1, _) = journal.append(1, item1).await.expect("Failed to append");
            let (offset2, _) = journal.append(1, item2).await.expect("Failed to append");
            let (offset3, _) = journal.append(1, item3).await.expect("Failed to append");

            journal.sync(1).await.expect("Failed to sync");

            let retrieved1: [u8; 128] = journal.get(1, offset1).await.expect("Failed to get");
            let retrieved2: [u8; 128] = journal.get(1, offset2).await.expect("Failed to get");
            let retrieved3: [u8; 128] = journal.get(1, offset3).await.expect("Failed to get");
            assert_eq!(retrieved1, item1);
            assert_eq!(retrieved2, item2);
            assert_eq!(retrieved3, item3);

            drop(journal);
            let journal: Journal<_, [u8; 128]> = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");

            {
                let stream = journal
                    .replay(0, 0, NZUsize!(64))
                    .await
                    .expect("Failed to setup replay");
                pin_mut!(stream);

                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    let (section, off, _, item) = result.expect("Failed to replay item");
                    items.push((section, off, item));
                }

                assert_eq!(items.len(), 3, "Should have 3 items");
                assert_eq!(items[0], (1, offset1, item1));
                assert_eq!(items[1], (1, offset2, item2));
                assert_eq!(items[2], (1, offset3, item3));
            }

            journal.destroy().await.unwrap();
        });
    }
}