use super::Error;
use bytes::BufMut;
use commonware_codec::Codec;
use commonware_runtime::{
    buffer::{Append, PoolRef, Read},
    Blob, Error as RError, Metrics, Storage,
};
use commonware_utils::hex;
use futures::stream::{self, Stream, StreamExt};
use prometheus_client::metrics::{counter::Counter, gauge::Gauge};
use std::{
    collections::{btree_map::Entry, BTreeMap},
    io::Cursor,
    marker::PhantomData,
    num::NonZeroUsize,
};
use tracing::{debug, trace, warn};
use zstd::{bulk::compress, decode_all};

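/// Configuration for `Journal`.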
#[derive(Clone)]
pub struct Config<C> {
    /// The runtime partition to store this journal's blobs in.
    pub partition: String,

    /// Optional zstd compression level to apply to items before writing.
    pub compression: Option<u8>,

    /// The codec configuration to use when decoding items.
    pub codec_config: C,

    /// The buffer pool to use for caching data read from blobs.
    pub buffer_pool: PoolRef,

    /// The size of the write buffer to use for each blob.
    pub write_buffer: NonZeroUsize,
}

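/// Each entry in the journal is a 4-byte big-endian length prefix, the
/// (optionally compressed) item bytes, and a 4-byte CRC32 checksum computed
/// over the prefix and item, zero-padded so that every entry starts at a
/// multiple of this alignment:
///
/// ```text
/// +--------------+------------+-------------+--------------+
/// | length (u32) | item bytes | CRC32 (u32) | zero padding |
/// +--------------+------------+-------------+--------------+
/// ```
///
/// Offsets exposed to callers are expressed in units of this alignment, so a
/// `u32` offset can address blobs up to `u32::MAX * 16` bytes.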
pub(crate) const ITEM_ALIGNMENT: u64 = 16;

/// Computes the next aligned offset to write at, assuming all data up to
/// `offset` (in bytes) has already been written.
#[inline]
fn compute_next_offset(mut offset: u64) -> Result<u32, Error> {
    let overage = offset % ITEM_ALIGNMENT;
    if overage != 0 {
        offset += ITEM_ALIGNMENT - overage;
    }
    let offset = offset / ITEM_ALIGNMENT;
    let aligned_offset = offset.try_into().map_err(|_| Error::OffsetOverflow)?;
    Ok(aligned_offset)
}

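/// An append-only log for storing arbitrary variable-length items, grouped
/// into numbered sections that can be synced, pruned, and rewound
/// independently.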
pub struct Journal<E: Storage + Metrics, V: Codec> {
    pub(crate) context: E,
    pub(crate) cfg: Config<V::Cfg>,

    /// One blob per section, keyed by section number.
    pub(crate) blobs: BTreeMap<u64, Append<E::Blob>>,

    /// The lowest section that has not been pruned (0 if nothing has been
    /// pruned yet).
    pub(crate) oldest_retained_section: u64,

    pub(crate) tracked: Gauge,
    pub(crate) synced: Counter,
    pub(crate) pruned: Counter,

    pub(crate) _phantom: PhantomData<V>,
}

impl<E: Storage + Metrics, V: Codec> Journal<E, V> {
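    /// Initializes a journal in the configured partition, opening any
    /// existing section blobs and registering metrics with the runtime.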
    pub async fn init(context: E, cfg: Config<V::Cfg>) -> Result<Self, Error> {
        // Iterate over blobs in the partition
        let mut blobs = BTreeMap::new();
        let stored_blobs = match context.scan(&cfg.partition).await {
            Ok(blobs) => blobs,
            Err(RError::PartitionMissing(_)) => Vec::new(),
            Err(err) => return Err(Error::Runtime(err)),
        };
        for name in stored_blobs {
            let (blob, size) = context.open(&cfg.partition, &name).await?;
            let hex_name = hex(&name);
            let section = match name.try_into() {
                Ok(section) => u64::from_be_bytes(section),
                Err(_) => return Err(Error::InvalidBlobName(hex_name)),
            };
            debug!(section, blob = hex_name, size, "loaded section");
            let blob = Append::new(blob, size, cfg.write_buffer, cfg.buffer_pool.clone()).await?;
            blobs.insert(section, blob);
        }

        // Initialize metrics
        let tracked = Gauge::default();
        let synced = Counter::default();
        let pruned = Counter::default();
        context.register("tracked", "Number of blobs", tracked.clone());
        context.register("synced", "Number of syncs", synced.clone());
        context.register("pruned", "Number of blobs pruned", pruned.clone());
        tracked.set(blobs.len() as i64);

        Ok(Self {
            context,
            cfg,
            blobs,
            oldest_retained_section: 0,
            tracked,
            synced,
            pruned,

            _phantom: PhantomData,
        })
    }

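    /// Ensures the given section has not already been pruned.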
    fn prune_guard(&self, section: u64) -> Result<(), Error> {
        if section < self.oldest_retained_section {
            Err(Error::AlreadyPrunedToSection(self.oldest_retained_section))
        } else {
            Ok(())
        }
    }

    pub(crate) async fn read(
        compressed: bool,
        cfg: &V::Cfg,
        blob: &Append<E::Blob>,
        offset: u32,
    ) -> Result<(u32, u32, V), Error> {
        // Read the item's size prefix
        let mut hasher = crc32fast::Hasher::new();
        let offset = offset as u64 * ITEM_ALIGNMENT;
        let size = blob.read_at(vec![0; 4], offset).await?;
        hasher.update(size.as_ref());
        let size = u32::from_be_bytes(size.as_ref().try_into().unwrap()) as usize;
        let offset = offset.checked_add(4).ok_or(Error::OffsetOverflow)?;

        // Read the item and its stored checksum in a single read
        let buf_size = size.checked_add(4).ok_or(Error::OffsetOverflow)?;
        let buf = blob.read_at(vec![0u8; buf_size], offset).await?;
        let buf = buf.as_ref();
        let offset = offset
            .checked_add(buf_size as u64)
            .ok_or(Error::OffsetOverflow)?;

        // Verify integrity
        let item = &buf[..size];
        hasher.update(item);
        let checksum = hasher.finalize();
        let stored_checksum = u32::from_be_bytes(buf[size..].try_into().unwrap());
        if checksum != stored_checksum {
            return Err(Error::ChecksumMismatch(stored_checksum, checksum));
        }

        // Compute the next aligned offset
        let aligned_offset = compute_next_offset(offset)?;

        // Decompress (if necessary) and decode the item
        let item = if compressed {
            let decompressed =
                decode_all(Cursor::new(&item)).map_err(|_| Error::DecompressionFailed)?;
            V::decode_cfg(decompressed.as_ref(), cfg).map_err(Error::Codec)?
        } else {
            V::decode_cfg(item, cfg).map_err(Error::Codec)?
        };

        Ok((aligned_offset, size as u32, item))
    }

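    /// Reads the item at `offset` using a buffered reader (seeking only when
    /// necessary), returning the next aligned offset, the reader's byte
    /// position after the entry, the item's encoded size, and the decoded
    /// item.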
    async fn read_buffered(
        reader: &mut Read<Append<E::Blob>>,
        offset: u32,
        cfg: &V::Cfg,
        compressed: bool,
    ) -> Result<(u32, u64, u32, V), Error> {
        let file_offset = offset as u64 * ITEM_ALIGNMENT;

        // Seek only if the reader is not already at the target position
        if reader.position() != file_offset {
            reader.seek_to(file_offset).map_err(Error::Runtime)?;
        }

        // Read the item's size prefix
        let mut hasher = crc32fast::Hasher::new();
        let mut size_buf = [0u8; 4];
        reader
            .read_exact(&mut size_buf, 4)
            .await
            .map_err(Error::Runtime)?;
        hasher.update(&size_buf);

        // Read the item and its stored checksum
        let size = u32::from_be_bytes(size_buf) as usize;
        let buf_size = size.checked_add(4).ok_or(Error::OffsetOverflow)?;
        let mut buf = vec![0u8; buf_size];
        reader
            .read_exact(&mut buf, buf_size)
            .await
            .map_err(Error::Runtime)?;

        // Verify integrity
        let item = &buf[..size];
        hasher.update(item);
        let checksum = hasher.finalize();
        let stored_checksum = u32::from_be_bytes(buf[size..].try_into().unwrap());
        if checksum != stored_checksum {
            return Err(Error::ChecksumMismatch(stored_checksum, checksum));
        }

        // Decompress (if necessary) and decode the item
        let item = if compressed {
            let decompressed =
                decode_all(Cursor::new(&item)).map_err(|_| Error::DecompressionFailed)?;
            V::decode_cfg(decompressed.as_ref(), cfg).map_err(Error::Codec)?
        } else {
            V::decode_cfg(item, cfg).map_err(Error::Codec)?
        };

        // Compute the next aligned offset
        let current_pos = reader.position();
        let aligned_offset = compute_next_offset(current_pos)?;
        Ok((aligned_offset, current_pos, size as u32, item))
    }

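    /// Reads an item of known encoded length `len` at `offset` in a single
    /// read, verifying both the stored length and the checksum.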
    async fn read_exact(
        compressed: bool,
        cfg: &V::Cfg,
        blob: &Append<E::Blob>,
        offset: u32,
        len: u32,
    ) -> Result<V, Error> {
        // Read the entire entry (size prefix + item + checksum) at once
        let offset = offset as u64 * ITEM_ALIGNMENT;
        let entry_size = 4 + len as usize + 4;
        let buf = blob.read_at(vec![0u8; entry_size], offset).await?;

        // Verify the stored size matches the expected length
        let mut hasher = crc32fast::Hasher::new();
        let disk_size = u32::from_be_bytes(buf.as_ref()[..4].try_into().unwrap());
        hasher.update(&buf.as_ref()[..4]);
        if disk_size != len {
            return Err(Error::UnexpectedSize(disk_size, len));
        }

        // Verify integrity
        let item = &buf.as_ref()[4..4 + len as usize];
        hasher.update(item);
        let checksum = hasher.finalize();
        let stored_checksum =
            u32::from_be_bytes(buf.as_ref()[4 + len as usize..].try_into().unwrap());
        if checksum != stored_checksum {
            return Err(Error::ChecksumMismatch(stored_checksum, checksum));
        }

        // Decompress the item (if necessary)
        let item = if compressed {
            decode_all(Cursor::new(item)).map_err(|_| Error::DecompressionFailed)?
        } else {
            item.to_vec()
        };

        // Decode the item
        let item = V::decode_cfg(item.as_ref(), cfg).map_err(Error::Codec)?;
        Ok(item)
    }

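    /// Returns a stream of all items in the journal, starting at `offset`
    /// within `start_section`, as `(section, offset, size, item)` tuples,
    /// reading ahead up to `buffer` bytes at a time.
    ///
    /// If a corrupted or incomplete entry is detected at the end of a blob
    /// (e.g. after an unclean shutdown), the blob is truncated to its last
    /// valid entry and replay of that section stops.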
    pub async fn replay(
        &self,
        start_section: u64,
        mut offset: u32,
        buffer: NonZeroUsize,
    ) -> Result<impl Stream<Item = Result<(u64, u32, u32, V), Error>> + '_, Error> {
        // Collect all blobs to replay
        let codec_config = self.cfg.codec_config.clone();
        let compressed = self.cfg.compression.is_some();
        let mut blobs = Vec::with_capacity(self.blobs.len());
        for (section, blob) in self.blobs.range(start_section..) {
            let blob_size = blob.size().await;
            let max_offset = compute_next_offset(blob_size)?;
            blobs.push((
                *section,
                blob.clone(),
                max_offset,
                blob_size,
                codec_config.clone(),
                compressed,
            ));
        }

        // Replay all blobs in ascending section order
        Ok(stream::iter(blobs).flat_map(
            move |(section, blob, max_offset, blob_size, codec_config, compressed)| {
                // Create a buffered reader for the blob
                let mut reader = Read::new(blob, blob_size, buffer);

                // Seek to the starting offset in the first section only
                if section == start_section && offset != 0 {
                    if let Err(err) = reader.seek_to(offset as u64 * ITEM_ALIGNMENT) {
                        warn!(section, offset, ?err, "failed to seek to offset");
                        return stream::once(async move { Err(err.into()) }).left_stream();
                    }
                } else {
                    offset = 0;
                }

                stream::unfold(
                    (section, reader, offset, 0u64, codec_config, compressed),
                    move |(
                        section,
                        mut reader,
                        offset,
                        valid_size,
                        codec_config,
                        compressed,
                    )| async move {
                        // Stop once we reach the end of the blob
                        if offset >= max_offset {
                            return None;
                        }

                        // Read the next item from the buffered reader
                        match Self::read_buffered(
                            &mut reader,
                            offset,
                            &codec_config,
                            compressed,
                        )
                        .await
                        {
                            Ok((next_offset, next_valid_size, size, item)) => {
                                trace!(blob = section, cursor = offset, "replayed item");
                                Some((
                                    Ok((section, offset, size, item)),
                                    (
                                        section,
                                        reader,
                                        next_offset,
                                        next_valid_size,
                                        codec_config,
                                        compressed,
                                    ),
                                ))
                            }
                            Err(Error::ChecksumMismatch(expected, found)) => {
                                // Corruption detected: truncate the blob to
                                // the last valid item and stop replaying
                                warn!(
                                    blob = section,
                                    bad_offset = offset,
                                    new_size = valid_size,
                                    expected,
                                    found,
                                    "corruption detected: truncating"
                                );
                                reader.resize(valid_size).await.ok()?;
                                None
                            }
                            Err(Error::Runtime(RError::BlobInsufficientLength)) => {
                                // Trailing bytes that do not form a full
                                // entry: truncate them and stop replaying
                                warn!(
                                    blob = section,
                                    bad_offset = offset,
                                    new_size = valid_size,
                                    "trailing bytes detected: truncating"
                                );
                                reader.resize(valid_size).await.ok()?;
                                None
                            }
                            Err(err) => {
                                // Propagate any other error to the caller
                                warn!(
                                    blob = section,
                                    cursor = offset,
                                    ?err,
                                    "unexpected error"
                                );
                                Some((
                                    Err(err),
                                    (
                                        section,
                                        reader,
                                        offset,
                                        valid_size,
                                        codec_config,
                                        compressed,
                                    ),
                                ))
                            }
                        }
                    },
                )
                .right_stream()
            },
        ))
    }

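    /// Appends an item to `section`, creating the section's blob if it does
    /// not yet exist. Returns the aligned offset at which the item was
    /// written and the length of its (possibly compressed) encoding.
    ///
    /// Data is buffered in memory and is only guaranteed to be durable after
    /// a subsequent call to `sync`.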
    pub async fn append(&mut self, section: u64, item: V) -> Result<(u32, u32), Error> {
        self.prune_guard(section)?;

        // Encode and (optionally) compress the item
        let encoded = item.encode();
        let encoded = if let Some(compression) = self.cfg.compression {
            compress(&encoded, compression as i32).map_err(|_| Error::CompressionFailed)?
        } else {
            encoded.into()
        };

        // Ensure the item length fits in a u32
        let item_len = encoded.len();
        let entry_len = 4 + item_len + 4;
        let item_len = match item_len.try_into() {
            Ok(len) => len,
            Err(_) => return Err(Error::ItemTooLarge(item_len)),
        };

        // Get or create the blob for this section
        let blob = match self.blobs.entry(section) {
            Entry::Occupied(entry) => entry.into_mut(),
            Entry::Vacant(entry) => {
                let name = section.to_be_bytes();
                let (blob, size) = self.context.open(&self.cfg.partition, &name).await?;
                let blob = Append::new(
                    blob,
                    size,
                    self.cfg.write_buffer,
                    self.cfg.buffer_pool.clone(),
                )
                .await?;
                self.tracked.inc();
                entry.insert(blob)
            }
        };

        // Compute the aligned offset and any required padding
        let cursor = blob.size().await;
        let offset = compute_next_offset(cursor)?;
        let aligned_cursor = offset as u64 * ITEM_ALIGNMENT;
        let padding = (aligned_cursor - cursor) as usize;

        // Build the entry: [padding][size][item][checksum]
        let mut buf = Vec::with_capacity(padding + entry_len);
        if padding > 0 {
            buf.resize(padding, 0);
        }
        let entry_start = buf.len();
        buf.put_u32(item_len);
        buf.put_slice(&encoded);
        let checksum = crc32fast::hash(&buf[entry_start..]);
        buf.put_u32(checksum);
        assert_eq!(buf[entry_start..].len(), entry_len);

        // Append the entry to the blob
        blob.append(buf).await?;
        trace!(blob = section, offset, "appended item");
        Ok((offset, item_len))
    }

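    /// Retrieves the item at `offset` within `section`, verifying its
    /// checksum before decoding.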
    pub async fn get(&self, section: u64, offset: u32) -> Result<V, Error> {
        self.prune_guard(section)?;
        let blob = match self.blobs.get(&section) {
            Some(blob) => blob,
            None => return Err(Error::SectionOutOfRange(section)),
        };

        // Perform a multi-read to get the item
        let (_, _, item) = Self::read(
            self.cfg.compression.is_some(),
            &self.cfg.codec_config,
            blob,
            offset,
        )
        .await?;
        Ok(item)
    }

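    /// Retrieves the item of known encoded length `size` at `offset` within
    /// `section`, fetching the whole entry in a single read.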
    pub async fn get_exact(&self, section: u64, offset: u32, size: u32) -> Result<V, Error> {
        self.prune_guard(section)?;
        let blob = match self.blobs.get(&section) {
            Some(blob) => blob,
            None => return Err(Error::SectionOutOfRange(section)),
        };

        // Perform a single exact read to get the item
        let item = Self::read_exact(
            self.cfg.compression.is_some(),
            &self.cfg.codec_config,
            blob,
            offset,
            size,
        )
        .await?;
        Ok(item)
    }

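    /// Returns the current size in bytes of the blob backing `section`, or 0
    /// if the section does not exist.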
    pub async fn size(&self, section: u64) -> Result<u64, Error> {
        self.prune_guard(section)?;
        match self.blobs.get(&section) {
            Some(blob) => Ok(blob.size().await),
            None => Ok(0),
        }
    }

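    /// Rewinds the journal to `section` and the given aligned `offset`,
    /// equivalent to calling `rewind` with `offset * ITEM_ALIGNMENT` bytes.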
    pub async fn rewind_to_offset(&mut self, section: u64, offset: u32) -> Result<(), Error> {
        self.rewind(section, offset as u64 * ITEM_ALIGNMENT).await
    }

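    /// Rewinds the journal to `size` bytes within `section`: all sections
    /// after `section` are removed, and `section`'s blob is truncated to
    /// `size` if it is currently larger.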
    pub async fn rewind(&mut self, section: u64, size: u64) -> Result<(), Error> {
        self.prune_guard(section)?;

        // Remove all sections after the given section
        let trailing: Vec<u64> = self
            .blobs
            .range((
                std::ops::Bound::Excluded(section),
                std::ops::Bound::Unbounded,
            ))
            .map(|(&section, _)| section)
            .collect();
        for index in trailing.iter().rev() {
            let blob = self.blobs.remove(index).unwrap();

            // Drop the handle before removing the underlying blob
            drop(blob);
            self.context
                .remove(&self.cfg.partition, Some(&index.to_be_bytes()))
                .await?;
            debug!(section = index, "removed section");
            self.tracked.dec();
        }

        // Truncate the blob for the given section (if it exists)
        let blob = match self.blobs.get_mut(&section) {
            Some(blob) => blob,
            None => return Ok(()),
        };
        let current = blob.size().await;
        if size >= current {
            return Ok(());
        }
        blob.resize(size).await?;
        debug!(
            section,
            from = current,
            to = size,
            ?trailing,
            "rewound journal"
        );
        Ok(())
    }

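    /// Truncates the blob backing `section` to `size` bytes if it is
    /// currently larger, leaving all other sections untouched.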
    pub async fn rewind_section(&mut self, section: u64, size: u64) -> Result<(), Error> {
        self.prune_guard(section)?;

        // Get the blob for the given section (if it exists)
        let blob = match self.blobs.get_mut(&section) {
            Some(blob) => blob,
            None => return Ok(()),
        };

        // Truncate only if the blob is larger than the requested size
        let current = blob.size().await;
        if size >= current {
            return Ok(());
        }
        blob.resize(size).await?;
        debug!(section, from = current, to = size, "rewound section");
        Ok(())
    }

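    /// Flushes any buffered writes for `section` to durable storage.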
    pub async fn sync(&self, section: u64) -> Result<(), Error> {
        self.prune_guard(section)?;
        let blob = match self.blobs.get(&section) {
            Some(blob) => blob,
            None => return Ok(()),
        };
        self.synced.inc();
        blob.sync().await.map_err(Error::Runtime)
    }

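    /// Removes all sections less than `min`, returning whether any blobs
    /// were removed.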
    pub async fn prune(&mut self, min: u64) -> Result<bool, Error> {
        // Remove all blobs in sections less than `min`
        let mut pruned = false;
        while let Some((&section, _)) = self.blobs.first_key_value() {
            if section >= min {
                break;
            }

            // Remove the underlying blob
            let blob = self.blobs.remove(&section).unwrap();
            let size = blob.size().await;
            drop(blob);
            self.context
                .remove(&self.cfg.partition, Some(&section.to_be_bytes()))
                .await?;
            pruned = true;

            // Update metrics
            debug!(blob = section, size, "pruned blob");
            self.tracked.dec();
            self.pruned.inc();
        }

        Ok(pruned)
    }

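    /// Syncs all open blobs and consumes the journal.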
    pub async fn close(self) -> Result<(), Error> {
        for (section, blob) in self.blobs.into_iter() {
            let size = blob.size().await;
            blob.sync().await?;
            debug!(blob = section, size, "synced blob");
        }
        Ok(())
    }

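    /// Returns the oldest section still retained by the journal, if any.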
    pub fn oldest_section(&self) -> Option<u64> {
        self.blobs.first_key_value().map(|(section, _)| *section)
    }

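    /// Removes all blobs and the underlying partition, consuming the
    /// journal.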
    pub async fn destroy(self) -> Result<(), Error> {
        for (i, blob) in self.blobs.into_iter() {
            let size = blob.size().await;
            drop(blob);
            debug!(blob = i, size, "destroyed blob");
            self.context
                .remove(&self.cfg.partition, Some(&i.to_be_bytes()))
                .await?;
        }
        match self.context.remove(&self.cfg.partition, None).await {
            Ok(()) => {}
            Err(RError::PartitionMissing(_)) => {
                // The partition was already removed (nothing to clean up)
            }
            Err(err) => return Err(Error::Runtime(err)),
        }
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use bytes::BufMut;
    use commonware_cryptography::{Hasher, Sha256};
    use commonware_macros::test_traced;
    use commonware_runtime::{deterministic, Blob, Error as RError, Runner, Storage};
    use commonware_utils::{NZUsize, StableBuf};
    use futures::{pin_mut, StreamExt};
    use prometheus_client::registry::Metric;

    const PAGE_SIZE: NonZeroUsize = NZUsize!(1024);
    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);

    #[test_traced]
    fn test_journal_append_and_read() {
        // Initialize the deterministic runtime
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            // Initialize the journal
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };
            let index = 1u64;
            let data = 10;
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            // Append an item to the journal
            journal
                .append(index, data)
                .await
                .expect("Failed to append data");

            // Check metrics
            let buffer = context.encode();
            assert!(buffer.contains("tracked 1"));

            // Close the journal to simulate a restart
            journal.close().await.expect("Failed to close journal");

            // Re-initialize the journal
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };
            let journal = Journal::<_, i32>::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");

            // Replay the journal and collect the items
            let mut items = Vec::new();
            let stream = journal
                .replay(0, 0, NZUsize!(1024))
                .await
                .expect("unable to setup replay");
            pin_mut!(stream);
            while let Some(result) = stream.next().await {
                match result {
                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                    Err(err) => panic!("Failed to read item: {err}"),
                }
            }

            // Verify the item was replayed
            assert_eq!(items.len(), 1);
            assert_eq!(items[0].0, index);
            assert_eq!(items[0].1, data);

            // Check metrics
            let buffer = context.encode();
            assert!(buffer.contains("tracked 1"));
        });
    }

    #[test_traced]
    fn test_journal_multiple_appends_and_reads() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            // Initialize the journal
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            // Append items to multiple sections
            let data_items = vec![(1u64, 1), (1u64, 2), (2u64, 3), (3u64, 4)];
            for (index, data) in &data_items {
                journal
                    .append(*index, *data)
                    .await
                    .expect("Failed to append data");
                journal.sync(*index).await.expect("Failed to sync blob");
            }

            // Check metrics
            let buffer = context.encode();
            assert!(buffer.contains("tracked 3"));
            assert!(buffer.contains("synced_total 4"));

            // Close and re-initialize the journal
            journal.close().await.expect("Failed to close journal");
            let journal = Journal::init(context, cfg)
                .await
                .expect("Failed to re-initialize journal");

            // Replay the journal and verify all items
            let mut items = Vec::<(u64, u32)>::new();
            {
                let stream = journal
                    .replay(0, 0, NZUsize!(1024))
                    .await
                    .expect("unable to setup replay");
                pin_mut!(stream);
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                        Err(err) => panic!("Failed to read item: {err}"),
                    }
                }
            }
            assert_eq!(items.len(), data_items.len());
            for ((expected_index, expected_data), (actual_index, actual_data)) in
                data_items.iter().zip(items.iter())
            {
                assert_eq!(actual_index, expected_index);
                assert_eq!(actual_data, expected_data);
            }

            // Cleanup
            journal.destroy().await.expect("Failed to destroy journal");
        });
    }

    #[test_traced]
    fn test_journal_prune_blobs() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            // Initialize the journal
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            // Append items to sections 1 through 5
            for index in 1u64..=5u64 {
                journal
                    .append(index, index)
                    .await
                    .expect("Failed to append data");
                journal.sync(index).await.expect("Failed to sync blob");
            }

            // Append one more item to an earlier section
            let data = 99;
            journal
                .append(2u64, data)
                .await
                .expect("Failed to append data");
            journal.sync(2u64).await.expect("Failed to sync blob");

            // Prune sections older than 3
            journal.prune(3).await.expect("Failed to prune blobs");

            // Check metrics
            let buffer = context.encode();
            assert!(buffer.contains("pruned_total 2"));

            // Prune again with a lower min (should be a no-op)
            journal.prune(2).await.expect("Failed to no-op prune");
            let buffer = context.encode();
            assert!(buffer.contains("pruned_total 2"));

            // Close and re-initialize the journal
            journal.close().await.expect("Failed to close journal");
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");

            // Replay the journal and verify only unpruned items remain
            let mut items = Vec::<(u64, u64)>::new();
            {
                let stream = journal
                    .replay(0, 0, NZUsize!(1024))
                    .await
                    .expect("unable to setup replay");
                pin_mut!(stream);
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                        Err(err) => panic!("Failed to read item: {err}"),
                    }
                }
            }
            assert_eq!(items.len(), 3);
            let expected_indices = [3u64, 4u64, 5u64];
            for (item, expected_index) in items.iter().zip(expected_indices.iter()) {
                assert_eq!(item.0, *expected_index);
            }

            // Prune all remaining sections and close
            journal.prune(6).await.expect("Failed to prune blobs");
            journal.close().await.expect("Failed to close journal");

            // Ensure no blobs remain in the partition
            assert!(context
                .scan(&cfg.partition)
                .await
                .expect("Failed to list blobs")
                .is_empty());
        });
    }

    #[test_traced]
    fn test_journal_with_invalid_blob_name() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            // Create a config
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            // Manually create a blob with an invalid name (not 8 bytes)
            let invalid_blob_name = b"invalid";
            let (blob, _) = context
                .open(&cfg.partition, invalid_blob_name)
                .await
                .expect("Failed to create blob with invalid name");
            blob.sync().await.expect("Failed to sync blob");

            // Attempt to initialize the journal (should fail)
            let result = Journal::<_, u64>::init(context, cfg).await;
            assert!(matches!(result, Err(Error::InvalidBlobName(_))));
        });
    }

    #[test_traced]
    fn test_journal_read_size_missing() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            // Create a config
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            // Manually create a blob in the partition
            let section = 1u64;
            let blob_name = section.to_be_bytes();
            let (blob, _) = context
                .open(&cfg.partition, &blob_name)
                .await
                .expect("Failed to create blob");

            // Write fewer than the 4 bytes needed for the size prefix
            let incomplete_data = vec![0x00, 0x01];
            blob.write_at(incomplete_data, 0)
                .await
                .expect("Failed to write incomplete data");
            blob.sync().await.expect("Failed to sync blob");

            // Initialize the journal
            let journal = Journal::init(context, cfg)
                .await
                .expect("Failed to initialize journal");

            // Replay should truncate the invalid entry and yield nothing
            let stream = journal
                .replay(0, 0, NZUsize!(1024))
                .await
                .expect("unable to setup replay");
            pin_mut!(stream);
            let mut items = Vec::<(u64, u64)>::new();
            while let Some(result) = stream.next().await {
                match result {
                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                    Err(err) => panic!("Failed to read item: {err}"),
                }
            }
            assert!(items.is_empty());
        });
    }

    #[test_traced]
    fn test_journal_read_item_missing() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            // Create a config
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            // Manually create a blob in the partition
            let section = 1u64;
            let blob_name = section.to_be_bytes();
            let (blob, _) = context
                .open(&cfg.partition, &blob_name)
                .await
                .expect("Failed to create blob");

            // Claim a 10-byte item but write only 5 bytes of data
            let item_size: u32 = 10;
            let mut buf = Vec::new();
            buf.put_u32(item_size);
            let data = [2u8; 5];
            BufMut::put_slice(&mut buf, &data);
            blob.write_at(buf, 0)
                .await
                .expect("Failed to write item size");
            blob.sync().await.expect("Failed to sync blob");

            // Initialize the journal
            let journal = Journal::init(context, cfg)
                .await
                .expect("Failed to initialize journal");

            // Replay should truncate the incomplete entry and yield nothing
            let stream = journal
                .replay(0, 0, NZUsize!(1024))
                .await
                .expect("unable to setup replay");
            pin_mut!(stream);
            let mut items = Vec::<(u64, u64)>::new();
            while let Some(result) = stream.next().await {
                match result {
                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                    Err(err) => panic!("Failed to read item: {err}"),
                }
            }
            assert!(items.is_empty());
        });
    }

    #[test_traced]
    fn test_journal_read_checksum_missing() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            // Create a config
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            // Manually create a blob in the partition
            let section = 1u64;
            let blob_name = section.to_be_bytes();
            let (blob, _) = context
                .open(&cfg.partition, &blob_name)
                .await
                .expect("Failed to create blob");

            let item_data = b"Test data";
            let item_size = item_data.len() as u32;

            // Write the item size prefix
            let mut offset = 0;
            blob.write_at(item_size.to_be_bytes().to_vec(), offset)
                .await
                .expect("Failed to write item size");
            offset += 4;

            // Write the item data without a trailing checksum
            blob.write_at(item_data.to_vec(), offset)
                .await
                .expect("Failed to write item data");
            blob.sync().await.expect("Failed to sync blob");

            // Initialize the journal
            let journal = Journal::init(context, cfg)
                .await
                .expect("Failed to initialize journal");

            // Replay should truncate the incomplete entry and yield nothing
            let stream = journal
                .replay(0, 0, NZUsize!(1024))
                .await
                .expect("unable to setup replay");
            pin_mut!(stream);
            let mut items = Vec::<(u64, u64)>::new();
            while let Some(result) = stream.next().await {
                match result {
                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                    Err(err) => panic!("Failed to read item: {err}"),
                }
            }
            assert!(items.is_empty());
        });
    }

    #[test_traced]
    fn test_journal_read_checksum_mismatch() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            // Create a config
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            // Manually create a blob in the partition
            let section = 1u64;
            let blob_name = section.to_be_bytes();
            let (blob, _) = context
                .open(&cfg.partition, &blob_name)
                .await
                .expect("Failed to create blob");

            let item_data = b"Test data";
            let item_size = item_data.len() as u32;
            let incorrect_checksum: u32 = 0xDEADBEEF;

            // Write the item size prefix
            let mut offset = 0;
            blob.write_at(item_size.to_be_bytes().to_vec(), offset)
                .await
                .expect("Failed to write item size");
            offset += 4;

            // Write the item data
            blob.write_at(item_data.to_vec(), offset)
                .await
                .expect("Failed to write item data");
            offset += item_data.len() as u64;

            // Write an incorrect checksum
            blob.write_at(incorrect_checksum.to_be_bytes().to_vec(), offset)
                .await
                .expect("Failed to write incorrect checksum");
            blob.sync().await.expect("Failed to sync blob");

            // Initialize the journal
            let journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            // Replay should detect the corruption and yield nothing
            {
                let stream = journal
                    .replay(0, 0, NZUsize!(1024))
                    .await
                    .expect("unable to setup replay");
                pin_mut!(stream);
                let mut items = Vec::<(u64, u64)>::new();
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                        Err(err) => panic!("Failed to read item: {err}"),
                    }
                }
                assert!(items.is_empty());
            }
            journal.close().await.expect("Failed to close journal");

            // Confirm the corrupted blob was truncated to zero
            let (_, blob_size) = context
                .open(&cfg.partition, &section.to_be_bytes())
                .await
                .expect("Failed to open blob");
            assert_eq!(blob_size, 0);
        });
    }

    #[test_traced]
    fn test_journal_handling_unaligned_truncated_data() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            // Initialize the journal
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            // Append an item to section 1
            journal.append(1, 1).await.expect("Failed to append data");

            // Append multiple items to section 2
            let data_items = vec![(2u64, 2), (2u64, 3), (2u64, 4)];
            for (index, data) in &data_items {
                journal
                    .append(*index, *data)
                    .await
                    .expect("Failed to append data");
                journal.sync(*index).await.expect("Failed to sync blob");
            }

            // Close the journal
            journal.close().await.expect("Failed to close journal");

            // Truncate the last item in section 2 at an unaligned boundary
            let (blob, blob_size) = context
                .open(&cfg.partition, &2u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            blob.resize(blob_size - 4)
                .await
                .expect("Failed to corrupt blob");
            blob.sync().await.expect("Failed to sync blob");

            // Re-initialize the journal
            let journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");

            // Replay; the truncated item should be dropped
            let mut items = Vec::<(u64, u32)>::new();
            {
                let stream = journal
                    .replay(0, 0, NZUsize!(1024))
                    .await
                    .expect("unable to setup replay");
                pin_mut!(stream);
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                        Err(err) => panic!("Failed to read item: {err}"),
                    }
                }
            }
            journal.close().await.expect("Failed to close journal");

            // Only the first three items should survive
            assert_eq!(items.len(), 3);
            assert_eq!(items[0].0, 1);
            assert_eq!(items[0].1, 1);
            assert_eq!(items[1].0, data_items[0].0);
            assert_eq!(items[1].1, data_items[0].1);
            assert_eq!(items[2].0, data_items[1].0);
            assert_eq!(items[2].1, data_items[1].1);

            // Confirm replay truncated the blob to the last valid item
            let (_, blob_size) = context
                .open(&cfg.partition, &2u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            assert_eq!(blob_size, 28);

            // Re-initialize the journal again
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");

            // Replay should yield the same three items
            let mut items = Vec::<(u64, u32)>::new();
            {
                let stream = journal
                    .replay(0, 0, NZUsize!(1024))
                    .await
                    .expect("unable to setup replay");
                pin_mut!(stream);
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                        Err(err) => panic!("Failed to read item: {err}"),
                    }
                }
            }
            assert_eq!(items.len(), 3);
            assert_eq!(items[0].0, 1);
            assert_eq!(items[0].1, 1);
            assert_eq!(items[1].0, data_items[0].0);
            assert_eq!(items[1].1, data_items[0].1);
            assert_eq!(items[2].0, data_items[1].0);
            assert_eq!(items[2].1, data_items[1].1);

            // Append a new item; it should land at the next aligned offset
            journal.append(2, 5).await.expect("Failed to append data");
            journal.sync(2).await.expect("Failed to sync blob");

            // Get the new item
            let item = journal.get(2, 2).await.expect("Failed to get item");
            assert_eq!(item, 5);

            // Close the journal
            journal.close().await.expect("Failed to close journal");

            // Confirm the blob grew to include the new item
            let (_, blob_size) = context
                .open(&cfg.partition, &2u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            assert_eq!(blob_size, 44);

            // Re-initialize and replay one final time
            let journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");
            let mut items = Vec::<(u64, u32)>::new();
            {
                let stream = journal
                    .replay(0, 0, NZUsize!(1024))
                    .await
                    .expect("unable to setup replay");
                pin_mut!(stream);
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                        Err(err) => panic!("Failed to read item: {err}"),
                    }
                }
            }

            // All four items should now be present
            assert_eq!(items.len(), 4);
            assert_eq!(items[0].0, 1);
            assert_eq!(items[0].1, 1);
            assert_eq!(items[1].0, data_items[0].0);
            assert_eq!(items[1].1, data_items[0].1);
            assert_eq!(items[2].0, data_items[1].0);
            assert_eq!(items[2].1, data_items[1].1);
            assert_eq!(items[3].0, 2);
            assert_eq!(items[3].1, 5);
        });
    }

    #[test_traced]
    fn test_journal_handling_aligned_truncated_data() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            // Initialize the journal
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            // Append an item to section 1
            journal.append(1, 1).await.expect("Failed to append data");

            // Append multiple items to section 2
            let data_items = vec![(2u64, 2), (2u64, 3), (2u64, 4)];
            for (index, data) in &data_items {
                journal
                    .append(*index, *data)
                    .await
                    .expect("Failed to append data");
                journal.sync(*index).await.expect("Failed to sync blob");
            }

            // Close the journal
            journal.close().await.expect("Failed to close journal");

            // Truncate the last item in section 2
            let (blob, blob_size) = context
                .open(&cfg.partition, &2u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            blob.resize(blob_size - 4)
                .await
                .expect("Failed to corrupt blob");
            blob.sync().await.expect("Failed to sync blob");

            // Re-initialize the journal
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");

            // Replay; the truncated item should be dropped
            let mut items = Vec::<(u64, u64)>::new();
            {
                let stream = journal
                    .replay(0, 0, NZUsize!(1024))
                    .await
                    .expect("unable to setup replay");
                pin_mut!(stream);
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                        Err(err) => panic!("Failed to read item: {err}"),
                    }
                }
            }
            assert_eq!(items.len(), 3);
            assert_eq!(items[0].0, 1);
            assert_eq!(items[0].1, 1);
            assert_eq!(items[1].0, data_items[0].0);
            assert_eq!(items[1].1, data_items[0].1);
            assert_eq!(items[2].0, data_items[1].0);
            assert_eq!(items[2].1, data_items[1].1);

            // Append a new item to the truncated section
            journal.append(2, 5).await.expect("Failed to append data");
            journal.sync(2).await.expect("Failed to sync blob");

            // Get the new item
            let item = journal.get(2, 2).await.expect("Failed to get item");
            assert_eq!(item, 5);

            // Close the journal
            journal.close().await.expect("Failed to close journal");

            // Confirm the blob grew to include the new item
            let (_, blob_size) = context
                .open(&cfg.partition, &2u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            assert_eq!(blob_size, 48);

            // Re-initialize and replay one final time
            let journal = Journal::init(context, cfg)
                .await
                .expect("Failed to re-initialize journal");
            let mut items = Vec::<(u64, u64)>::new();
            {
                let stream = journal
                    .replay(0, 0, NZUsize!(1024))
                    .await
                    .expect("unable to setup replay");
                pin_mut!(stream);
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                        Err(err) => panic!("Failed to read item: {err}"),
                    }
                }
            }
            journal.close().await.expect("Failed to close journal");

            // All four items should now be present
            assert_eq!(items.len(), 4);
            assert_eq!(items[0].0, 1);
            assert_eq!(items[0].1, 1);
            assert_eq!(items[1].0, data_items[0].0);
            assert_eq!(items[1].1, data_items[0].1);
            assert_eq!(items[2].0, data_items[1].0);
            assert_eq!(items[2].1, data_items[1].1);
            assert_eq!(items[3].0, 2);
            assert_eq!(items[3].1, 5);
        });
    }

    #[test_traced]
    fn test_journal_handling_extra_data() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            // Initialize the journal
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            // Append an item to section 1
            journal.append(1, 1).await.expect("Failed to append data");

            // Append multiple items to section 2
            let data_items = vec![(2u64, 2), (2u64, 3), (2u64, 4)];
            for (index, data) in &data_items {
                journal
                    .append(*index, *data)
                    .await
                    .expect("Failed to append data");
                journal.sync(*index).await.expect("Failed to sync blob");
            }

            // Close the journal
            journal.close().await.expect("Failed to close journal");

            // Append zero-filled trailing bytes to section 2
            let (blob, blob_size) = context
                .open(&cfg.partition, &2u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            blob.write_at(vec![0u8; 16], blob_size)
                .await
                .expect("Failed to add extra data");
            blob.sync().await.expect("Failed to sync blob");

            // Re-initialize the journal
            let journal = Journal::init(context, cfg)
                .await
                .expect("Failed to re-initialize journal");

            // Replay should tolerate the trailing bytes without error
            let mut items = Vec::<(u64, i32)>::new();
            let stream = journal
                .replay(0, 0, NZUsize!(1024))
                .await
                .expect("unable to setup replay");
            pin_mut!(stream);
            while let Some(result) = stream.next().await {
                match result {
                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                    Err(err) => panic!("Failed to read item: {err}"),
                }
            }
        });
    }

    /// A mock blob that discards writes and returns the provided buffer
    /// unchanged on reads, used to simulate very large blobs without
    /// allocating them.
    #[derive(Clone)]
    struct MockBlob {}

    impl Blob for MockBlob {
        async fn read_at(
            &self,
            buf: impl Into<StableBuf> + Send,
            _offset: u64,
        ) -> Result<StableBuf, RError> {
            Ok(buf.into())
        }

        async fn write_at(
            &self,
            _buf: impl Into<StableBuf> + Send,
            _offset: u64,
        ) -> Result<(), RError> {
            Ok(())
        }

        async fn resize(&self, _len: u64) -> Result<(), RError> {
            Ok(())
        }

        async fn sync(&self) -> Result<(), RError> {
            Ok(())
        }
    }

    /// A mock storage backend that reports a fixed length for every blob it
    /// opens.
    #[derive(Clone)]
    struct MockStorage {
        len: u64,
    }

    impl Storage for MockStorage {
        type Blob = MockBlob;

        async fn open(&self, _partition: &str, _name: &[u8]) -> Result<(MockBlob, u64), RError> {
            Ok((MockBlob {}, self.len))
        }

        async fn remove(&self, _partition: &str, _name: Option<&[u8]>) -> Result<(), RError> {
            Ok(())
        }

        async fn scan(&self, _partition: &str) -> Result<Vec<Vec<u8>>, RError> {
            Ok(vec![])
        }
    }

    impl Metrics for MockStorage {
        fn with_label(&self, _: &str) -> Self {
            self.clone()
        }

        fn label(&self) -> String {
            String::new()
        }

        fn register<N: Into<String>, H: Into<String>>(&self, _: N, _: H, _: impl Metric) {}

        fn encode(&self) -> String {
            String::new()
        }
    }

    /// Mirrors `ITEM_ALIGNMENT`: offsets are expressed in 16-byte units, so
    /// a `u32` offset can address at most `u32::MAX * 16` bytes.
    const INDEX_ALIGNMENT: u64 = 16;

    #[test_traced]
    fn test_journal_large_offset() {
        let executor = deterministic::Runner::default();
        executor.start(|_| async move {
            // Initialize the journal over a mock blob sized at the maximum
            // representable offset
            let cfg = Config {
                partition: "partition".to_string(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };
            let context = MockStorage {
                len: u32::MAX as u64 * INDEX_ALIGNMENT,
            };
            let mut journal = Journal::init(context, cfg).await.unwrap();

            // Appending should succeed at exactly offset u32::MAX
            let data = 1;
            let (result, _) = journal
                .append(1, data)
                .await
                .expect("Failed to append data");
            assert_eq!(result, u32::MAX);
        });
    }

    #[test_traced]
    fn test_journal_offset_overflow() {
        let executor = deterministic::Runner::default();
        executor.start(|_| async move {
            // Initialize the journal over a mock blob one byte past the
            // maximum representable offset
            let cfg = Config {
                partition: "partition".to_string(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };
            let context = MockStorage {
                len: u32::MAX as u64 * INDEX_ALIGNMENT + 1,
            };
            let mut journal = Journal::init(context, cfg).await.unwrap();

            // Appending should fail with an offset overflow
            let data = 1;
            let result = journal.append(1, data).await;
            assert!(matches!(result, Err(Error::OffsetOverflow)));
        });
    }

    #[test_traced]
    fn test_journal_rewind() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".to_string(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };
            let mut journal = Journal::init(context, cfg).await.unwrap();

            // An empty section reports size 0
            let size = journal.size(1).await.unwrap();
            assert_eq!(size, 0);

            // Appending grows the section
            journal.append(1, 42i32).await.unwrap();
            let size = journal.size(1).await.unwrap();
            assert!(size > 0);

            journal.append(1, 43i32).await.unwrap();
            let new_size = journal.size(1).await.unwrap();
            assert!(new_size > size);

            // Create a second section
            let size = journal.size(2).await.unwrap();
            assert_eq!(size, 0);
            journal.append(2, 44i32).await.unwrap();
            let size = journal.size(2).await.unwrap();
            assert!(size > 0);

            // Rewinding section 1 to 0 also removes the later section
            journal.rewind(1, 0).await.unwrap();
            let size = journal.size(1).await.unwrap();
            assert_eq!(size, 0);
            let size = journal.size(2).await.unwrap();
            assert_eq!(size, 0);
        });
    }

    #[test_traced]
    fn test_journal_rewind_section() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".to_string(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };
            let mut journal = Journal::init(context, cfg).await.unwrap();

            // An empty section reports size 0
            let size = journal.size(1).await.unwrap();
            assert_eq!(size, 0);

            // Appending grows the section
            journal.append(1, 42i32).await.unwrap();
            let size = journal.size(1).await.unwrap();
            assert!(size > 0);

            journal.append(1, 43i32).await.unwrap();
            let new_size = journal.size(1).await.unwrap();
            assert!(new_size > size);

            // Create a second section
            let size = journal.size(2).await.unwrap();
            assert_eq!(size, 0);
            journal.append(2, 44i32).await.unwrap();
            let size = journal.size(2).await.unwrap();
            assert!(size > 0);

            // Rewinding only section 1 leaves section 2 untouched
            journal.rewind_section(1, 0).await.unwrap();
            let size = journal.size(1).await.unwrap();
            assert_eq!(size, 0);
            let size = journal.size(2).await.unwrap();
            assert!(size > 0);
        });
    }

    #[test_traced]
    fn test_journal_conformance() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            // Initialize the journal
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            // Append 100 items to the same section
            for i in 0..100 {
                journal.append(1, i).await.expect("Failed to append data");
            }
            journal.sync(1).await.expect("Failed to sync blob");

            // Close the journal
            journal.close().await.expect("Failed to close journal");

            // Hash the raw blob contents to catch any unintended change to
            // the on-disk format
            let (blob, size) = context
                .open(&cfg.partition, &1u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            assert!(size > 0);
            let buf = blob
                .read_at(vec![0u8; size as usize], 0)
                .await
                .expect("Failed to read blob");
            let digest = Sha256::hash(buf.as_ref());
            assert_eq!(
                hex(&digest),
                "ca3845fa7fabd4d2855ab72ed21226d1d6eb30cb895ea9ec5e5a14201f3f25d8",
            );
        });
    }
}