use crate::journal::Error;
use bytes::{Buf, BufMut};
use commonware_codec::{varint::UInt, Codec, EncodeSize, ReadExt, Write as CodecWrite};
use commonware_runtime::{
    buffer::{Append, PoolRef, Read},
    telemetry::metrics::status::GaugeExt,
    Blob, Error as RError, Metrics, Storage,
};
use commonware_utils::hex;
use futures::stream::{self, Stream, StreamExt};
use prometheus_client::metrics::{counter::Counter, gauge::Gauge};
use std::{
    collections::{btree_map::Entry, BTreeMap},
    io::Cursor,
    marker::PhantomData,
    num::NonZeroUsize,
};
use tracing::{debug, trace, warn};
use zstd::{bulk::compress, decode_all};

/// Configuration for a [Journal].
#[derive(Clone)]
pub struct Config<C> {
    /// The storage partition to use for the journal's blobs.
    pub partition: String,

    /// Optional zstd compression level to apply to stored items.
    pub compression: Option<u8>,

    /// The codec configuration to use when decoding items.
    pub codec_config: C,

    /// The buffer pool shared by the journal's blobs.
    pub buffer_pool: PoolRef,

    /// The size of each blob's write buffer.
    pub write_buffer: NonZeroUsize,
}

/// Entries are aligned to multiples of this many bytes.
pub(crate) const ITEM_ALIGNMENT: u64 = 16;

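// Each entry begins at a multiple of [ITEM_ALIGNMENT] (any gap before it is
// zero padding) and, as reconstructed from `read` and `append` below, is laid
// out as:
//
//   [ size varint | item bytes | crc32 over varint + item (4 bytes, big-endian) ]
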
/// The minimum number of bytes in a stored item (a 1-byte size varint plus
/// the 4-byte checksum). This is also the maximum length of a `u32` varint,
/// so reading this many bytes always captures the full size prefix.
const MIN_ITEM_SIZE: usize = 5;

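/// Rounds `offset` up to the next multiple of [ITEM_ALIGNMENT] and returns it
/// in alignment units. A few worked values (illustrative):
///
/// ```text
/// compute_next_offset(0)  == Ok(0)  // already aligned
/// compute_next_offset(1)  == Ok(1)  // rounds up to 16
/// compute_next_offset(17) == Ok(2)  // rounds up to 32
/// compute_next_offset(32) == Ok(2)  // exact multiple
/// ```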
#[inline]
fn compute_next_offset(mut offset: u64) -> Result<u32, Error> {
    let overage = offset % ITEM_ALIGNMENT;
    if overage != 0 {
        offset += ITEM_ALIGNMENT - overage;
    }
    let offset = offset / ITEM_ALIGNMENT;
    let aligned_offset = offset.try_into().map_err(|_| Error::OffsetOverflow)?;
    Ok(aligned_offset)
}

/// A section-based journal for appending, reading, and replaying items.
pub struct Journal<E: Storage + Metrics, V: Codec> {
    pub(crate) context: E,
    pub(crate) cfg: Config<V::Cfg>,

    pub(crate) blobs: BTreeMap<u64, Append<E::Blob>>,

    /// The oldest section that has not been pruned during this run. This is
    /// in-memory state only; it resets to 0 when the journal is reopened.
    pub(crate) oldest_retained_section: u64,

    pub(crate) tracked: Gauge,
    pub(crate) synced: Counter,
    pub(crate) pruned: Counter,

    pub(crate) _phantom: PhantomData<V>,
}

impl<E: Storage + Metrics, V: Codec> Journal<E, V> {
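    /// Initializes a journal, loading any sections already stored in
    /// `cfg.partition`. A construction sketch (assumes `ctx` implements
    /// `Storage + Metrics`, mirroring the tests below):
    ///
    /// ```ignore
    /// let journal = Journal::<_, u64>::init(
    ///     ctx,
    ///     Config {
    ///         partition: "my_journal".into(),
    ///         compression: None,
    ///         codec_config: (),
    ///         buffer_pool: PoolRef::new(NZUsize!(1024), NZUsize!(10)),
    ///         write_buffer: NZUsize!(1024),
    ///     },
    /// )
    /// .await?;
    /// ```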
    pub async fn init(context: E, cfg: Config<V::Cfg>) -> Result<Self, Error> {
        // Load any previously stored sections from the partition.
        let mut blobs = BTreeMap::new();
        let stored_blobs = match context.scan(&cfg.partition).await {
            Ok(blobs) => blobs,
            Err(RError::PartitionMissing(_)) => Vec::new(),
            Err(err) => return Err(Error::Runtime(err)),
        };
        for name in stored_blobs {
            let (blob, size) = context.open(&cfg.partition, &name).await?;
            let hex_name = hex(&name);
            let section = match name.try_into() {
                Ok(section) => u64::from_be_bytes(section),
                Err(_) => return Err(Error::InvalidBlobName(hex_name)),
            };
            debug!(section, blob = hex_name, size, "loaded section");
            let blob = Append::new(blob, size, cfg.write_buffer, cfg.buffer_pool.clone()).await?;
            blobs.insert(section, blob);
        }

        // Initialize and register metrics.
        let tracked = Gauge::default();
        let synced = Counter::default();
        let pruned = Counter::default();
        context.register("tracked", "Number of blobs", tracked.clone());
        context.register("synced", "Number of syncs", synced.clone());
        context.register("pruned", "Number of blobs pruned", pruned.clone());
        let _ = tracked.try_set(blobs.len());

        Ok(Self {
            context,
            cfg,
            blobs,
            oldest_retained_section: 0,
            tracked,
            synced,
            pruned,

            _phantom: PhantomData,
        })
    }

    /// Returns an error if `section` has already been pruned.
    const fn prune_guard(&self, section: u64) -> Result<(), Error> {
        if section < self.oldest_retained_section {
            Err(Error::AlreadyPrunedToSection(self.oldest_retained_section))
        } else {
            Ok(())
        }
    }

    /// Reads an item from the blob at the given aligned offset, verifying its
    /// checksum. Returns the next aligned offset, the item's encoded length,
    /// and the decoded item.
    pub(crate) async fn read(
        compressed: bool,
        cfg: &V::Cfg,
        blob: &Append<E::Blob>,
        offset: u32,
    ) -> Result<(u32, u32, V), Error> {
        // Read enough bytes to hold the largest possible size varint.
        let mut hasher = crc32fast::Hasher::new();
        let offset = offset as u64 * ITEM_ALIGNMENT;
        let varint_buf = blob.read_at(vec![0; MIN_ITEM_SIZE], offset).await?;
        let mut varint = varint_buf.as_ref();
        let size = UInt::<u32>::read(&mut varint).map_err(Error::Codec)?.0 as usize;
        let varint_len = MIN_ITEM_SIZE - varint.remaining();
        hasher.update(&varint_buf.as_ref()[..varint_len]);
        let offset = offset
            .checked_add(varint_len as u64)
            .ok_or(Error::OffsetOverflow)?;

        // Read the item and its trailing checksum.
        let buf_size = size.checked_add(4).ok_or(Error::OffsetOverflow)?;
        let buf = blob.read_at(vec![0u8; buf_size], offset).await?;
        let buf = buf.as_ref();
        let offset = offset
            .checked_add(buf_size as u64)
            .ok_or(Error::OffsetOverflow)?;

        let item = &buf[..size];
        hasher.update(item);

        // Verify the stored checksum.
        let checksum = hasher.finalize();
        let stored_checksum = u32::from_be_bytes(buf[size..].try_into().unwrap());
        if checksum != stored_checksum {
            return Err(Error::ChecksumMismatch(stored_checksum, checksum));
        }

        let aligned_offset = compute_next_offset(offset)?;

        // Decompress (if configured) and decode the item.
        let item = if compressed {
            let decompressed =
                decode_all(Cursor::new(&item)).map_err(|_| Error::DecompressionFailed)?;
            V::decode_cfg(decompressed.as_ref(), cfg).map_err(Error::Codec)?
        } else {
            V::decode_cfg(item, cfg).map_err(Error::Codec)?
        };

        Ok((aligned_offset, size as u32, item))
    }

    /// Like [Self::read], but reads through a buffered reader (used during
    /// replay). Also returns the reader's position after the item, which is
    /// the size of the blob's valid prefix so far.
    async fn read_buffered(
        reader: &mut Read<Append<E::Blob>>,
        offset: u32,
        cfg: &V::Cfg,
        compressed: bool,
    ) -> Result<(u32, u64, u32, V), Error> {
        let file_offset = offset as u64 * ITEM_ALIGNMENT;

        // Seek only if the reader isn't already at the requested position.
        if reader.position() != file_offset {
            reader.seek_to(file_offset).map_err(Error::Runtime)?;
        }

        // Read the size varint.
        let mut hasher = crc32fast::Hasher::new();
        let mut varint_buf = [0u8; MIN_ITEM_SIZE];
        reader
            .read_exact(&mut varint_buf, MIN_ITEM_SIZE)
            .await
            .map_err(Error::Runtime)?;
        let mut varint = varint_buf.as_ref();
        let size = UInt::<u32>::read(&mut varint).map_err(Error::Codec)?.0 as usize;
        let varint_len = MIN_ITEM_SIZE - varint.remaining();
        hasher.update(&varint_buf[..varint_len]);

        // Read the item and checksum, reusing the bytes already pulled in
        // while reading the varint.
        let buf_size = size.checked_add(4).ok_or(Error::OffsetOverflow)?;
        let already_read = MIN_ITEM_SIZE - varint_len;
        let mut buf = vec![0u8; buf_size];
        buf[..already_read].copy_from_slice(&varint_buf[varint_len..]);
        if buf_size > already_read {
            reader
                .read_exact(&mut buf[already_read..], buf_size - already_read)
                .await
                .map_err(Error::Runtime)?;
        }

        let item = &buf[..size];
        hasher.update(item);

        // Verify the stored checksum.
        let checksum = hasher.finalize();
        let stored_checksum = u32::from_be_bytes(buf[size..].try_into().unwrap());
        if checksum != stored_checksum {
            return Err(Error::ChecksumMismatch(stored_checksum, checksum));
        }

        // Decompress (if configured) and decode the item.
        let item = if compressed {
            let decompressed =
                decode_all(Cursor::new(&item)).map_err(|_| Error::DecompressionFailed)?;
            V::decode_cfg(decompressed.as_ref(), cfg).map_err(Error::Codec)?
        } else {
            V::decode_cfg(item, cfg).map_err(Error::Codec)?
        };

        let current_pos = reader.position();
        let aligned_offset = compute_next_offset(current_pos)?;
        Ok((aligned_offset, current_pos, size as u32, item))
    }

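    /// Replays all items starting at `offset` within `start_section`,
    /// yielding a stream of `(section, offset, len, item)` tuples. Corruption
    /// or trailing bytes at the end of a blob cause the blob to be truncated
    /// to its last valid entry. A consumption sketch (mirroring the tests
    /// below):
    ///
    /// ```ignore
    /// let stream = journal.replay(0, 0, NZUsize!(1024)).await?;
    /// pin_mut!(stream);
    /// while let Some(result) = stream.next().await {
    ///     let (section, offset, len, item) = result?;
    ///     // process the item...
    /// }
    /// ```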
    pub async fn replay(
        &self,
        start_section: u64,
        mut offset: u32,
        buffer: NonZeroUsize,
    ) -> Result<impl Stream<Item = Result<(u64, u32, u32, V), Error>> + '_, Error> {
        // Collect the blobs to replay, along with their sizes and max offsets.
        let codec_config = self.cfg.codec_config.clone();
        let compressed = self.cfg.compression.is_some();
        let mut blobs = Vec::with_capacity(self.blobs.len());
        for (section, blob) in self.blobs.range(start_section..) {
            let blob_size = blob.size().await;
            let max_offset = compute_next_offset(blob_size)?;
            blobs.push((
                *section,
                blob.clone(),
                max_offset,
                blob_size,
                codec_config.clone(),
                compressed,
            ));
        }

        // Replay each blob in order.
        Ok(stream::iter(blobs).flat_map(
            move |(section, blob, max_offset, blob_size, codec_config, compressed)| {
                // Seek to the requested offset in the first section; later
                // sections are replayed from the beginning.
                let mut reader = Read::new(blob, blob_size, buffer);
                if section == start_section && offset != 0 {
                    if let Err(err) = reader.seek_to(offset as u64 * ITEM_ALIGNMENT) {
                        warn!(section, offset, ?err, "failed to seek to offset");
                        return stream::once(async move { Err(err.into()) }).left_stream();
                    }
                } else {
                    offset = 0;
                }

                stream::unfold(
                    (section, reader, offset, 0u64, codec_config, compressed),
                    move |(
                        section,
                        mut reader,
                        offset,
                        valid_size,
                        codec_config,
                        compressed,
                    )| async move {
                        // Stop once the end of the blob is reached.
                        if offset >= max_offset {
                            return None;
                        }

                        // Read the next item.
                        match Self::read_buffered(
                            &mut reader,
                            offset,
                            &codec_config,
                            compressed,
                        )
                        .await
                        {
                            Ok((next_offset, next_valid_size, size, item)) => {
                                trace!(blob = section, cursor = offset, "replayed item");
                                Some((
                                    Ok((section, offset, size, item)),
                                    (
                                        section,
                                        reader,
                                        next_offset,
                                        next_valid_size,
                                        codec_config,
                                        compressed,
                                    ),
                                ))
                            }
                            Err(Error::ChecksumMismatch(expected, found)) => {
                                // Truncate the blob to its last valid entry.
                                warn!(
                                    blob = section,
                                    bad_offset = offset,
                                    new_size = valid_size,
                                    expected,
                                    found,
                                    "corruption detected: truncating"
                                );
                                reader.resize(valid_size).await.ok()?;
                                None
                            }
                            Err(Error::Runtime(RError::BlobInsufficientLength)) => {
                                // Truncate trailing bytes that do not form a
                                // complete entry.
                                warn!(
                                    blob = section,
                                    bad_offset = offset,
                                    new_size = valid_size,
                                    "trailing bytes detected: truncating"
                                );
                                reader.resize(valid_size).await.ok()?;
                                None
                            }
                            Err(err) => {
                                // Surface any other error to the caller.
                                warn!(
                                    blob = section,
                                    cursor = offset,
                                    ?err,
                                    "unexpected error"
                                );
                                Some((
                                    Err(err),
                                    (
                                        section,
                                        reader,
                                        offset,
                                        valid_size,
                                        codec_config,
                                        compressed,
                                    ),
                                ))
                            }
                        }
                    },
                )
                .right_stream()
            },
        ))
    }

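    /// Appends an item to `section`, returning the aligned offset it was
    /// written at and its encoded length. A usage sketch (the returned offset
    /// can later be passed to [Self::get]):
    ///
    /// ```ignore
    /// let (offset, _len) = journal.append(7, item).await?;
    /// journal.sync(7).await?; // persist before relying on the write
    /// let read_back = journal.get(7, offset).await?;
    /// ```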
    pub async fn append(&mut self, section: u64, item: V) -> Result<(u32, u32), Error> {
        self.prune_guard(section)?;

        // Encode (and optionally compress) the item.
        let encoded = item.encode();
        let encoded = if let Some(compression) = self.cfg.compression {
            compress(&encoded, compression as i32).map_err(|_| Error::CompressionFailed)?
        } else {
            encoded.into()
        };

        // Compute the full entry length: varint size prefix + item + checksum.
        let item_len = encoded.len();
        let item_len = match item_len.try_into() {
            Ok(len) => len,
            Err(_) => return Err(Error::ItemTooLarge(item_len)),
        };
        let size_len = UInt(item_len).encode_size();
        let entry_len = size_len + item_len as usize + 4;

        // Get or create the section's blob.
        let blob = match self.blobs.entry(section) {
            Entry::Occupied(entry) => entry.into_mut(),
            Entry::Vacant(entry) => {
                let name = section.to_be_bytes();
                let (blob, size) = self.context.open(&self.cfg.partition, &name).await?;
                let blob = Append::new(
                    blob,
                    size,
                    self.cfg.write_buffer,
                    self.cfg.buffer_pool.clone(),
                )
                .await?;
                self.tracked.inc();
                entry.insert(blob)
            }
        };

        // Compute the padding needed to align the entry.
        let cursor = blob.size().await;
        let offset = compute_next_offset(cursor)?;
        let aligned_cursor = offset as u64 * ITEM_ALIGNMENT;
        let padding = (aligned_cursor - cursor) as usize;

        // Assemble the padded entry in a single buffer.
        let mut buf = Vec::with_capacity(padding + entry_len);

        if padding > 0 {
            buf.resize(padding, 0);
        }

        // Write the size prefix and item, then the checksum over both.
        let entry_start = buf.len();
        UInt(item_len).write(&mut buf);
        buf.put_slice(&encoded);

        let checksum = crc32fast::hash(&buf[entry_start..]);
        buf.put_u32(checksum);
        assert_eq!(buf[entry_start..].len(), entry_len);

        blob.append(buf).await?;
        trace!(blob = section, offset, "appended item");
        Ok((offset, item_len))
    }

    /// Retrieves the item stored at `(section, offset)`.
    pub async fn get(&self, section: u64, offset: u32) -> Result<V, Error> {
        self.prune_guard(section)?;
        let blob = match self.blobs.get(&section) {
            Some(blob) => blob,
            None => return Err(Error::SectionOutOfRange(section)),
        };

        let (_, _, item) = Self::read(
            self.cfg.compression.is_some(),
            &self.cfg.codec_config,
            blob,
            offset,
        )
        .await?;
        Ok(item)
    }

    /// Returns the current size of the given section in bytes (0 if the
    /// section does not exist).
    pub async fn size(&self, section: u64) -> Result<u64, Error> {
        self.prune_guard(section)?;
        match self.blobs.get(&section) {
            Some(blob) => Ok(blob.size().await),
            None => Ok(0),
        }
    }

    /// Rewinds the journal to the given aligned offset within `section` (a
    /// convenience wrapper around [Self::rewind]).
    pub async fn rewind_to_offset(&mut self, section: u64, offset: u32) -> Result<(), Error> {
        self.rewind(section, offset as u64 * ITEM_ALIGNMENT).await
    }

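    /// Rewinds the journal so that `section` is at most `size` bytes long,
    /// removing all later sections entirely. A sketch of the semantics
    /// (mirroring `test_journal_rewind` below):
    ///
    /// ```ignore
    /// journal.rewind(1, 0).await?;           // truncate section 1...
    /// assert_eq!(journal.size(1).await?, 0);
    /// assert_eq!(journal.size(2).await?, 0); // ...and drop trailing sections
    /// ```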
    pub async fn rewind(&mut self, section: u64, size: u64) -> Result<(), Error> {
        self.prune_guard(section)?;

        // Remove any sections after the target section.
        let trailing: Vec<u64> = self
            .blobs
            .range((
                std::ops::Bound::Excluded(section),
                std::ops::Bound::Unbounded,
            ))
            .map(|(&section, _)| section)
            .collect();
        for index in trailing.iter().rev() {
            let blob = self.blobs.remove(index).unwrap();

            drop(blob);
            self.context
                .remove(&self.cfg.partition, Some(&index.to_be_bytes()))
                .await?;
            debug!(section = index, "removed section");
            self.tracked.dec();
        }

        // Truncate the target section if it is larger than the requested size.
        let blob = match self.blobs.get_mut(&section) {
            Some(blob) => blob,
            None => return Ok(()),
        };
        let current = blob.size().await;
        if size >= current {
            return Ok(());
        }
        blob.resize(size).await?;
        debug!(
            section,
            from = current,
            to = size,
            ?trailing,
            "rewound journal"
        );
        Ok(())
    }

    /// Rewinds a single section to the given size, leaving later sections
    /// untouched (unlike [Self::rewind]).
    pub async fn rewind_section(&mut self, section: u64, size: u64) -> Result<(), Error> {
        self.prune_guard(section)?;

        let blob = match self.blobs.get_mut(&section) {
            Some(blob) => blob,
            None => return Ok(()),
        };

        let current = blob.size().await;
        if size >= current {
            return Ok(());
        }
        blob.resize(size).await?;
        debug!(section, from = current, to = size, "rewound section");
        Ok(())
    }

    /// Syncs the given section's blob to durable storage.
    pub async fn sync(&self, section: u64) -> Result<(), Error> {
        self.prune_guard(section)?;
        let blob = match self.blobs.get(&section) {
            Some(blob) => blob,
            None => return Ok(()),
        };
        self.synced.inc();
        blob.sync().await.map_err(Error::Runtime)
    }

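    /// Prunes all sections strictly less than `min`, returning whether any
    /// blob was removed. A sketch of the semantics (mirroring
    /// `test_journal_prune_guard` below):
    ///
    /// ```ignore
    /// journal.prune(3).await?; // sections 1 and 2 are removed
    /// assert!(matches!(
    ///     journal.get(1, 0).await,
    ///     Err(Error::AlreadyPrunedToSection(3))
    /// ));
    /// ```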
    pub async fn prune(&mut self, min: u64) -> Result<bool, Error> {
        let mut pruned = false;
        while let Some((&section, _)) = self.blobs.first_key_value() {
            // Stop at the first section that should be retained.
            if section >= min {
                break;
            }

            // Remove the section's blob from storage.
            let blob = self.blobs.remove(&section).unwrap();
            let size = blob.size().await;
            drop(blob);

            self.context
                .remove(&self.cfg.partition, Some(&section.to_be_bytes()))
                .await?;
            pruned = true;

            debug!(blob = section, size, "pruned blob");
            self.tracked.dec();
            self.pruned.inc();
        }

        if pruned {
            self.oldest_retained_section = min;
        }

        Ok(pruned)
    }

    /// Syncs all open sections and closes the journal.
    pub async fn close(self) -> Result<(), Error> {
        for (section, blob) in self.blobs.into_iter() {
            let size = blob.size().await;
            blob.sync().await?;
            debug!(blob = section, size, "synced blob");
        }
        Ok(())
    }

    /// Returns the oldest section still tracked by the journal, if any.
    pub fn oldest_section(&self) -> Option<u64> {
        self.blobs.first_key_value().map(|(section, _)| *section)
    }

    /// Removes all of the journal's blobs (and its partition) from storage.
    pub async fn destroy(self) -> Result<(), Error> {
        for (i, blob) in self.blobs.into_iter() {
            let size = blob.size().await;
            drop(blob);
            debug!(blob = i, size, "destroyed blob");
            self.context
                .remove(&self.cfg.partition, Some(&i.to_be_bytes()))
                .await?;
        }
        match self.context.remove(&self.cfg.partition, None).await {
            Ok(()) => {}
            // The partition may already be gone (e.g., it was never created).
            Err(RError::PartitionMissing(_)) => {}
            Err(err) => return Err(Error::Runtime(err)),
        }
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use bytes::BufMut;
    use commonware_cryptography::{Hasher, Sha256};
    use commonware_macros::test_traced;
    use commonware_runtime::{deterministic, Blob, Error as RError, Runner, Storage};
    use commonware_utils::{NZUsize, StableBuf};
    use futures::{pin_mut, StreamExt};
    use prometheus_client::registry::Metric;

    const PAGE_SIZE: NonZeroUsize = NZUsize!(1024);
    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);
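
    // Sanity-check the alignment helper directly. These values follow from
    // ITEM_ALIGNMENT = 16; the overflow case uses the first size whose
    // aligned index no longer fits in a u32.
    #[test]
    fn test_compute_next_offset_alignment() {
        assert_eq!(compute_next_offset(0).unwrap(), 0);
        assert_eq!(compute_next_offset(1).unwrap(), 1);
        assert_eq!(compute_next_offset(16).unwrap(), 1);
        assert_eq!(compute_next_offset(17).unwrap(), 2);
        assert!(matches!(
            compute_next_offset((u32::MAX as u64 + 1) * ITEM_ALIGNMENT),
            Err(Error::OffsetOverflow)
        ));
    }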

    #[test_traced]
    fn test_journal_append_and_read() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };
            let index = 1u64;
            let data = 10;
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            journal
                .append(index, data)
                .await
                .expect("Failed to append data");

            let buffer = context.encode();
            assert!(buffer.contains("tracked 1"));

            journal.close().await.expect("Failed to close journal");

            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };
            let journal = Journal::<_, i32>::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");

            let mut items = Vec::new();
            let stream = journal
                .replay(0, 0, NZUsize!(1024))
                .await
                .expect("unable to setup replay");
            pin_mut!(stream);
            while let Some(result) = stream.next().await {
                match result {
                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                    Err(err) => panic!("Failed to read item: {err}"),
                }
            }

            assert_eq!(items.len(), 1);
            assert_eq!(items[0].0, index);
            assert_eq!(items[0].1, data);

            let buffer = context.encode();
            assert!(buffer.contains("tracked 1"));
        });
    }

    #[test_traced]
    fn test_journal_multiple_appends_and_reads() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            let data_items = vec![(1u64, 1), (1u64, 2), (2u64, 3), (3u64, 4)];
            for (index, data) in &data_items {
                journal
                    .append(*index, *data)
                    .await
                    .expect("Failed to append data");
                journal.sync(*index).await.expect("Failed to sync blob");
            }

            let buffer = context.encode();
            assert!(buffer.contains("tracked 3"));
            assert!(buffer.contains("synced_total 4"));

            journal.close().await.expect("Failed to close journal");

            let journal = Journal::init(context, cfg)
                .await
                .expect("Failed to re-initialize journal");

            let mut items = Vec::<(u64, u32)>::new();
            {
                let stream = journal
                    .replay(0, 0, NZUsize!(1024))
                    .await
                    .expect("unable to setup replay");
                pin_mut!(stream);
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                        Err(err) => panic!("Failed to read item: {err}"),
                    }
                }
            }

            assert_eq!(items.len(), data_items.len());
            for ((expected_index, expected_data), (actual_index, actual_data)) in
                data_items.iter().zip(items.iter())
            {
                assert_eq!(actual_index, expected_index);
                assert_eq!(actual_data, expected_data);
            }

            journal.destroy().await.expect("Failed to destroy journal");
        });
    }

    #[test_traced]
    fn test_journal_prune_blobs() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            for index in 1u64..=5u64 {
                journal
                    .append(index, index)
                    .await
                    .expect("Failed to append data");
                journal.sync(index).await.expect("Failed to sync blob");
            }

            let data = 99;
            journal
                .append(2u64, data)
                .await
                .expect("Failed to append data");
            journal.sync(2u64).await.expect("Failed to sync blob");

            journal.prune(3).await.expect("Failed to prune blobs");

            let buffer = context.encode();
            assert!(buffer.contains("pruned_total 2"));

            journal.prune(2).await.expect("Failed to no-op prune");
            let buffer = context.encode();
            assert!(buffer.contains("pruned_total 2"));

            journal.close().await.expect("Failed to close journal");

            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");

            let mut items = Vec::<(u64, u64)>::new();
            {
                let stream = journal
                    .replay(0, 0, NZUsize!(1024))
                    .await
                    .expect("unable to setup replay");
                pin_mut!(stream);
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                        Err(err) => panic!("Failed to read item: {err}"),
                    }
                }
            }

            assert_eq!(items.len(), 3);
            let expected_indices = [3u64, 4u64, 5u64];
            for (item, expected_index) in items.iter().zip(expected_indices.iter()) {
                assert_eq!(item.0, *expected_index);
            }

            journal.prune(6).await.expect("Failed to prune blobs");

            journal.close().await.expect("Failed to close journal");

            assert!(context
                .scan(&cfg.partition)
                .await
                .expect("Failed to list blobs")
                .is_empty());
        });
    }

    #[test_traced]
    fn test_journal_prune_guard() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            for section in 1u64..=5u64 {
                journal
                    .append(section, section as i32)
                    .await
                    .expect("Failed to append data");
                journal.sync(section).await.expect("Failed to sync");
            }

            assert_eq!(journal.oldest_retained_section, 0);

            journal.prune(3).await.expect("Failed to prune");

            assert_eq!(journal.oldest_retained_section, 3);

            match journal.append(1, 100).await {
                Err(Error::AlreadyPrunedToSection(3)) => {}
                other => panic!("Expected AlreadyPrunedToSection(3), got {other:?}"),
            }

            match journal.append(2, 100).await {
                Err(Error::AlreadyPrunedToSection(3)) => {}
                other => panic!("Expected AlreadyPrunedToSection(3), got {other:?}"),
            }

            match journal.get(1, 0).await {
                Err(Error::AlreadyPrunedToSection(3)) => {}
                other => panic!("Expected AlreadyPrunedToSection(3), got {other:?}"),
            }

            match journal.size(1).await {
                Err(Error::AlreadyPrunedToSection(3)) => {}
                other => panic!("Expected AlreadyPrunedToSection(3), got {other:?}"),
            }

            match journal.rewind(2, 0).await {
                Err(Error::AlreadyPrunedToSection(3)) => {}
                other => panic!("Expected AlreadyPrunedToSection(3), got {other:?}"),
            }

            match journal.rewind_section(1, 0).await {
                Err(Error::AlreadyPrunedToSection(3)) => {}
                other => panic!("Expected AlreadyPrunedToSection(3), got {other:?}"),
            }

            match journal.sync(2).await {
                Err(Error::AlreadyPrunedToSection(3)) => {}
                other => panic!("Expected AlreadyPrunedToSection(3), got {other:?}"),
            }

            assert!(journal.get(3, 0).await.is_ok());
            assert!(journal.get(4, 0).await.is_ok());
            assert!(journal.get(5, 0).await.is_ok());
            assert!(journal.size(3).await.is_ok());
            assert!(journal.sync(4).await.is_ok());

            journal
                .append(3, 999)
                .await
                .expect("Should be able to append to section 3");

            journal.prune(5).await.expect("Failed to prune");
            assert_eq!(journal.oldest_retained_section, 5);

            match journal.get(3, 0).await {
                Err(Error::AlreadyPrunedToSection(5)) => {}
                other => panic!("Expected AlreadyPrunedToSection(5), got {other:?}"),
            }

            match journal.get(4, 0).await {
                Err(Error::AlreadyPrunedToSection(5)) => {}
                other => panic!("Expected AlreadyPrunedToSection(5), got {other:?}"),
            }

            assert!(journal.get(5, 0).await.is_ok());

            journal.close().await.expect("Failed to close journal");
        });
    }

    #[test_traced]
    fn test_journal_prune_guard_across_restart() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            {
                let mut journal = Journal::init(context.clone(), cfg.clone())
                    .await
                    .expect("Failed to initialize journal");

                for section in 1u64..=5u64 {
                    journal
                        .append(section, section as i32)
                        .await
                        .expect("Failed to append data");
                    journal.sync(section).await.expect("Failed to sync");
                }

                journal.prune(3).await.expect("Failed to prune");
                assert_eq!(journal.oldest_retained_section, 3);

                journal.close().await.expect("Failed to close journal");
            }

            {
                let journal = Journal::<_, i32>::init(context.clone(), cfg.clone())
                    .await
                    .expect("Failed to re-initialize journal");

                // The prune guard is in-memory only, so it resets on restart.
                assert_eq!(journal.oldest_retained_section, 0);

                // Pruned sections are simply gone from storage.
                match journal.get(1, 0).await {
                    Err(Error::SectionOutOfRange(1)) => {}
                    other => panic!("Expected SectionOutOfRange(1), got {other:?}"),
                }

                match journal.get(2, 0).await {
                    Err(Error::SectionOutOfRange(2)) => {}
                    other => panic!("Expected SectionOutOfRange(2), got {other:?}"),
                }

                assert!(journal.get(3, 0).await.is_ok());
                assert!(journal.get(4, 0).await.is_ok());
                assert!(journal.get(5, 0).await.is_ok());

                journal.close().await.expect("Failed to close journal");
            }
        });
    }

    #[test_traced]
    fn test_journal_with_invalid_blob_name() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            // Create a blob whose name is not a valid 8-byte section number.
            let invalid_blob_name = b"invalid";
            let (blob, _) = context
                .open(&cfg.partition, invalid_blob_name)
                .await
                .expect("Failed to create blob with invalid name");
            blob.sync().await.expect("Failed to sync blob");

            let result = Journal::<_, u64>::init(context, cfg).await;

            assert!(matches!(result, Err(Error::InvalidBlobName(_))));
        });
    }

    #[test_traced]
    fn test_journal_read_size_missing() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            let section = 1u64;
            let blob_name = section.to_be_bytes();
            let (blob, _) = context
                .open(&cfg.partition, &blob_name)
                .await
                .expect("Failed to create blob");

            // Write only the first byte of a multi-byte size varint.
            let mut incomplete_data = Vec::new();
            UInt(u32::MAX).write(&mut incomplete_data);
            incomplete_data.truncate(1);
            blob.write_at(incomplete_data, 0)
                .await
                .expect("Failed to write incomplete data");
            blob.sync().await.expect("Failed to sync blob");

            let journal = Journal::init(context, cfg)
                .await
                .expect("Failed to initialize journal");

            let stream = journal
                .replay(0, 0, NZUsize!(1024))
                .await
                .expect("unable to setup replay");
            pin_mut!(stream);
            let mut items = Vec::<(u64, u64)>::new();
            while let Some(result) = stream.next().await {
                match result {
                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                    Err(err) => panic!("Failed to read item: {err}"),
                }
            }
            assert!(items.is_empty());
        });
    }

    #[test_traced]
    fn test_journal_read_item_missing() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            let section = 1u64;
            let blob_name = section.to_be_bytes();
            let (blob, _) = context
                .open(&cfg.partition, &blob_name)
                .await
                .expect("Failed to create blob");

            // Write a size prefix claiming 10 bytes, but only 5 bytes of item
            // data (and no checksum).
            let item_size: u32 = 10;
            let mut buf = Vec::new();
            UInt(item_size).write(&mut buf);
            let data = [2u8; 5];
            BufMut::put_slice(&mut buf, &data);
            blob.write_at(buf, 0)
                .await
                .expect("Failed to write incomplete item");
            blob.sync().await.expect("Failed to sync blob");

            let journal = Journal::init(context, cfg)
                .await
                .expect("Failed to initialize journal");

            let stream = journal
                .replay(0, 0, NZUsize!(1024))
                .await
                .expect("unable to setup replay");
            pin_mut!(stream);
            let mut items = Vec::<(u64, u64)>::new();
            while let Some(result) = stream.next().await {
                match result {
                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                    Err(err) => panic!("Failed to read item: {err}"),
                }
            }
            assert!(items.is_empty());
        });
    }

    #[test_traced]
    fn test_journal_read_checksum_missing() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            let section = 1u64;
            let blob_name = section.to_be_bytes();
            let (blob, _) = context
                .open(&cfg.partition, &blob_name)
                .await
                .expect("Failed to create blob");

            let item_data = b"Test data";
            let item_size = item_data.len() as u32;

            let mut buf = Vec::new();
            UInt(item_size).write(&mut buf);
            BufMut::put_slice(&mut buf, item_data);
            blob.write_at(buf, 0)
                .await
                .expect("Failed to write item without checksum");

            blob.sync().await.expect("Failed to sync blob");

            let journal = Journal::init(context, cfg)
                .await
                .expect("Failed to initialize journal");

            let stream = journal
                .replay(0, 0, NZUsize!(1024))
                .await
                .expect("unable to setup replay");
            pin_mut!(stream);
            let mut items = Vec::<(u64, u64)>::new();
            while let Some(result) = stream.next().await {
                match result {
                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                    Err(err) => panic!("Failed to read item: {err}"),
                }
            }
            assert!(items.is_empty());
        });
    }

    #[test_traced]
    fn test_journal_read_checksum_mismatch() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            let section = 1u64;
            let blob_name = section.to_be_bytes();
            let (blob, _) = context
                .open(&cfg.partition, &blob_name)
                .await
                .expect("Failed to create blob");

            let item_data = b"Test data";
            let item_size = item_data.len() as u32;
            let incorrect_checksum: u32 = 0xDEADBEEF;

            let mut buf = Vec::new();
            UInt(item_size).write(&mut buf);
            BufMut::put_slice(&mut buf, item_data);
            buf.put_u32(incorrect_checksum);
            blob.write_at(buf, 0)
                .await
                .expect("Failed to write item with bad checksum");

            blob.sync().await.expect("Failed to sync blob");

            let journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            {
                let stream = journal
                    .replay(0, 0, NZUsize!(1024))
                    .await
                    .expect("unable to setup replay");
                pin_mut!(stream);
                let mut items = Vec::<(u64, u64)>::new();
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                        Err(err) => panic!("Failed to read item: {err}"),
                    }
                }
                assert!(items.is_empty());
            }
            journal.close().await.expect("Failed to close journal");

            // The corrupted blob should have been truncated to zero length.
            let (_, blob_size) = context
                .open(&cfg.partition, &section.to_be_bytes())
                .await
                .expect("Failed to open blob");
            assert_eq!(blob_size, 0);
        });
    }

    #[test_traced]
    fn test_journal_handling_unaligned_truncated_data() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            journal.append(1, 1).await.expect("Failed to append data");

            let data_items = vec![(2u64, 2), (2u64, 3), (2u64, 4)];
            for (index, data) in &data_items {
                journal
                    .append(*index, *data)
                    .await
                    .expect("Failed to append data");
                journal.sync(*index).await.expect("Failed to sync blob");
            }

            journal.close().await.expect("Failed to close journal");

            // Corrupt the last item by truncating 4 bytes off the blob.
            let (blob, blob_size) = context
                .open(&cfg.partition, &2u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            blob.resize(blob_size - 4)
                .await
                .expect("Failed to corrupt blob");
            blob.sync().await.expect("Failed to sync blob");

            let journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");

            let mut items = Vec::<(u64, u32)>::new();
            {
                let stream = journal
                    .replay(0, 0, NZUsize!(1024))
                    .await
                    .expect("unable to setup replay");
                pin_mut!(stream);
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                        Err(err) => panic!("Failed to read item: {err}"),
                    }
                }
            }
            journal.close().await.expect("Failed to close journal");

            assert_eq!(items.len(), 3);
            assert_eq!(items[0].0, 1);
            assert_eq!(items[0].1, 1);
            assert_eq!(items[1].0, data_items[0].0);
            assert_eq!(items[1].1, data_items[0].1);
            assert_eq!(items[2].0, data_items[1].0);
            assert_eq!(items[2].1, data_items[1].1);

            // The blob should have been truncated to its last valid entry.
            let (_, blob_size) = context
                .open(&cfg.partition, &2u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            assert_eq!(blob_size, 25);

            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");

            let mut items = Vec::<(u64, u32)>::new();
            {
                let stream = journal
                    .replay(0, 0, NZUsize!(1024))
                    .await
                    .expect("unable to setup replay");
                pin_mut!(stream);
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                        Err(err) => panic!("Failed to read item: {err}"),
                    }
                }
            }

            assert_eq!(items.len(), 3);
            assert_eq!(items[0].0, 1);
            assert_eq!(items[0].1, 1);
            assert_eq!(items[1].0, data_items[0].0);
            assert_eq!(items[1].1, data_items[0].1);
            assert_eq!(items[2].0, data_items[1].0);
            assert_eq!(items[2].1, data_items[1].1);

            journal.append(2, 5).await.expect("Failed to append data");
            journal.sync(2).await.expect("Failed to sync blob");

            let item = journal.get(2, 2).await.expect("Failed to get item");
            assert_eq!(item, 5);

            journal.close().await.expect("Failed to close journal");

            // The new item should start at the next aligned offset.
            let (_, blob_size) = context
                .open(&cfg.partition, &2u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            assert_eq!(blob_size, 41);

            let journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");

            let mut items = Vec::<(u64, u32)>::new();
            {
                let stream = journal
                    .replay(0, 0, NZUsize!(1024))
                    .await
                    .expect("unable to setup replay");
                pin_mut!(stream);
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                        Err(err) => panic!("Failed to read item: {err}"),
                    }
                }
            }

            assert_eq!(items.len(), 4);
            assert_eq!(items[0].0, 1);
            assert_eq!(items[0].1, 1);
            assert_eq!(items[1].0, data_items[0].0);
            assert_eq!(items[1].1, data_items[0].1);
            assert_eq!(items[2].0, data_items[1].0);
            assert_eq!(items[2].1, data_items[1].1);
            assert_eq!(items[3].0, 2);
            assert_eq!(items[3].1, 5);
        });
    }

    #[test_traced]
    fn test_journal_handling_aligned_truncated_data() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            journal.append(1, 1).await.expect("Failed to append data");

            let data_items = vec![(2u64, 2), (2u64, 3), (2u64, 4)];
            for (index, data) in &data_items {
                journal
                    .append(*index, *data)
                    .await
                    .expect("Failed to append data");
                journal.sync(*index).await.expect("Failed to sync blob");
            }

            journal.close().await.expect("Failed to close journal");

            // Corrupt the last item by truncating 4 bytes off the blob.
            let (blob, blob_size) = context
                .open(&cfg.partition, &2u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            blob.resize(blob_size - 4)
                .await
                .expect("Failed to corrupt blob");
            blob.sync().await.expect("Failed to sync blob");

            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");

            let mut items = Vec::<(u64, u64)>::new();
            {
                let stream = journal
                    .replay(0, 0, NZUsize!(1024))
                    .await
                    .expect("unable to setup replay");
                pin_mut!(stream);
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                        Err(err) => panic!("Failed to read item: {err}"),
                    }
                }
            }

            assert_eq!(items.len(), 3);
            assert_eq!(items[0].0, 1);
            assert_eq!(items[0].1, 1);
            assert_eq!(items[1].0, data_items[0].0);
            assert_eq!(items[1].1, data_items[0].1);
            assert_eq!(items[2].0, data_items[1].0);
            assert_eq!(items[2].1, data_items[1].1);

            journal.append(2, 5).await.expect("Failed to append data");
            journal.sync(2).await.expect("Failed to sync blob");

            let item = journal.get(2, 2).await.expect("Failed to get item");
            assert_eq!(item, 5);

            journal.close().await.expect("Failed to close journal");

            // The new item should start at the next aligned offset.
            let (_, blob_size) = context
                .open(&cfg.partition, &2u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            assert_eq!(blob_size, 45);

            let journal = Journal::init(context, cfg)
                .await
                .expect("Failed to re-initialize journal");

            let mut items = Vec::<(u64, u64)>::new();
            {
                let stream = journal
                    .replay(0, 0, NZUsize!(1024))
                    .await
                    .expect("unable to setup replay");
                pin_mut!(stream);
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                        Err(err) => panic!("Failed to read item: {err}"),
                    }
                }
            }
            journal.close().await.expect("Failed to close journal");

            assert_eq!(items.len(), 4);
            assert_eq!(items[0].0, 1);
            assert_eq!(items[0].1, 1);
            assert_eq!(items[1].0, data_items[0].0);
            assert_eq!(items[1].1, data_items[0].1);
            assert_eq!(items[2].0, data_items[1].0);
            assert_eq!(items[2].1, data_items[1].1);
            assert_eq!(items[3].0, 2);
            assert_eq!(items[3].1, 5);
        });
    }

    #[test_traced]
    fn test_journal_handling_extra_data() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            journal.append(1, 1).await.expect("Failed to append data");

            let data_items = vec![(2u64, 2), (2u64, 3), (2u64, 4)];
            for (index, data) in &data_items {
                journal
                    .append(*index, *data)
                    .await
                    .expect("Failed to append data");
                journal.sync(*index).await.expect("Failed to sync blob");
            }

            journal.close().await.expect("Failed to close journal");

            // Append 16 bytes of zero garbage past the end of the blob.
            let (blob, blob_size) = context
                .open(&cfg.partition, &2u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            blob.write_at(vec![0u8; 16], blob_size)
                .await
                .expect("Failed to add extra data");
            blob.sync().await.expect("Failed to sync blob");

            let journal = Journal::init(context, cfg)
                .await
                .expect("Failed to re-initialize journal");

            let mut items = Vec::<(u64, i32)>::new();
            let stream = journal
                .replay(0, 0, NZUsize!(1024))
                .await
                .expect("unable to setup replay");
            pin_mut!(stream);
            while let Some(result) = stream.next().await {
                match result {
                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                    Err(err) => panic!("Failed to read item: {err}"),
                }
            }

            // The four intact items replay; the trailing zeros fail their
            // checksum and are truncated.
            assert_eq!(items.len(), 4);
        });
    }

    /// A [Blob] that accepts all operations and returns zeroed reads.
    #[derive(Clone)]
    struct MockBlob {}

    impl Blob for MockBlob {
        async fn read_at(
            &self,
            buf: impl Into<StableBuf> + Send,
            _offset: u64,
        ) -> Result<StableBuf, RError> {
            Ok(buf.into())
        }

        async fn write_at(
            &self,
            _buf: impl Into<StableBuf> + Send,
            _offset: u64,
        ) -> Result<(), RError> {
            Ok(())
        }

        async fn resize(&self, _len: u64) -> Result<(), RError> {
            Ok(())
        }

        async fn sync(&self) -> Result<(), RError> {
            Ok(())
        }
    }

    /// A [Storage] that reports a fixed blob length without persisting
    /// anything.
    #[derive(Clone)]
    struct MockStorage {
        len: u64,
    }

    impl Storage for MockStorage {
        type Blob = MockBlob;

        async fn open(&self, _partition: &str, _name: &[u8]) -> Result<(MockBlob, u64), RError> {
            Ok((MockBlob {}, self.len))
        }

        async fn remove(&self, _partition: &str, _name: Option<&[u8]>) -> Result<(), RError> {
            Ok(())
        }

        async fn scan(&self, _partition: &str) -> Result<Vec<Vec<u8>>, RError> {
            Ok(vec![])
        }
    }

    impl Metrics for MockStorage {
        fn with_label(&self, _: &str) -> Self {
            self.clone()
        }

        fn label(&self) -> String {
            String::new()
        }

        fn register<N: Into<String>, H: Into<String>>(&self, _: N, _: H, _: impl Metric) {}

        fn encode(&self) -> String {
            String::new()
        }
    }

    /// Mirrors ITEM_ALIGNMENT in the parent module.
    const INDEX_ALIGNMENT: u64 = 16;

    #[test_traced]
    fn test_journal_large_offset() {
        let executor = deterministic::Runner::default();
        executor.start(|_| async move {
            let cfg = Config {
                partition: "partition".to_string(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };
            let context = MockStorage {
                // The largest blob size whose next aligned offset still fits
                // in a u32.
                len: u32::MAX as u64 * INDEX_ALIGNMENT,
            };
            let mut journal = Journal::init(context, cfg).await.unwrap();

            let data = 1;
            let (result, _) = journal
                .append(1, data)
                .await
                .expect("Failed to append data");
            assert_eq!(result, u32::MAX);
        });
    }

    #[test_traced]
    fn test_journal_offset_overflow() {
        let executor = deterministic::Runner::default();
        executor.start(|_| async move {
            let cfg = Config {
                partition: "partition".to_string(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };
            let context = MockStorage {
                len: u32::MAX as u64 * INDEX_ALIGNMENT + 1,
            };
            let mut journal = Journal::init(context, cfg).await.unwrap();

            let data = 1;
            let result = journal.append(1, data).await;
            assert!(matches!(result, Err(Error::OffsetOverflow)));
        });
    }

    #[test_traced]
    fn test_journal_rewind() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".to_string(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };
            let mut journal = Journal::init(context, cfg).await.unwrap();

            let size = journal.size(1).await.unwrap();
            assert_eq!(size, 0);

            journal.append(1, 42i32).await.unwrap();

            let size = journal.size(1).await.unwrap();
            assert!(size > 0);

            journal.append(1, 43i32).await.unwrap();
            let new_size = journal.size(1).await.unwrap();
            assert!(new_size > size);

            let size = journal.size(2).await.unwrap();
            assert_eq!(size, 0);

            journal.append(2, 44i32).await.unwrap();

            let size = journal.size(2).await.unwrap();
            assert!(size > 0);

            journal.rewind(1, 0).await.unwrap();

            let size = journal.size(1).await.unwrap();
            assert_eq!(size, 0);

            let size = journal.size(2).await.unwrap();
            assert_eq!(size, 0);
        });
    }

    #[test_traced]
    fn test_journal_rewind_section() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".to_string(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };
            let mut journal = Journal::init(context, cfg).await.unwrap();

            let size = journal.size(1).await.unwrap();
            assert_eq!(size, 0);

            journal.append(1, 42i32).await.unwrap();

            let size = journal.size(1).await.unwrap();
            assert!(size > 0);

            journal.append(1, 43i32).await.unwrap();
            let new_size = journal.size(1).await.unwrap();
            assert!(new_size > size);

            let size = journal.size(2).await.unwrap();
            assert_eq!(size, 0);

            journal.append(2, 44i32).await.unwrap();

            let size = journal.size(2).await.unwrap();
            assert!(size > 0);

            journal.rewind_section(1, 0).await.unwrap();

            let size = journal.size(1).await.unwrap();
            assert_eq!(size, 0);

            // Unlike rewind, rewind_section leaves other sections intact.
            let size = journal.size(2).await.unwrap();
            assert!(size > 0);
        });
    }

    #[test_traced]
    fn test_journal_conformance() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            for i in 0..100 {
                journal.append(1, i).await.expect("Failed to append data");
            }
            journal.sync(1).await.expect("Failed to sync blob");

            journal.close().await.expect("Failed to close journal");

            // Hash the raw blob contents to catch any unintended change to
            // the on-disk format.
            let (blob, size) = context
                .open(&cfg.partition, &1u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            assert!(size > 0);
            let buf = blob
                .read_at(vec![0u8; size as usize], 0)
                .await
                .expect("Failed to read blob");
            let digest = Sha256::hash(buf.as_ref());
            assert_eq!(
                hex(&digest),
                "f55bf27a59118603466fcf6a507ab012eea4cb2d6bdd06ce8f515513729af847",
            );
        });
    }
}