use crate::journal::Error;
use bytes::BufMut;
use commonware_codec::Codec;
use commonware_runtime::{
    buffer::{Append, PoolRef, Read},
    Blob, Error as RError, Metrics, Storage,
};
use commonware_utils::hex;
use futures::stream::{self, Stream, StreamExt};
use prometheus_client::metrics::{counter::Counter, gauge::Gauge};
use std::{
    collections::{btree_map::Entry, BTreeMap},
    io::Cursor,
    marker::PhantomData,
    num::NonZeroUsize,
};
use tracing::{debug, trace, warn};
use zstd::{bulk::compress, decode_all};

/// Configuration for a [Journal] instance.
#[derive(Clone)]
pub struct Config<C> {
    /// The storage partition to use for storing journal blobs.
    pub partition: String,

    /// Optional zstd compression level to apply to stored items.
    pub compression: Option<u8>,

    /// The codec configuration to use for encoding and decoding items.
    pub codec_config: C,

    /// The buffer pool to use when reading blobs.
    pub buffer_pool: PoolRef,

    /// The size of the write buffer to use for each blob.
    pub write_buffer: NonZeroUsize,
}

/// The size of the alignment boundary for each item stored in the journal.
pub(crate) const ITEM_ALIGNMENT: u64 = 16;

/// Converts a raw byte offset into the next aligned item offset, failing if
/// the aligned offset does not fit in a `u32`.
#[inline]
fn compute_next_offset(mut offset: u64) -> Result<u32, Error> {
    let overage = offset % ITEM_ALIGNMENT;
    if overage != 0 {
        offset += ITEM_ALIGNMENT - overage;
    }
    let offset = offset / ITEM_ALIGNMENT;
    let aligned_offset = offset.try_into().map_err(|_| Error::OffsetOverflow)?;
    Ok(aligned_offset)
}

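/// An append-only journal for storing arbitrary variable-length items,
/// partitioned into sections (each backed by its own blob).
///
/// Each entry is stored as a big-endian `u32` length, the (optionally
/// compressed) item bytes, and a `crc32` checksum, with entries padded to
/// [ITEM_ALIGNMENT]-byte boundaries.
///
/// A minimal usage sketch, modeled on the tests at the bottom of this file
/// (the runtime `context` and `cfg` values are whatever your deployment
/// provides):
///
/// ```ignore
/// let mut journal = Journal::init(context, cfg).await?;
/// let (offset, _len) = journal.append(1, 42i32).await?;
/// journal.sync(1).await?;
/// let item: i32 = journal.get(1, offset).await?;
/// journal.close().await?;
/// ```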
pub struct Journal<E: Storage + Metrics, V: Codec> {
    pub(crate) context: E,
    pub(crate) cfg: Config<V::Cfg>,

    /// Open blobs, keyed by section number.
    pub(crate) blobs: BTreeMap<u64, Append<E::Blob>>,

    /// The oldest section that has not been pruned. Operations on sections
    /// below this are rejected by `prune_guard`. This guard is in-memory
    /// only and is not persisted across restarts.
    pub(crate) oldest_retained_section: u64,

    pub(crate) tracked: Gauge,
    pub(crate) synced: Counter,
    pub(crate) pruned: Counter,

    pub(crate) _phantom: PhantomData<V>,
}

impl<E: Storage + Metrics, V: Codec> Journal<E, V> {
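    /// Initializes a new [Journal] instance, opening (but not reading) every
    /// blob found in the configured partition. A missing partition is treated
    /// as an empty journal. Blobs whose names are not big-endian `u64` section
    /// numbers cause `Error::InvalidBlobName`.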
    pub async fn init(context: E, cfg: Config<V::Cfg>) -> Result<Self, Error> {
        // Iterate over all blobs in the partition
        let mut blobs = BTreeMap::new();
        let stored_blobs = match context.scan(&cfg.partition).await {
            Ok(blobs) => blobs,
            Err(RError::PartitionMissing(_)) => Vec::new(),
            Err(err) => return Err(Error::Runtime(err)),
        };
        for name in stored_blobs {
            let (blob, size) = context.open(&cfg.partition, &name).await?;
            let hex_name = hex(&name);
            let section = match name.try_into() {
                Ok(section) => u64::from_be_bytes(section),
                Err(_) => return Err(Error::InvalidBlobName(hex_name)),
            };
            debug!(section, blob = hex_name, size, "loaded section");
            let blob = Append::new(blob, size, cfg.write_buffer, cfg.buffer_pool.clone()).await?;
            blobs.insert(section, blob);
        }

        // Initialize metrics
        let tracked = Gauge::default();
        let synced = Counter::default();
        let pruned = Counter::default();
        context.register("tracked", "Number of blobs", tracked.clone());
        context.register("synced", "Number of syncs", synced.clone());
        context.register("pruned", "Number of blobs pruned", pruned.clone());
        tracked.set(blobs.len() as i64);

        // Create the journal instance
        Ok(Self {
            context,
            cfg,
            blobs,
            oldest_retained_section: 0,
            tracked,
            synced,
            pruned,

            _phantom: PhantomData,
        })
    }

    /// Returns an error if `section` has already been pruned.
    fn prune_guard(&self, section: u64) -> Result<(), Error> {
        if section < self.oldest_retained_section {
            Err(Error::AlreadyPrunedToSection(self.oldest_retained_section))
        } else {
            Ok(())
        }
    }

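    /// Reads and decodes the item at the given aligned `offset` in `blob`,
    /// returning the next aligned offset, the item's on-disk length, and the
    /// item itself. Fails with `Error::ChecksumMismatch` if the stored `crc32`
    /// does not match the recomputed one.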
    pub(crate) async fn read(
        compressed: bool,
        cfg: &V::Cfg,
        blob: &Append<E::Blob>,
        offset: u32,
    ) -> Result<(u32, u32, V), Error> {
        // Read the item size
        let mut hasher = crc32fast::Hasher::new();
        let offset = offset as u64 * ITEM_ALIGNMENT;
        let size = blob.read_at(vec![0; 4], offset).await?;
        hasher.update(size.as_ref());
        let size = u32::from_be_bytes(size.as_ref().try_into().unwrap()) as usize;
        let offset = offset.checked_add(4).ok_or(Error::OffsetOverflow)?;

        // Read the item and its checksum
        let buf_size = size.checked_add(4).ok_or(Error::OffsetOverflow)?;
        let buf = blob.read_at(vec![0u8; buf_size], offset).await?;
        let buf = buf.as_ref();
        let offset = offset
            .checked_add(buf_size as u64)
            .ok_or(Error::OffsetOverflow)?;

        // Verify integrity
        let item = &buf[..size];
        hasher.update(item);
        let checksum = hasher.finalize();
        let stored_checksum = u32::from_be_bytes(buf[size..].try_into().unwrap());
        if checksum != stored_checksum {
            return Err(Error::ChecksumMismatch(stored_checksum, checksum));
        }

        // Compute the next aligned offset
        let aligned_offset = compute_next_offset(offset)?;

        // Decompress (if necessary) and decode the item
        let item = if compressed {
            let decompressed =
                decode_all(Cursor::new(&item)).map_err(|_| Error::DecompressionFailed)?;
            V::decode_cfg(decompressed.as_ref(), cfg).map_err(Error::Codec)?
        } else {
            V::decode_cfg(item, cfg).map_err(Error::Codec)?
        };

        Ok((aligned_offset, size as u32, item))
    }

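    /// Like `read`, but pulls bytes through a pooled [Read] buffer, which is
    /// cheaper for the sequential access pattern of `replay`. Also returns the
    /// reader's raw byte position after the item, which `replay` uses as the
    /// truncation point if a later entry turns out to be corrupt.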
    async fn read_buffered(
        reader: &mut Read<Append<E::Blob>>,
        offset: u32,
        cfg: &V::Cfg,
        compressed: bool,
    ) -> Result<(u32, u64, u32, V), Error> {
        // Seek to the item's aligned offset if the reader isn't already there
        let file_offset = offset as u64 * ITEM_ALIGNMENT;
        if reader.position() != file_offset {
            reader.seek_to(file_offset).map_err(Error::Runtime)?;
        }

        // Read the item size
        let mut hasher = crc32fast::Hasher::new();
        let mut size_buf = [0u8; 4];
        reader
            .read_exact(&mut size_buf, 4)
            .await
            .map_err(Error::Runtime)?;
        hasher.update(&size_buf);

        // Read the item and its checksum
        let size = u32::from_be_bytes(size_buf) as usize;
        let buf_size = size.checked_add(4).ok_or(Error::OffsetOverflow)?;
        let mut buf = vec![0u8; buf_size];
        reader
            .read_exact(&mut buf, buf_size)
            .await
            .map_err(Error::Runtime)?;

        // Verify integrity
        let item = &buf[..size];
        hasher.update(item);
        let checksum = hasher.finalize();
        let stored_checksum = u32::from_be_bytes(buf[size..].try_into().unwrap());
        if checksum != stored_checksum {
            return Err(Error::ChecksumMismatch(stored_checksum, checksum));
        }

        // Decompress (if necessary) and decode the item
        let item = if compressed {
            let decompressed =
                decode_all(Cursor::new(&item)).map_err(|_| Error::DecompressionFailed)?;
            V::decode_cfg(decompressed.as_ref(), cfg).map_err(Error::Codec)?
        } else {
            V::decode_cfg(item, cfg).map_err(Error::Codec)?
        };

        // Compute the next aligned offset
        let current_pos = reader.position();
        let aligned_offset = compute_next_offset(current_pos)?;
        Ok((aligned_offset, current_pos, size as u32, item))
    }

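    /// Reads the item at the given aligned `offset` when its exact length is
    /// already known, fetching the whole entry (size, item, checksum) in a
    /// single read. Fails with `Error::UnexpectedSize` if the stored size
    /// does not match `len`.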
    async fn read_exact(
        compressed: bool,
        cfg: &V::Cfg,
        blob: &Append<E::Blob>,
        offset: u32,
        len: u32,
    ) -> Result<V, Error> {
        // Read the entire entry in one request
        let offset = offset as u64 * ITEM_ALIGNMENT;
        let entry_size = 4 + len as usize + 4;
        let buf = blob.read_at(vec![0u8; entry_size], offset).await?;

        // Verify the stored size matches the expected length
        let mut hasher = crc32fast::Hasher::new();
        let disk_size = u32::from_be_bytes(buf.as_ref()[..4].try_into().unwrap());
        hasher.update(&buf.as_ref()[..4]);
        if disk_size != len {
            return Err(Error::UnexpectedSize(disk_size, len));
        }

        // Verify integrity
        let item = &buf.as_ref()[4..4 + len as usize];
        hasher.update(item);
        let checksum = hasher.finalize();
        let stored_checksum =
            u32::from_be_bytes(buf.as_ref()[4 + len as usize..].try_into().unwrap());
        if checksum != stored_checksum {
            return Err(Error::ChecksumMismatch(stored_checksum, checksum));
        }

        // Decompress the item (if necessary)
        let item = if compressed {
            decode_all(Cursor::new(item)).map_err(|_| Error::DecompressionFailed)?
        } else {
            item.to_vec()
        };

        // Decode the item
        let item = V::decode_cfg(item.as_ref(), cfg).map_err(Error::Codec)?;
        Ok(item)
    }

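    /// Returns a stream of all items in the journal, ordered by section and
    /// offset, starting at `start_section` and item `offset` (pass `0` to
    /// replay a section from its beginning). `buffer` bounds the read-ahead
    /// used per section.
    ///
    /// If a checksum mismatch or an incomplete trailing entry is encountered,
    /// the offending blob is truncated to its last valid size and replay of
    /// that section ends; any other error is yielded to the caller.
    ///
    /// A consumption sketch, mirroring the tests below:
    ///
    /// ```ignore
    /// let stream = journal.replay(0, 0, NZUsize!(1024)).await?;
    /// pin_mut!(stream);
    /// while let Some(result) = stream.next().await {
    ///     let (section, offset, len, item) = result?;
    /// }
    /// ```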
    pub async fn replay(
        &self,
        start_section: u64,
        mut offset: u32,
        buffer: NonZeroUsize,
    ) -> Result<impl Stream<Item = Result<(u64, u32, u32, V), Error>> + '_, Error> {
        // Collect all blobs to replay
        let codec_config = self.cfg.codec_config.clone();
        let compressed = self.cfg.compression.is_some();
        let mut blobs = Vec::with_capacity(self.blobs.len());
        for (section, blob) in self.blobs.range(start_section..) {
            let blob_size = blob.size().await;
            let max_offset = compute_next_offset(blob_size)?;
            blobs.push((
                *section,
                blob.clone(),
                max_offset,
                blob_size,
                codec_config.clone(),
                compressed,
            ));
        }

        // Replay all blobs in order
        Ok(stream::iter(blobs).flat_map(
            move |(section, blob, max_offset, blob_size, codec_config, compressed)| {
                // Create a buffered reader for the blob and seek to the
                // starting offset (only relevant for the first section)
                let mut reader = Read::new(blob, blob_size, buffer);
                if section == start_section && offset != 0 {
                    if let Err(err) = reader.seek_to(offset as u64 * ITEM_ALIGNMENT) {
                        warn!(section, offset, ?err, "failed to seek to offset");
                        return stream::once(async move { Err(err.into()) }).left_stream();
                    }
                } else {
                    offset = 0;
                }

                // Read items until the end of the blob is reached
                stream::unfold(
                    (section, reader, offset, 0u64, codec_config, compressed),
                    move |(
                        section,
                        mut reader,
                        offset,
                        valid_size,
                        codec_config,
                        compressed,
                    )| async move {
                        // Stop once past the last aligned offset
                        if offset >= max_offset {
                            return None;
                        }

                        // Attempt to read the next item
                        match Self::read_buffered(
                            &mut reader,
                            offset,
                            &codec_config,
                            compressed,
                        )
                        .await
                        {
                            Ok((next_offset, next_valid_size, size, item)) => {
                                trace!(blob = section, cursor = offset, "replayed item");
                                Some((
                                    Ok((section, offset, size, item)),
                                    (
                                        section,
                                        reader,
                                        next_offset,
                                        next_valid_size,
                                        codec_config,
                                        compressed,
                                    ),
                                ))
                            }
                            Err(Error::ChecksumMismatch(expected, found)) => {
                                // Truncate the blob to the last valid size and
                                // end the stream for this section
                                warn!(
                                    blob = section,
                                    bad_offset = offset,
                                    new_size = valid_size,
                                    expected,
                                    found,
                                    "corruption detected: truncating"
                                );
                                reader.resize(valid_size).await.ok()?;
                                None
                            }
                            Err(Error::Runtime(RError::BlobInsufficientLength)) => {
                                // Truncate trailing bytes that don't form a
                                // complete entry and end the stream
                                warn!(
                                    blob = section,
                                    bad_offset = offset,
                                    new_size = valid_size,
                                    "trailing bytes detected: truncating"
                                );
                                reader.resize(valid_size).await.ok()?;
                                None
                            }
                            Err(err) => {
                                // Surface any other error to the caller
                                warn!(
                                    blob = section,
                                    cursor = offset,
                                    ?err,
                                    "unexpected error"
                                );
                                Some((
                                    Err(err),
                                    (
                                        section,
                                        reader,
                                        offset,
                                        valid_size,
                                        codec_config,
                                        compressed,
                                    ),
                                ))
                            }
                        }
                    },
                )
                .right_stream()
            },
        ))
    }

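    /// Appends an item to `section`, creating the section's blob if needed,
    /// and returns the item's aligned offset along with the length of its
    /// encoded (and possibly compressed) bytes. Fails with
    /// `Error::AlreadyPrunedToSection` if the section has been pruned and
    /// `Error::ItemTooLarge` if the encoded length does not fit in a `u32`.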
    pub async fn append(&mut self, section: u64, item: V) -> Result<(u32, u32), Error> {
        self.prune_guard(section)?;

        // Encode and (optionally) compress the item
        let encoded = item.encode();
        let encoded = if let Some(compression) = self.cfg.compression {
            compress(&encoded, compression as i32).map_err(|_| Error::CompressionFailed)?
        } else {
            encoded.into()
        };

        // Ensure the item length fits in a u32
        let item_len = encoded.len();
        let entry_len = 4 + item_len + 4;
        let item_len = match item_len.try_into() {
            Ok(len) => len,
            Err(_) => return Err(Error::ItemTooLarge(item_len)),
        };

        // Get or create the blob for the section
        let blob = match self.blobs.entry(section) {
            Entry::Occupied(entry) => entry.into_mut(),
            Entry::Vacant(entry) => {
                let name = section.to_be_bytes();
                let (blob, size) = self.context.open(&self.cfg.partition, &name).await?;
                let blob = Append::new(
                    blob,
                    size,
                    self.cfg.write_buffer,
                    self.cfg.buffer_pool.clone(),
                )
                .await?;
                self.tracked.inc();
                entry.insert(blob)
            }
        };

        // Compute the aligned offset and required padding
        let cursor = blob.size().await;
        let offset = compute_next_offset(cursor)?;
        let aligned_cursor = offset as u64 * ITEM_ALIGNMENT;
        let padding = (aligned_cursor - cursor) as usize;

        // Build the entry: padding, size, item, and checksum
        let mut buf = Vec::with_capacity(padding + entry_len);
        if padding > 0 {
            buf.resize(padding, 0);
        }
        let entry_start = buf.len();
        buf.put_u32(item_len);
        buf.put_slice(&encoded);
        let checksum = crc32fast::hash(&buf[entry_start..]);
        buf.put_u32(checksum);
        assert_eq!(buf[entry_start..].len(), entry_len);

        // Append the entry to the blob
        blob.append(buf).await?;
        trace!(blob = section, offset, "appended item");
        Ok((offset, item_len))
    }

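    /// Retrieves the item at the given `section` and aligned `offset`.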
    pub async fn get(&self, section: u64, offset: u32) -> Result<V, Error> {
        self.prune_guard(section)?;
        let blob = match self.blobs.get(&section) {
            Some(blob) => blob,
            None => return Err(Error::SectionOutOfRange(section)),
        };

        let (_, _, item) = Self::read(
            self.cfg.compression.is_some(),
            &self.cfg.codec_config,
            blob,
            offset,
        )
        .await?;
        Ok(item)
    }

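    /// Retrieves the item of known `size` at the given `section` and aligned
    /// `offset`, fetching the entire entry in one read (unlike `get`, which
    /// needs a first read to learn the item's length).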
    pub async fn get_exact(&self, section: u64, offset: u32, size: u32) -> Result<V, Error> {
        self.prune_guard(section)?;
        let blob = match self.blobs.get(&section) {
            Some(blob) => blob,
            None => return Err(Error::SectionOutOfRange(section)),
        };

        let item = Self::read_exact(
            self.cfg.compression.is_some(),
            &self.cfg.codec_config,
            blob,
            offset,
            size,
        )
        .await?;
        Ok(item)
    }

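    /// Returns the current size of `section` in bytes (`0` if the section
    /// does not exist).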
    pub async fn size(&self, section: u64) -> Result<u64, Error> {
        self.prune_guard(section)?;
        match self.blobs.get(&section) {
            Some(blob) => Ok(blob.size().await),
            None => Ok(0),
        }
    }

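    /// Rewinds the journal to the given `section` and aligned item `offset`,
    /// converting the offset to a byte size and delegating to `rewind`.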
    pub async fn rewind_to_offset(&mut self, section: u64, offset: u32) -> Result<(), Error> {
        self.rewind(section, offset as u64 * ITEM_ALIGNMENT).await
    }

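    /// Rewinds the journal: removes every section after `section`, then
    /// truncates `section` itself to `size` bytes. Truncation is a no-op if
    /// the section does not exist or is already at or below `size`.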
    pub async fn rewind(&mut self, section: u64, size: u64) -> Result<(), Error> {
        self.prune_guard(section)?;

        // Remove all sections after the given section
        let trailing: Vec<u64> = self
            .blobs
            .range((
                std::ops::Bound::Excluded(section),
                std::ops::Bound::Unbounded,
            ))
            .map(|(&section, _)| section)
            .collect();
        for index in trailing.iter().rev() {
            let blob = self.blobs.remove(index).unwrap();
            drop(blob);
            self.context
                .remove(&self.cfg.partition, Some(&index.to_be_bytes()))
                .await?;
            debug!(section = index, "removed section");
            self.tracked.dec();
        }

        // Truncate the given section to the given size
        let blob = match self.blobs.get_mut(&section) {
            Some(blob) => blob,
            None => return Ok(()),
        };
        let current = blob.size().await;
        if size >= current {
            return Ok(());
        }
        blob.resize(size).await?;
        debug!(
            section,
            from = current,
            to = size,
            ?trailing,
            "rewound journal"
        );
        Ok(())
    }

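    /// Truncates a single `section` to `size` bytes, leaving later sections
    /// untouched. A no-op if the section does not exist or is already at or
    /// below `size`.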
    pub async fn rewind_section(&mut self, section: u64, size: u64) -> Result<(), Error> {
        self.prune_guard(section)?;

        let blob = match self.blobs.get_mut(&section) {
            Some(blob) => blob,
            None => return Ok(()),
        };

        let current = blob.size().await;
        if size >= current {
            return Ok(());
        }
        blob.resize(size).await?;
        debug!(section, from = current, to = size, "rewound section");
        Ok(())
    }

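    /// Flushes any pending writes for `section` to storage (a no-op if the
    /// section does not exist).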
    pub async fn sync(&self, section: u64) -> Result<(), Error> {
        self.prune_guard(section)?;
        let blob = match self.blobs.get(&section) {
            Some(blob) => blob,
            None => return Ok(()),
        };
        self.synced.inc();
        blob.sync().await.map_err(Error::Runtime)
    }

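    /// Removes all sections strictly below `min`, returning whether anything
    /// was pruned. Afterwards, operations on pruned sections fail with
    /// `Error::AlreadyPrunedToSection` (this guard is in-memory only and is
    /// not persisted across restarts).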
    pub async fn prune(&mut self, min: u64) -> Result<bool, Error> {
        // Remove all blobs in sections below the minimum
        let mut pruned = false;
        while let Some((&section, _)) = self.blobs.first_key_value() {
            if section >= min {
                break;
            }

            // Remove the underlying blob from storage
            let blob = self.blobs.remove(&section).unwrap();
            let size = blob.size().await;
            drop(blob);
            self.context
                .remove(&self.cfg.partition, Some(&section.to_be_bytes()))
                .await?;
            pruned = true;

            // Update metrics
            debug!(blob = section, size, "pruned blob");
            self.tracked.dec();
            self.pruned.inc();
        }

        // Record the new oldest retained section
        if pruned {
            self.oldest_retained_section = min;
        }

        Ok(pruned)
    }

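    /// Syncs all open sections and consumes the journal.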
    pub async fn close(self) -> Result<(), Error> {
        for (section, blob) in self.blobs.into_iter() {
            let size = blob.size().await;
            blob.sync().await?;
            debug!(blob = section, size, "synced blob");
        }
        Ok(())
    }

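    /// Returns the oldest section currently stored in the journal, if any.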
    pub fn oldest_section(&self) -> Option<u64> {
        self.blobs.first_key_value().map(|(section, _)| *section)
    }

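    /// Removes every blob and then the backing partition itself, consuming
    /// the journal. A missing partition is ignored.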
    pub async fn destroy(self) -> Result<(), Error> {
        for (i, blob) in self.blobs.into_iter() {
            let size = blob.size().await;
            drop(blob);
            debug!(blob = i, size, "destroyed blob");
            self.context
                .remove(&self.cfg.partition, Some(&i.to_be_bytes()))
                .await?;
        }
        match self.context.remove(&self.cfg.partition, None).await {
            Ok(()) => {}
            Err(RError::PartitionMissing(_)) => {
                // The partition was already removed (or never created)
            }
            Err(err) => return Err(Error::Runtime(err)),
        }
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use bytes::BufMut;
    use commonware_cryptography::{Hasher, Sha256};
    use commonware_macros::test_traced;
    use commonware_runtime::{deterministic, Blob, Error as RError, Runner, Storage};
    use commonware_utils::{NZUsize, StableBuf};
    use futures::{pin_mut, StreamExt};
    use prometheus_client::registry::Metric;

    const PAGE_SIZE: NonZeroUsize = NZUsize!(1024);
    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);

    #[test_traced]
    fn test_journal_append_and_read() {
        // Initialize the deterministic runtime
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            // Initialize the journal
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };
            let index = 1u64;
            let data = 10;
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            // Append an item to the journal
            journal
                .append(index, data)
                .await
                .expect("Failed to append data");

            // Check metrics
            let buffer = context.encode();
            assert!(buffer.contains("tracked 1"));

            // Close the journal
            journal.close().await.expect("Failed to close journal");

            // Re-initialize the journal to simulate a restart
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };
            let journal = Journal::<_, i32>::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");

            // Replay the journal and collect items
            let mut items = Vec::new();
            let stream = journal
                .replay(0, 0, NZUsize!(1024))
                .await
                .expect("unable to setup replay");
            pin_mut!(stream);
            while let Some(result) = stream.next().await {
                match result {
                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                    Err(err) => panic!("Failed to read item: {err}"),
                }
            }

            // Verify the item was replayed
            assert_eq!(items.len(), 1);
            assert_eq!(items[0].0, index);
            assert_eq!(items[0].1, data);

            // Check metrics
            let buffer = context.encode();
            assert!(buffer.contains("tracked 1"));
        });
    }

    #[test_traced]
    fn test_journal_multiple_appends_and_reads() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            // Append items to multiple sections, syncing each
            let data_items = vec![(1u64, 1), (1u64, 2), (2u64, 3), (3u64, 4)];
            for (index, data) in &data_items {
                journal
                    .append(*index, *data)
                    .await
                    .expect("Failed to append data");
                journal.sync(*index).await.expect("Failed to sync blob");
            }

            // Check metrics
            let buffer = context.encode();
            assert!(buffer.contains("tracked 3"));
            assert!(buffer.contains("synced_total 4"));

            journal.close().await.expect("Failed to close journal");

            let journal = Journal::init(context, cfg)
                .await
                .expect("Failed to re-initialize journal");

            // Replay the journal and verify all items
            let mut items = Vec::<(u64, u32)>::new();
            {
                let stream = journal
                    .replay(0, 0, NZUsize!(1024))
                    .await
                    .expect("unable to setup replay");
                pin_mut!(stream);
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                        Err(err) => panic!("Failed to read item: {err}"),
                    }
                }
            }

            assert_eq!(items.len(), data_items.len());
            for ((expected_index, expected_data), (actual_index, actual_data)) in
                data_items.iter().zip(items.iter())
            {
                assert_eq!(actual_index, expected_index);
                assert_eq!(actual_data, expected_data);
            }

            journal.destroy().await.expect("Failed to destroy journal");
        });
    }

    #[test_traced]
    fn test_journal_prune_blobs() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            // Append one item to each of five sections
            for index in 1u64..=5u64 {
                journal
                    .append(index, index)
                    .await
                    .expect("Failed to append data");
                journal.sync(index).await.expect("Failed to sync blob");
            }

            // Add one more item to section 2
            let data = 99;
            journal
                .append(2u64, data)
                .await
                .expect("Failed to append data");
            journal.sync(2u64).await.expect("Failed to sync blob");

            // Prune sections below 3
            journal.prune(3).await.expect("Failed to prune blobs");

            let buffer = context.encode();
            assert!(buffer.contains("pruned_total 2"));

            // Pruning to a lower section should be a no-op
            journal.prune(2).await.expect("Failed to no-op prune");
            let buffer = context.encode();
            assert!(buffer.contains("pruned_total 2"));

            journal.close().await.expect("Failed to close journal");

            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");

            // Replay and verify only sections 3..=5 remain
            let mut items = Vec::<(u64, u64)>::new();
            {
                let stream = journal
                    .replay(0, 0, NZUsize!(1024))
                    .await
                    .expect("unable to setup replay");
                pin_mut!(stream);
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                        Err(err) => panic!("Failed to read item: {err}"),
                    }
                }
            }

            assert_eq!(items.len(), 3);
            let expected_indices = [3u64, 4u64, 5u64];
            for (item, expected_index) in items.iter().zip(expected_indices.iter()) {
                assert_eq!(item.0, *expected_index);
            }

            // Prune everything
            journal.prune(6).await.expect("Failed to prune blobs");

            journal.close().await.expect("Failed to close journal");

            // Ensure no blobs remain in the partition
            assert!(context
                .scan(&cfg.partition)
                .await
                .expect("Failed to list blobs")
                .is_empty());
        });
    }

    #[test_traced]
    fn test_journal_prune_guard() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            // Append one item to each of five sections
            for section in 1u64..=5u64 {
                journal
                    .append(section, section as i32)
                    .await
                    .expect("Failed to append data");
                journal.sync(section).await.expect("Failed to sync");
            }

            // Nothing has been pruned yet
            assert_eq!(journal.oldest_retained_section, 0);

            journal.prune(3).await.expect("Failed to prune");

            assert_eq!(journal.oldest_retained_section, 3);

            // Every operation on a pruned section should fail with
            // AlreadyPrunedToSection
            match journal.append(1, 100).await {
                Err(Error::AlreadyPrunedToSection(3)) => {}
                other => panic!("Expected AlreadyPrunedToSection(3), got {other:?}"),
            }

            match journal.append(2, 100).await {
                Err(Error::AlreadyPrunedToSection(3)) => {}
                other => panic!("Expected AlreadyPrunedToSection(3), got {other:?}"),
            }

            match journal.get(1, 0).await {
                Err(Error::AlreadyPrunedToSection(3)) => {}
                other => panic!("Expected AlreadyPrunedToSection(3), got {other:?}"),
            }

            match journal.get_exact(2, 0, 12).await {
                Err(Error::AlreadyPrunedToSection(3)) => {}
                other => panic!("Expected AlreadyPrunedToSection(3), got {other:?}"),
            }

            match journal.size(1).await {
                Err(Error::AlreadyPrunedToSection(3)) => {}
                other => panic!("Expected AlreadyPrunedToSection(3), got {other:?}"),
            }

            match journal.rewind(2, 0).await {
                Err(Error::AlreadyPrunedToSection(3)) => {}
                other => panic!("Expected AlreadyPrunedToSection(3), got {other:?}"),
            }

            match journal.rewind_section(1, 0).await {
                Err(Error::AlreadyPrunedToSection(3)) => {}
                other => panic!("Expected AlreadyPrunedToSection(3), got {other:?}"),
            }

            match journal.sync(2).await {
                Err(Error::AlreadyPrunedToSection(3)) => {}
                other => panic!("Expected AlreadyPrunedToSection(3), got {other:?}"),
            }

            // Retained sections remain fully usable
            assert!(journal.get(3, 0).await.is_ok());
            assert!(journal.get(4, 0).await.is_ok());
            assert!(journal.get(5, 0).await.is_ok());
            assert!(journal.size(3).await.is_ok());
            assert!(journal.sync(4).await.is_ok());

            journal
                .append(3, 999)
                .await
                .expect("Should be able to append to section 3");

            // Prune further and verify the guard advances
            journal.prune(5).await.expect("Failed to prune");
            assert_eq!(journal.oldest_retained_section, 5);

            match journal.get(3, 0).await {
                Err(Error::AlreadyPrunedToSection(5)) => {}
                other => panic!("Expected AlreadyPrunedToSection(5), got {other:?}"),
            }

            match journal.get(4, 0).await {
                Err(Error::AlreadyPrunedToSection(5)) => {}
                other => panic!("Expected AlreadyPrunedToSection(5), got {other:?}"),
            }

            assert!(journal.get(5, 0).await.is_ok());

            journal.close().await.expect("Failed to close journal");
        });
    }

    #[test_traced]
    fn test_journal_prune_guard_across_restart() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            // Initialize, populate, and prune the journal
            {
                let mut journal = Journal::init(context.clone(), cfg.clone())
                    .await
                    .expect("Failed to initialize journal");

                for section in 1u64..=5u64 {
                    journal
                        .append(section, section as i32)
                        .await
                        .expect("Failed to append data");
                    journal.sync(section).await.expect("Failed to sync");
                }

                journal.prune(3).await.expect("Failed to prune");
                assert_eq!(journal.oldest_retained_section, 3);

                journal.close().await.expect("Failed to close journal");
            }

            // Restart the journal
            {
                let journal = Journal::<_, i32>::init(context.clone(), cfg.clone())
                    .await
                    .expect("Failed to re-initialize journal");

                // The prune guard is not persisted across restarts
                assert_eq!(journal.oldest_retained_section, 0);

                // Pruned sections are now simply missing rather than guarded
                match journal.get(1, 0).await {
                    Err(Error::SectionOutOfRange(1)) => {}
                    other => panic!("Expected SectionOutOfRange(1), got {other:?}"),
                }

                match journal.get(2, 0).await {
                    Err(Error::SectionOutOfRange(2)) => {}
                    other => panic!("Expected SectionOutOfRange(2), got {other:?}"),
                }

                assert!(journal.get(3, 0).await.is_ok());
                assert!(journal.get(4, 0).await.is_ok());
                assert!(journal.get(5, 0).await.is_ok());

                journal.close().await.expect("Failed to close journal");
            }
        });
    }

    #[test_traced]
    fn test_journal_with_invalid_blob_name() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            // Create a blob with an invalid (non-u64) name
            let invalid_blob_name = b"invalid";
            let (blob, _) = context
                .open(&cfg.partition, invalid_blob_name)
                .await
                .expect("Failed to create blob with invalid name");
            blob.sync().await.expect("Failed to sync blob");

            // Initialization should reject the blob name
            let result = Journal::<_, u64>::init(context, cfg).await;

            assert!(matches!(result, Err(Error::InvalidBlobName(_))));
        });
    }

    #[test_traced]
    fn test_journal_read_size_missing() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            // Manually create a blob for section 1
            let section = 1u64;
            let blob_name = section.to_be_bytes();
            let (blob, _) = context
                .open(&cfg.partition, &blob_name)
                .await
                .expect("Failed to create blob");

            // Write an incomplete size prefix (fewer than 4 bytes)
            let incomplete_data = vec![0x00, 0x01];
            blob.write_at(incomplete_data, 0)
                .await
                .expect("Failed to write incomplete data");
            blob.sync().await.expect("Failed to sync blob");

            let journal = Journal::init(context, cfg)
                .await
                .expect("Failed to initialize journal");

            // Replay should yield no items
            let stream = journal
                .replay(0, 0, NZUsize!(1024))
                .await
                .expect("unable to setup replay");
            pin_mut!(stream);
            let mut items = Vec::<(u64, u64)>::new();
            while let Some(result) = stream.next().await {
                match result {
                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                    Err(err) => panic!("Failed to read item: {err}"),
                }
            }
            assert!(items.is_empty());
        });
    }

    #[test_traced]
    fn test_journal_read_item_missing() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            // Manually create a blob for section 1
            let section = 1u64;
            let blob_name = section.to_be_bytes();
            let (blob, _) = context
                .open(&cfg.partition, &blob_name)
                .await
                .expect("Failed to create blob");

            // Write a size prefix that claims more data than is present
            let item_size: u32 = 10;
            let mut buf = Vec::new();
            buf.put_u32(item_size);
            let data = [2u8; 5];
            BufMut::put_slice(&mut buf, &data);
            blob.write_at(buf, 0)
                .await
                .expect("Failed to write item size");
            blob.sync().await.expect("Failed to sync blob");

            let journal = Journal::init(context, cfg)
                .await
                .expect("Failed to initialize journal");

            // Replay should yield no items
            let stream = journal
                .replay(0, 0, NZUsize!(1024))
                .await
                .expect("unable to setup replay");
            pin_mut!(stream);
            let mut items = Vec::<(u64, u64)>::new();
            while let Some(result) = stream.next().await {
                match result {
                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                    Err(err) => panic!("Failed to read item: {err}"),
                }
            }
            assert!(items.is_empty());
        });
    }

    #[test_traced]
    fn test_journal_read_checksum_missing() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            // Manually create a blob with a size and item but no checksum
            let section = 1u64;
            let blob_name = section.to_be_bytes();
            let (blob, _) = context
                .open(&cfg.partition, &blob_name)
                .await
                .expect("Failed to create blob");

            let item_data = b"Test data";
            let item_size = item_data.len() as u32;

            let mut offset = 0;
            blob.write_at(item_size.to_be_bytes().to_vec(), offset)
                .await
                .expect("Failed to write item size");
            offset += 4;

            blob.write_at(item_data.to_vec(), offset)
                .await
                .expect("Failed to write item data");
            blob.sync().await.expect("Failed to sync blob");

            let journal = Journal::init(context, cfg)
                .await
                .expect("Failed to initialize journal");

            // Replay should yield no items (the entry is incomplete)
            let stream = journal
                .replay(0, 0, NZUsize!(1024))
                .await
                .expect("unable to setup replay");
            pin_mut!(stream);
            let mut items = Vec::<(u64, u64)>::new();
            while let Some(result) = stream.next().await {
                match result {
                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                    Err(err) => panic!("Failed to read item: {err}"),
                }
            }
            assert!(items.is_empty());
        });
    }

    #[test_traced]
    fn test_journal_read_checksum_mismatch() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            // Manually create a blob with an entry carrying a bad checksum
            let section = 1u64;
            let blob_name = section.to_be_bytes();
            let (blob, _) = context
                .open(&cfg.partition, &blob_name)
                .await
                .expect("Failed to create blob");

            let item_data = b"Test data";
            let item_size = item_data.len() as u32;
            let incorrect_checksum: u32 = 0xDEADBEEF;

            let mut offset = 0;
            blob.write_at(item_size.to_be_bytes().to_vec(), offset)
                .await
                .expect("Failed to write item size");
            offset += 4;

            blob.write_at(item_data.to_vec(), offset)
                .await
                .expect("Failed to write item data");
            offset += item_data.len() as u64;

            blob.write_at(incorrect_checksum.to_be_bytes().to_vec(), offset)
                .await
                .expect("Failed to write incorrect checksum");

            blob.sync().await.expect("Failed to sync blob");

            let journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            // Replay should yield no items and truncate the corrupted entry
            {
                let stream = journal
                    .replay(0, 0, NZUsize!(1024))
                    .await
                    .expect("unable to setup replay");
                pin_mut!(stream);
                let mut items = Vec::<(u64, u64)>::new();
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                        Err(err) => panic!("Failed to read item: {err}"),
                    }
                }
                assert!(items.is_empty());
            }
            journal.close().await.expect("Failed to close journal");

            // The corrupted blob should have been truncated to zero
            let (_, blob_size) = context
                .open(&cfg.partition, &section.to_be_bytes())
                .await
                .expect("Failed to open blob");
            assert_eq!(blob_size, 0);
        });
    }

    #[test_traced]
    fn test_journal_handling_unaligned_truncated_data() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            journal.append(1, 1).await.expect("Failed to append data");

            let data_items = vec![(2u64, 2), (2u64, 3), (2u64, 4)];
            for (index, data) in &data_items {
                journal
                    .append(*index, *data)
                    .await
                    .expect("Failed to append data");
                journal.sync(*index).await.expect("Failed to sync blob");
            }

            journal.close().await.expect("Failed to close journal");

            // Corrupt the last item in section 2 by truncating 4 bytes,
            // leaving an unaligned tail
            let (blob, blob_size) = context
                .open(&cfg.partition, &2u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            blob.resize(blob_size - 4)
                .await
                .expect("Failed to corrupt blob");
            blob.sync().await.expect("Failed to sync blob");

            let journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");

            // Replay drops the truncated item and truncates the blob
            let mut items = Vec::<(u64, u32)>::new();
            {
                let stream = journal
                    .replay(0, 0, NZUsize!(1024))
                    .await
                    .expect("unable to setup replay");
                pin_mut!(stream);
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                        Err(err) => panic!("Failed to read item: {err}"),
                    }
                }
            }
            journal.close().await.expect("Failed to close journal");

            assert_eq!(items.len(), 3);
            assert_eq!(items[0].0, 1);
            assert_eq!(items[0].1, 1);
            assert_eq!(items[1].0, data_items[0].0);
            assert_eq!(items[1].1, data_items[0].1);
            assert_eq!(items[2].0, data_items[1].0);
            assert_eq!(items[2].1, data_items[1].1);

            // The blob should have been truncated to the last valid entry
            let (_, blob_size) = context
                .open(&cfg.partition, &2u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            assert_eq!(blob_size, 28);

            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");

            let mut items = Vec::<(u64, u32)>::new();
            {
                let stream = journal
                    .replay(0, 0, NZUsize!(1024))
                    .await
                    .expect("unable to setup replay");
                pin_mut!(stream);
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                        Err(err) => panic!("Failed to read item: {err}"),
                    }
                }
            }

            assert_eq!(items.len(), 3);
            assert_eq!(items[0].0, 1);
            assert_eq!(items[0].1, 1);
            assert_eq!(items[1].0, data_items[0].0);
            assert_eq!(items[1].1, data_items[0].1);
            assert_eq!(items[2].0, data_items[1].0);
            assert_eq!(items[2].1, data_items[1].1);

            // Appending after truncation starts at the next aligned offset
            journal.append(2, 5).await.expect("Failed to append data");
            journal.sync(2).await.expect("Failed to sync blob");

            let item = journal.get(2, 2).await.expect("Failed to get item");
            assert_eq!(item, 5);

            journal.close().await.expect("Failed to close journal");

            let (_, blob_size) = context
                .open(&cfg.partition, &2u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            assert_eq!(blob_size, 44);

            let journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");

            let mut items = Vec::<(u64, u32)>::new();
            {
                let stream = journal
                    .replay(0, 0, NZUsize!(1024))
                    .await
                    .expect("unable to setup replay");
                pin_mut!(stream);
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                        Err(err) => panic!("Failed to read item: {err}"),
                    }
                }
            }

            assert_eq!(items.len(), 4);
            assert_eq!(items[0].0, 1);
            assert_eq!(items[0].1, 1);
            assert_eq!(items[1].0, data_items[0].0);
            assert_eq!(items[1].1, data_items[0].1);
            assert_eq!(items[2].0, data_items[1].0);
            assert_eq!(items[2].1, data_items[1].1);
            assert_eq!(items[3].0, 2);
            assert_eq!(items[3].1, 5);
        });
    }

    #[test_traced]
    fn test_journal_handling_aligned_truncated_data() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            journal.append(1, 1).await.expect("Failed to append data");

            let data_items = vec![(2u64, 2), (2u64, 3), (2u64, 4)];
            for (index, data) in &data_items {
                journal
                    .append(*index, *data)
                    .await
                    .expect("Failed to append data");
                journal.sync(*index).await.expect("Failed to sync blob");
            }

            journal.close().await.expect("Failed to close journal");

            // Truncate the last 4 bytes of section 2
            let (blob, blob_size) = context
                .open(&cfg.partition, &2u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            blob.resize(blob_size - 4)
                .await
                .expect("Failed to corrupt blob");
            blob.sync().await.expect("Failed to sync blob");

            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");

            // Replay drops the truncated item
            let mut items = Vec::<(u64, u64)>::new();
            {
                let stream = journal
                    .replay(0, 0, NZUsize!(1024))
                    .await
                    .expect("unable to setup replay");
                pin_mut!(stream);
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                        Err(err) => panic!("Failed to read item: {err}"),
                    }
                }
            }

            assert_eq!(items.len(), 3);
            assert_eq!(items[0].0, 1);
            assert_eq!(items[0].1, 1);
            assert_eq!(items[1].0, data_items[0].0);
            assert_eq!(items[1].1, data_items[0].1);
            assert_eq!(items[2].0, data_items[1].0);
            assert_eq!(items[2].1, data_items[1].1);

            journal.append(2, 5).await.expect("Failed to append data");
            journal.sync(2).await.expect("Failed to sync blob");

            let item = journal.get(2, 2).await.expect("Failed to get item");
            assert_eq!(item, 5);

            journal.close().await.expect("Failed to close journal");

            let (_, blob_size) = context
                .open(&cfg.partition, &2u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            assert_eq!(blob_size, 48);

            let journal = Journal::init(context, cfg)
                .await
                .expect("Failed to re-initialize journal");

            let mut items = Vec::<(u64, u64)>::new();
            {
                let stream = journal
                    .replay(0, 0, NZUsize!(1024))
                    .await
                    .expect("unable to setup replay");
                pin_mut!(stream);
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                        Err(err) => panic!("Failed to read item: {err}"),
                    }
                }
            }
            journal.close().await.expect("Failed to close journal");

            assert_eq!(items.len(), 4);
            assert_eq!(items[0].0, 1);
            assert_eq!(items[0].1, 1);
            assert_eq!(items[1].0, data_items[0].0);
            assert_eq!(items[1].1, data_items[0].1);
            assert_eq!(items[2].0, data_items[1].0);
            assert_eq!(items[2].1, data_items[1].1);
            assert_eq!(items[3].0, 2);
            assert_eq!(items[3].1, 5);
        });
    }

    #[test_traced]
    fn test_journal_handling_extra_data() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            journal.append(1, 1).await.expect("Failed to append data");

            let data_items = vec![(2u64, 2), (2u64, 3), (2u64, 4)];
            for (index, data) in &data_items {
                journal
                    .append(*index, *data)
                    .await
                    .expect("Failed to append data");
                journal.sync(*index).await.expect("Failed to sync blob");
            }

            journal.close().await.expect("Failed to close journal");

            // Append zeroed trailing bytes to section 2
            let (blob, blob_size) = context
                .open(&cfg.partition, &2u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            blob.write_at(vec![0u8; 16], blob_size)
                .await
                .expect("Failed to add extra data");
            blob.sync().await.expect("Failed to sync blob");

            let journal = Journal::init(context, cfg)
                .await
                .expect("Failed to re-initialize journal");

            // Replay should not error on the trailing bytes
            let mut items = Vec::<(u64, i32)>::new();
            let stream = journal
                .replay(0, 0, NZUsize!(1024))
                .await
                .expect("unable to setup replay");
            pin_mut!(stream);
            while let Some(result) = stream.next().await {
                match result {
                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                    Err(err) => panic!("Failed to read item: {err}"),
                }
            }
        });
    }

    /// A blob that accepts all reads and writes and never persists anything.
    #[derive(Clone)]
    struct MockBlob {}

    impl Blob for MockBlob {
        async fn read_at(
            &self,
            buf: impl Into<StableBuf> + Send,
            _offset: u64,
        ) -> Result<StableBuf, RError> {
            Ok(buf.into())
        }

        async fn write_at(
            &self,
            _buf: impl Into<StableBuf> + Send,
            _offset: u64,
        ) -> Result<(), RError> {
            Ok(())
        }

        async fn resize(&self, _len: u64) -> Result<(), RError> {
            Ok(())
        }

        async fn sync(&self) -> Result<(), RError> {
            Ok(())
        }
    }

    /// Storage that hands out [MockBlob]s of a configurable length, used to
    /// simulate very large blobs without writing any data.
    #[derive(Clone)]
    struct MockStorage {
        len: u64,
    }

    impl Storage for MockStorage {
        type Blob = MockBlob;

        async fn open(&self, _partition: &str, _name: &[u8]) -> Result<(MockBlob, u64), RError> {
            Ok((MockBlob {}, self.len))
        }

        async fn remove(&self, _partition: &str, _name: Option<&[u8]>) -> Result<(), RError> {
            Ok(())
        }

        async fn scan(&self, _partition: &str) -> Result<Vec<Vec<u8>>, RError> {
            Ok(vec![])
        }
    }

    impl Metrics for MockStorage {
        fn with_label(&self, _: &str) -> Self {
            self.clone()
        }

        fn label(&self) -> String {
            String::new()
        }

        fn register<N: Into<String>, H: Into<String>>(&self, _: N, _: H, _: impl Metric) {}

        fn encode(&self) -> String {
            String::new()
        }
    }

    /// Mirrors [ITEM_ALIGNMENT] in the parent module: items start on 16-byte
    /// boundaries, so a blob of length `n * INDEX_ALIGNMENT` puts the next
    /// item at offset `n`.
    const INDEX_ALIGNMENT: u64 = 16;

    #[test_traced]
    fn test_journal_large_offset() {
        let executor = deterministic::Runner::default();
        executor.start(|_| async move {
            let cfg = Config {
                partition: "partition".to_string(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };
            // A blob of this size places the next item exactly at offset u32::MAX
            let context = MockStorage {
                len: u32::MAX as u64 * INDEX_ALIGNMENT,
            };
            let mut journal = Journal::init(context, cfg).await.unwrap();

            let data = 1;
            let (result, _) = journal
                .append(1, data)
                .await
                .expect("Failed to append data");
            assert_eq!(result, u32::MAX);
        });
    }

    #[test_traced]
    fn test_journal_offset_overflow() {
        let executor = deterministic::Runner::default();
        executor.start(|_| async move {
            let cfg = Config {
                partition: "partition".to_string(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };
            // One byte past the maximum representable aligned offset
            let context = MockStorage {
                len: u32::MAX as u64 * INDEX_ALIGNMENT + 1,
            };
            let mut journal = Journal::init(context, cfg).await.unwrap();

            let data = 1;
            let result = journal.append(1, data).await;
            assert!(matches!(result, Err(Error::OffsetOverflow)));
        });
    }

    #[test_traced]
    fn test_journal_rewind() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".to_string(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };
            let mut journal = Journal::init(context, cfg).await.unwrap();

            let size = journal.size(1).await.unwrap();
            assert_eq!(size, 0);

            journal.append(1, 42i32).await.unwrap();

            let size = journal.size(1).await.unwrap();
            assert!(size > 0);

            journal.append(1, 43i32).await.unwrap();
            let new_size = journal.size(1).await.unwrap();
            assert!(new_size > size);

            let size = journal.size(2).await.unwrap();
            assert_eq!(size, 0);

            journal.append(2, 44i32).await.unwrap();

            let size = journal.size(2).await.unwrap();
            assert!(size > 0);

            // Rewinding section 1 to zero also removes the later section 2
            journal.rewind(1, 0).await.unwrap();

            let size = journal.size(1).await.unwrap();
            assert_eq!(size, 0);

            let size = journal.size(2).await.unwrap();
            assert_eq!(size, 0);
        });
    }

    #[test_traced]
    fn test_journal_rewind_section() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".to_string(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };
            let mut journal = Journal::init(context, cfg).await.unwrap();

            let size = journal.size(1).await.unwrap();
            assert_eq!(size, 0);

            journal.append(1, 42i32).await.unwrap();

            let size = journal.size(1).await.unwrap();
            assert!(size > 0);

            journal.append(1, 43i32).await.unwrap();
            let new_size = journal.size(1).await.unwrap();
            assert!(new_size > size);

            let size = journal.size(2).await.unwrap();
            assert_eq!(size, 0);

            journal.append(2, 44i32).await.unwrap();

            let size = journal.size(2).await.unwrap();
            assert!(size > 0);

            // Rewinding only section 1 leaves section 2 untouched
            journal.rewind_section(1, 0).await.unwrap();

            let size = journal.size(1).await.unwrap();
            assert_eq!(size, 0);

            let size = journal.size(2).await.unwrap();
            assert!(size > 0);
        });
    }

    #[test_traced]
    fn test_journal_conformance() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            // Append 100 items to one section
            for i in 0..100 {
                journal.append(1, i).await.expect("Failed to append data");
            }
            journal.sync(1).await.expect("Failed to sync blob");

            journal.close().await.expect("Failed to close journal");

            // Hash the raw blob contents and compare against a known digest
            let (blob, size) = context
                .open(&cfg.partition, &1u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            assert!(size > 0);
            let buf = blob
                .read_at(vec![0u8; size as usize], 0)
                .await
                .expect("Failed to read blob");
            let digest = Sha256::hash(buf.as_ref());
            assert_eq!(
                hex(&digest),
                "ca3845fa7fabd4d2855ab72ed21226d1d6eb30cb895ea9ec5e5a14201f3f25d8",
            );
        });
    }
}