use super::Error;
use bytes::BufMut;
use commonware_codec::Codec;
use commonware_runtime::{
    buffer::{Append, PoolRef, Read},
    Blob, Error as RError, Metrics, Storage,
};
use commonware_utils::hex;
use futures::stream::{self, Stream, StreamExt};
use prometheus_client::metrics::{counter::Counter, gauge::Gauge};
use std::{
    collections::{btree_map::Entry, BTreeMap},
    io::Cursor,
    marker::PhantomData,
    num::NonZeroUsize,
};
use tracing::{debug, trace, warn};
use zstd::{bulk::compress, decode_all};

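/// Configuration for initializing a [Journal].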
#[derive(Clone)]
pub struct Config<C> {
    /// The storage partition where journal blobs are stored.
    pub partition: String,

    /// Optional zstd compression level applied to items before writing.
    pub compression: Option<u8>,

    /// Configuration passed to the codec when decoding items.
    pub codec_config: C,

    /// Buffer pool used to cache pages read from journal blobs.
    pub buffer_pool: PoolRef,

    /// Size of the write buffer backing each section's blob.
    pub write_buffer: NonZeroUsize,
}

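/// Each entry in the journal is aligned to this boundary (in bytes), so an
/// item's location can be addressed with a `u32` slot index rather than a raw
/// byte offset.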
pub(crate) const ITEM_ALIGNMENT: u64 = 16;

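/// Converts a byte offset into the index of the next aligned slot, rounding up
/// to the nearest multiple of [ITEM_ALIGNMENT]. For example, byte offset 17
/// rounds up to 32 and maps to slot 2. A slot index that no longer fits in a
/// `u32` yields [Error::OffsetOverflow].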
#[inline]
fn compute_next_offset(mut offset: u64) -> Result<u32, Error> {
    let overage = offset % ITEM_ALIGNMENT;
    if overage != 0 {
        offset += ITEM_ALIGNMENT - overage;
    }
    let offset = offset / ITEM_ALIGNMENT;
    let aligned_offset = offset.try_into().map_err(|_| Error::OffsetOverflow)?;
    Ok(aligned_offset)
}

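/// An append-only journal of codec-encoded items, partitioned into sections.
/// Each section is backed by a single blob whose entries are stored as
/// `[4-byte BE length | item bytes | 4-byte BE CRC32]`, padded out to
/// [ITEM_ALIGNMENT]-byte boundaries.
///
/// A minimal usage sketch (not a doctest; assumes an async context like the
/// deterministic runner used in the tests below):
///
/// ```ignore
/// let mut journal = Journal::<_, i32>::init(context, cfg).await?;
/// let (offset, _len) = journal.append(1, 42).await?;
/// journal.sync(1).await?;
/// assert_eq!(journal.get(1, offset).await?, Some(42));
/// journal.close().await?;
/// ```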
pub struct Journal<E: Storage + Metrics, V: Codec> {
    pub(crate) context: E,
    pub(crate) cfg: Config<V::Cfg>,

    pub(crate) blobs: BTreeMap<u64, Append<E::Blob>>,

    /// The oldest section that may still be read or written. Operations on
    /// sections below this value fail with [Error::AlreadyPrunedToSection].
    pub(crate) oldest_retained_section: u64,

    pub(crate) tracked: Gauge,
    pub(crate) synced: Counter,
    pub(crate) pruned: Counter,

    pub(crate) _phantom: PhantomData<V>,
}

impl<E: Storage + Metrics, V: Codec> Journal<E, V> {
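    /// Initializes a new [Journal], loading any section blobs that already
    /// exist in `cfg.partition`. Blob names are parsed as big-endian `u64`
    /// section numbers; any other name fails with [Error::InvalidBlobName].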
    pub async fn init(context: E, cfg: Config<V::Cfg>) -> Result<Self, Error> {
        // Iterate over blobs in the partition
        let mut blobs = BTreeMap::new();
        let stored_blobs = match context.scan(&cfg.partition).await {
            Ok(blobs) => blobs,
            Err(RError::PartitionMissing(_)) => Vec::new(),
            Err(err) => return Err(Error::Runtime(err)),
        };
        for name in stored_blobs {
            let (blob, size) = context.open(&cfg.partition, &name).await?;
            let hex_name = hex(&name);
            let section = match name.try_into() {
                Ok(section) => u64::from_be_bytes(section),
                Err(_) => return Err(Error::InvalidBlobName(hex_name)),
            };
            debug!(section, blob = hex_name, size, "loaded section");
            let blob = Append::new(blob, size, cfg.write_buffer, cfg.buffer_pool.clone()).await?;
            blobs.insert(section, blob);
        }

        // Initialize metrics
        let tracked = Gauge::default();
        let synced = Counter::default();
        let pruned = Counter::default();
        context.register("tracked", "Number of blobs", tracked.clone());
        context.register("synced", "Number of syncs", synced.clone());
        context.register("pruned", "Number of blobs pruned", pruned.clone());
        tracked.set(blobs.len() as i64);

        // Create the journal instance
        Ok(Self {
            context,
            cfg,
            blobs,
            oldest_retained_section: 0,
            tracked,
            synced,
            pruned,

            _phantom: PhantomData,
        })
    }

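    /// Ensures `section` has not already been pruned.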
    fn prune_guard(&self, section: u64) -> Result<(), Error> {
        if section < self.oldest_retained_section {
            Err(Error::AlreadyPrunedToSection(self.oldest_retained_section))
        } else {
            Ok(())
        }
    }

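    /// Reads and verifies the entry at the given aligned `offset` within
    /// `blob`, returning the offset of the next entry, the stored item
    /// length, and the decoded item.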
    pub(crate) async fn read(
        compressed: bool,
        cfg: &V::Cfg,
        blob: &Append<E::Blob>,
        offset: u32,
    ) -> Result<(u32, u32, V), Error> {
        // Read item size
        let mut hasher = crc32fast::Hasher::new();
        let offset = offset as u64 * ITEM_ALIGNMENT;
        let size = blob.read_at(vec![0u8; 4], offset).await?;
        hasher.update(size.as_ref());
        let size = u32::from_be_bytes(size.as_ref().try_into().unwrap()) as usize;
        let offset = offset.checked_add(4).ok_or(Error::OffsetOverflow)?;

        // Read item and checksum
        let buf_size = size.checked_add(4).ok_or(Error::OffsetOverflow)?;
        let buf = blob.read_at(vec![0u8; buf_size], offset).await?;
        let buf = buf.as_ref();
        let offset = offset
            .checked_add(buf_size as u64)
            .ok_or(Error::OffsetOverflow)?;

        // Verify integrity
        let item = &buf[..size];
        hasher.update(item);
        let checksum = hasher.finalize();
        let stored_checksum = u32::from_be_bytes(buf[size..].try_into().unwrap());
        if checksum != stored_checksum {
            return Err(Error::ChecksumMismatch(stored_checksum, checksum));
        }

        // Compute the offset of the next entry
        let aligned_offset = compute_next_offset(offset)?;

        // Decompress (if necessary) and decode the item
        let item = if compressed {
            let decompressed =
                decode_all(Cursor::new(&item)).map_err(|_| Error::DecompressionFailed)?;
            V::decode_cfg(decompressed.as_ref(), cfg).map_err(Error::Codec)?
        } else {
            V::decode_cfg(item, cfg).map_err(Error::Codec)?
        };

        Ok((aligned_offset, size as u32, item))
    }

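    /// Like [Self::read], but reads through a pooled [Read] buffer to
    /// amortize I/O during replay. Also returns the reader's byte position
    /// after the entry, which replay uses as the known-valid blob size when
    /// truncating corruption.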
    async fn read_buffered(
        reader: &mut Read<Append<E::Blob>>,
        offset: u32,
        cfg: &V::Cfg,
        compressed: bool,
    ) -> Result<(u32, u64, u32, V), Error> {
        let file_offset = offset as u64 * ITEM_ALIGNMENT;

        // Seek to the entry if the reader is not already positioned there
        if reader.position() != file_offset {
            reader.seek_to(file_offset).map_err(Error::Runtime)?;
        }

        // Read item size
        let mut hasher = crc32fast::Hasher::new();
        let mut size_buf = [0u8; 4];
        reader
            .read_exact(&mut size_buf, 4)
            .await
            .map_err(Error::Runtime)?;
        hasher.update(&size_buf);

        // Read item and checksum
        let size = u32::from_be_bytes(size_buf) as usize;
        let buf_size = size.checked_add(4).ok_or(Error::OffsetOverflow)?;
        let mut buf = vec![0u8; buf_size];
        reader
            .read_exact(&mut buf, buf_size)
            .await
            .map_err(Error::Runtime)?;

        // Verify integrity
        let item = &buf[..size];
        hasher.update(item);
        let checksum = hasher.finalize();
        let stored_checksum = u32::from_be_bytes(buf[size..].try_into().unwrap());
        if checksum != stored_checksum {
            return Err(Error::ChecksumMismatch(stored_checksum, checksum));
        }

        // Decompress (if necessary) and decode the item
        let item = if compressed {
            let decompressed =
                decode_all(Cursor::new(&item)).map_err(|_| Error::DecompressionFailed)?;
            V::decode_cfg(decompressed.as_ref(), cfg).map_err(Error::Codec)?
        } else {
            V::decode_cfg(item, cfg).map_err(Error::Codec)?
        };

        // Compute the offset of the next entry
        let current_pos = reader.position();
        let aligned_offset = compute_next_offset(current_pos)?;
        Ok((aligned_offset, current_pos, size as u32, item))
    }

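    /// Reads the entry at `offset` when its exact item length `len` is known
    /// in advance, allowing the whole entry to be fetched in a single read.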
    async fn read_exact(
        compressed: bool,
        cfg: &V::Cfg,
        blob: &Append<E::Blob>,
        offset: u32,
        len: u32,
    ) -> Result<V, Error> {
        // Read the entire entry in one shot
        let offset = offset as u64 * ITEM_ALIGNMENT;
        let entry_size = 4 + len as usize + 4;
        let buf = blob.read_at(vec![0u8; entry_size], offset).await?;

        // Verify the stored size matches the expected length
        let mut hasher = crc32fast::Hasher::new();
        let disk_size = u32::from_be_bytes(buf.as_ref()[..4].try_into().unwrap());
        hasher.update(&buf.as_ref()[..4]);
        if disk_size != len {
            return Err(Error::UnexpectedSize(disk_size, len));
        }

        // Verify integrity
        let item = &buf.as_ref()[4..4 + len as usize];
        hasher.update(item);
        let checksum = hasher.finalize();
        let stored_checksum =
            u32::from_be_bytes(buf.as_ref()[4 + len as usize..].try_into().unwrap());
        if checksum != stored_checksum {
            return Err(Error::ChecksumMismatch(stored_checksum, checksum));
        }

        // Decompress (if necessary) and decode the item
        let item = if compressed {
            decode_all(Cursor::new(item)).map_err(|_| Error::DecompressionFailed)?
        } else {
            item.to_vec()
        };

        let item = V::decode_cfg(item.as_ref(), cfg).map_err(Error::Codec)?;
        Ok(item)
    }

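    /// Returns a stream over all items from `offset` within `start_section`
    /// onward, yielding `(section, offset, len, item)` tuples in order. If a
    /// checksum mismatch or a trailing partial entry is encountered, the blob
    /// is truncated to the last valid entry and replay of that section stops;
    /// any other error is surfaced to the caller.
    ///
    /// A consumption sketch (mirroring the tests below):
    ///
    /// ```ignore
    /// let stream = journal.replay(0, 0, NZUsize!(1024)).await?;
    /// pin_mut!(stream);
    /// while let Some(result) = stream.next().await {
    ///     let (section, offset, len, item) = result?;
    ///     // process item...
    /// }
    /// ```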
    pub async fn replay(
        &self,
        start_section: u64,
        mut offset: u32,
        buffer: NonZeroUsize,
    ) -> Result<impl Stream<Item = Result<(u64, u32, u32, V), Error>> + '_, Error> {
        // Collect all blobs to replay
        let codec_config = self.cfg.codec_config.clone();
        let compressed = self.cfg.compression.is_some();
        let mut blobs = Vec::with_capacity(self.blobs.len());
        for (section, blob) in self.blobs.range(start_section..) {
            let blob_size = blob.size().await;
            let max_offset = compute_next_offset(blob_size)?;
            blobs.push((
                *section,
                blob.clone(),
                max_offset,
                blob_size,
                codec_config.clone(),
                compressed,
            ));
        }

        // Replay all blobs in order
        Ok(stream::iter(blobs).flat_map(
            move |(section, blob, max_offset, blob_size, codec_config, compressed)| {
                // Seek to the requested offset in the first section
                let mut reader = Read::new(blob, blob_size, buffer);
                if section == start_section && offset != 0 {
                    if let Err(err) = reader.seek_to(offset as u64 * ITEM_ALIGNMENT) {
                        warn!(section, offset, ?err, "failed to seek to offset");
                        return stream::once(async move { Err(err.into()) }).left_stream();
                    }
                } else {
                    offset = 0;
                }

                stream::unfold(
                    (section, reader, offset, 0u64, codec_config, compressed),
                    move |(
                        section,
                        mut reader,
                        offset,
                        valid_size,
                        codec_config,
                        compressed,
                    )| async move {
                        // Stop at the end of the section
                        if offset >= max_offset {
                            return None;
                        }

                        // Attempt to read the next item
                        match Self::read_buffered(
                            &mut reader,
                            offset,
                            &codec_config,
                            compressed,
                        )
                        .await
                        {
                            Ok((next_offset, next_valid_size, size, item)) => {
                                trace!(blob = section, cursor = offset, "replayed item");
                                Some((
                                    Ok((section, offset, size, item)),
                                    (
                                        section,
                                        reader,
                                        next_offset,
                                        next_valid_size,
                                        codec_config,
                                        compressed,
                                    ),
                                ))
                            }
                            Err(Error::ChecksumMismatch(expected, found)) => {
                                // Truncate the blob to the last valid entry
                                warn!(
                                    blob = section,
                                    bad_offset = offset,
                                    new_size = valid_size,
                                    expected,
                                    found,
                                    "corruption detected: truncating"
                                );
                                reader.resize(valid_size).await.ok()?;
                                None
                            }
                            Err(Error::Runtime(RError::BlobInsufficientLength)) => {
                                // Truncate trailing bytes too short to form an entry
                                warn!(
                                    blob = section,
                                    bad_offset = offset,
                                    new_size = valid_size,
                                    "trailing bytes detected: truncating"
                                );
                                reader.resize(valid_size).await.ok()?;
                                None
                            }
                            Err(err) => {
                                // Surface unexpected errors to the caller
                                warn!(
                                    blob = section,
                                    cursor = offset,
                                    ?err,
                                    "unexpected error"
                                );
                                Some((
                                    Err(err),
                                    (
                                        section,
                                        reader,
                                        offset,
                                        valid_size,
                                        codec_config,
                                        compressed,
                                    ),
                                ))
                            }
                        }
                    },
                )
                .right_stream()
            },
        ))
    }

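    /// Appends an item to `section`, padding the blob to the next aligned
    /// slot and writing `[4-byte BE length | item bytes | 4-byte BE CRC32]`.
    /// Returns the item's slot offset and its (possibly compressed) length.
    /// The write is buffered; call [Self::sync] to make it durable.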
    pub async fn append(&mut self, section: u64, item: V) -> Result<(u32, u32), Error> {
        self.prune_guard(section)?;

        // Encode (and optionally compress) the item
        let encoded = item.encode();
        let encoded = if let Some(compression) = self.cfg.compression {
            compress(&encoded, compression as i32).map_err(|_| Error::CompressionFailed)?
        } else {
            encoded.into()
        };

        // Ensure the item is not too large
        let item_len = encoded.len();
        let entry_len = 4 + item_len + 4;
        let item_len = match item_len.try_into() {
            Ok(len) => len,
            Err(_) => return Err(Error::ItemTooLarge(item_len)),
        };

        // Get or create the blob for this section
        let blob = match self.blobs.entry(section) {
            Entry::Occupied(entry) => entry.into_mut(),
            Entry::Vacant(entry) => {
                let name = section.to_be_bytes();
                let (blob, size) = self.context.open(&self.cfg.partition, &name).await?;
                let blob = Append::new(
                    blob,
                    size,
                    self.cfg.write_buffer,
                    self.cfg.buffer_pool.clone(),
                )
                .await?;
                self.tracked.inc();
                entry.insert(blob)
            }
        };

        // Compute the padding required to align the entry
        let cursor = blob.size().await;
        let offset = compute_next_offset(cursor)?;
        let aligned_cursor = offset as u64 * ITEM_ALIGNMENT;
        let padding = (aligned_cursor - cursor) as usize;

        let mut buf = Vec::with_capacity(padding + entry_len);

        if padding > 0 {
            buf.resize(padding, 0);
        }

        // Write the entry: [length | item | checksum]
        let entry_start = buf.len();
        buf.put_u32(item_len);
        buf.put_slice(&encoded);

        let checksum = crc32fast::hash(&buf[entry_start..]);
        buf.put_u32(checksum);
        assert_eq!(buf[entry_start..].len(), entry_len);

        blob.append(buf).await?;
        trace!(blob = section, offset, "appended item");
        Ok((offset, item_len))
    }

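    /// Retrieves the item at `offset` within `section`, returning `None` if
    /// the section does not exist.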
    pub async fn get(&self, section: u64, offset: u32) -> Result<Option<V>, Error> {
        self.prune_guard(section)?;
        let blob = match self.blobs.get(&section) {
            Some(blob) => blob,
            None => return Ok(None),
        };

        let (_, _, item) = Self::read(
            self.cfg.compression.is_some(),
            &self.cfg.codec_config,
            blob,
            offset,
        )
        .await?;
        Ok(Some(item))
    }

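    /// Retrieves the item at `offset` within `section` when its exact stored
    /// `size` is known, fetching the whole entry in a single read. Returns
    /// `None` if the section does not exist.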
    pub async fn get_exact(
        &self,
        section: u64,
        offset: u32,
        size: u32,
    ) -> Result<Option<V>, Error> {
        self.prune_guard(section)?;
        let blob = match self.blobs.get(&section) {
            Some(blob) => blob,
            None => return Ok(None),
        };

        let item = Self::read_exact(
            self.cfg.compression.is_some(),
            &self.cfg.codec_config,
            blob,
            offset,
            size,
        )
        .await?;
        Ok(Some(item))
    }

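    /// Returns the current size of `section` in bytes (0 if it does not
    /// exist).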
    pub async fn size(&self, section: u64) -> Result<u64, Error> {
        self.prune_guard(section)?;
        match self.blobs.get(&section) {
            Some(blob) => Ok(blob.size().await),
            None => Ok(0),
        }
    }

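    /// Rewinds the journal so the next append to `section` lands at the given
    /// aligned `offset`, removing any later sections in the process.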
    pub async fn rewind_to_offset(&mut self, section: u64, offset: u32) -> Result<(), Error> {
        self.rewind(section, offset as u64 * ITEM_ALIGNMENT).await
    }

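    /// Rewinds the journal to `size` bytes within `section`: all sections
    /// after `section` are removed, and `section` itself is truncated to
    /// `size` (a no-op if it is already smaller).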
    pub async fn rewind(&mut self, section: u64, size: u64) -> Result<(), Error> {
        self.prune_guard(section)?;

        // Remove all sections after the target section
        let trailing: Vec<u64> = self
            .blobs
            .range((
                std::ops::Bound::Excluded(section),
                std::ops::Bound::Unbounded,
            ))
            .map(|(&section, _)| section)
            .collect();
        for index in trailing.iter().rev() {
            let blob = self.blobs.remove(index).unwrap();

            // Drop the handle before removing the underlying blob
            drop(blob);
            self.context
                .remove(&self.cfg.partition, Some(&index.to_be_bytes()))
                .await?;
            debug!(section = index, "removed section");
            self.tracked.dec();
        }

        // Truncate the target section (if it exists and is larger than size)
        let blob = match self.blobs.get_mut(&section) {
            Some(blob) => blob,
            None => return Ok(()),
        };
        let current = blob.size().await;
        if size >= current {
            return Ok(());
        }
        blob.resize(size).await?;
        debug!(
            section,
            from = current,
            to = size,
            ?trailing,
            "rewound journal"
        );
        Ok(())
    }

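    /// Truncates a single `section` to `size` bytes without touching any
    /// later sections (a no-op if it is already smaller).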
    pub async fn rewind_section(&mut self, section: u64, size: u64) -> Result<(), Error> {
        self.prune_guard(section)?;

        let blob = match self.blobs.get_mut(&section) {
            Some(blob) => blob,
            None => return Ok(()),
        };

        let current = blob.size().await;
        if size >= current {
            return Ok(());
        }
        blob.resize(size).await?;
        debug!(section, from = current, to = size, "rewound section");
        Ok(())
    }

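    /// Flushes any buffered writes for `section` to storage.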
    pub async fn sync(&self, section: u64) -> Result<(), Error> {
        self.prune_guard(section)?;
        let blob = match self.blobs.get(&section) {
            Some(blob) => blob,
            None => return Ok(()),
        };
        self.synced.inc();
        blob.sync().await.map_err(Error::Runtime)
    }

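    /// Removes all sections strictly less than `min`, returning whether any
    /// section was removed.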
    pub async fn prune(&mut self, min: u64) -> Result<bool, Error> {
        let mut pruned = false;
        while let Some((&section, _)) = self.blobs.first_key_value() {
            // Stop once the minimum retained section is reached
            if section >= min {
                break;
            }

            // Remove the underlying blob
            let blob = self.blobs.remove(&section).unwrap();
            let size = blob.size().await;
            drop(blob);

            self.context
                .remove(&self.cfg.partition, Some(&section.to_be_bytes()))
                .await?;
            pruned = true;

            // Update metrics
            debug!(blob = section, size, "pruned blob");
            self.tracked.dec();
            self.pruned.inc();
        }

        Ok(pruned)
    }

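    /// Syncs all open sections and consumes the journal.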
    pub async fn close(self) -> Result<(), Error> {
        for (section, blob) in self.blobs.into_iter() {
            let size = blob.size().await;
            blob.sync().await?;
            debug!(blob = section, size, "synced blob");
        }
        Ok(())
    }

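    /// Returns the oldest section currently tracked, if any.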
    pub fn oldest_section(&self) -> Option<u64> {
        self.blobs.first_key_value().map(|(section, _)| *section)
    }

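    /// Removes all blobs in the journal's partition and then the partition
    /// itself, consuming the journal.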
    pub async fn destroy(self) -> Result<(), Error> {
        for (i, blob) in self.blobs.into_iter() {
            let size = blob.size().await;
            drop(blob);
            debug!(blob = i, size, "destroyed blob");
            self.context
                .remove(&self.cfg.partition, Some(&i.to_be_bytes()))
                .await?;
        }
        match self.context.remove(&self.cfg.partition, None).await {
            Ok(()) => {}
            Err(RError::PartitionMissing(_)) => {
                // Partition already removed (nothing was ever written)
            }
            Err(err) => return Err(Error::Runtime(err)),
        }
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use bytes::BufMut;
    use commonware_cryptography::{Hasher, Sha256};
    use commonware_macros::test_traced;
    use commonware_runtime::{deterministic, Blob, Error as RError, Runner, Storage};
    use commonware_utils::{NZUsize, StableBuf};
    use futures::{pin_mut, StreamExt};
    use prometheus_client::registry::Metric;

    const PAGE_SIZE: NonZeroUsize = NZUsize!(1024);
    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);

    #[test_traced]
    fn test_journal_append_and_read() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            // Initialize the journal
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };
            let index = 1u64;
            let data = 10;
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            // Append an item to the journal
            journal
                .append(index, data)
                .await
                .expect("Failed to append data");

            // Check metrics
            let buffer = context.encode();
            assert!(buffer.contains("tracked 1"));

            // Close the journal
            journal.close().await.expect("Failed to close journal");

            // Re-initialize the journal to simulate a restart
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };
            let journal = Journal::<_, i32>::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");

            // Replay the journal and collect items
            let mut items = Vec::new();
            let stream = journal
                .replay(0, 0, NZUsize!(1024))
                .await
                .expect("unable to setup replay");
            pin_mut!(stream);
            while let Some(result) = stream.next().await {
                match result {
                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                    Err(err) => panic!("Failed to read item: {err}"),
                }
            }

            // Verify the appended item was replayed
            assert_eq!(items.len(), 1);
            assert_eq!(items[0].0, index);
            assert_eq!(items[0].1, data);

            // Check metrics
            let buffer = context.encode();
            assert!(buffer.contains("tracked 1"));
        });
    }

    #[test_traced]
    fn test_journal_multiple_appends_and_reads() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            let data_items = vec![(1u64, 1), (1u64, 2), (2u64, 3), (3u64, 4)];
            for (index, data) in &data_items {
                journal
                    .append(*index, *data)
                    .await
                    .expect("Failed to append data");
                journal.sync(*index).await.expect("Failed to sync blob");
            }

            let buffer = context.encode();
            assert!(buffer.contains("tracked 3"));
            assert!(buffer.contains("synced_total 4"));

            journal.close().await.expect("Failed to close journal");

            let journal = Journal::init(context, cfg)
                .await
                .expect("Failed to re-initialize journal");

            let mut items = Vec::<(u64, u32)>::new();
            {
                let stream = journal
                    .replay(0, 0, NZUsize!(1024))
                    .await
                    .expect("unable to setup replay");
                pin_mut!(stream);
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                        Err(err) => panic!("Failed to read item: {err}"),
                    }
                }
            }

            assert_eq!(items.len(), data_items.len());
            for ((expected_index, expected_data), (actual_index, actual_data)) in
                data_items.iter().zip(items.iter())
            {
                assert_eq!(actual_index, expected_index);
                assert_eq!(actual_data, expected_data);
            }

            journal.destroy().await.expect("Failed to destroy journal");
        });
    }

    #[test_traced]
    fn test_journal_prune_blobs() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            for index in 1u64..=5u64 {
                journal
                    .append(index, index)
                    .await
                    .expect("Failed to append data");
                journal.sync(index).await.expect("Failed to sync blob");
            }

            // Append an item to a section that will later be pruned
            let data = 99;
            journal
                .append(2u64, data)
                .await
                .expect("Failed to append data");
            journal.sync(2u64).await.expect("Failed to sync blob");

            journal.prune(3).await.expect("Failed to prune blobs");

            let buffer = context.encode();
            assert!(buffer.contains("pruned_total 2"));

            // Pruning to a lower section is a no-op
            journal.prune(2).await.expect("Failed to no-op prune");
            let buffer = context.encode();
            assert!(buffer.contains("pruned_total 2"));

            journal.close().await.expect("Failed to close journal");

            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");

            let mut items = Vec::<(u64, u64)>::new();
            {
                let stream = journal
                    .replay(0, 0, NZUsize!(1024))
                    .await
                    .expect("unable to setup replay");
                pin_mut!(stream);
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                        Err(err) => panic!("Failed to read item: {err}"),
                    }
                }
            }

            assert_eq!(items.len(), 3);
            let expected_indices = [3u64, 4u64, 5u64];
            for (item, expected_index) in items.iter().zip(expected_indices.iter()) {
                assert_eq!(item.0, *expected_index);
            }

            journal.prune(6).await.expect("Failed to prune blobs");

            journal.close().await.expect("Failed to close journal");

            // All blobs should have been removed
            assert!(context
                .scan(&cfg.partition)
                .await
                .expect("Failed to list blobs")
                .is_empty());
        });
    }

    #[test_traced]
    fn test_journal_with_invalid_blob_name() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            // Create a blob whose name is not a big-endian u64
            let invalid_blob_name = b"invalid";
            let (blob, _) = context
                .open(&cfg.partition, invalid_blob_name)
                .await
                .expect("Failed to create blob with invalid name");
            blob.sync().await.expect("Failed to sync blob");

            let result = Journal::<_, u64>::init(context, cfg).await;

            assert!(matches!(result, Err(Error::InvalidBlobName(_))));
        });
    }

    #[test_traced]
    fn test_journal_read_size_missing() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            let section = 1u64;
            let blob_name = section.to_be_bytes();
            let (blob, _) = context
                .open(&cfg.partition, &blob_name)
                .await
                .expect("Failed to create blob");

            // Write a partial size prefix (only 2 of the 4 length bytes)
            let incomplete_data = vec![0x00, 0x01];
            blob.write_at(incomplete_data, 0)
                .await
                .expect("Failed to write incomplete data");
            blob.sync().await.expect("Failed to sync blob");

            let journal = Journal::init(context, cfg)
                .await
                .expect("Failed to initialize journal");

            // Replay should drop the partial entry and yield nothing
            let stream = journal
                .replay(0, 0, NZUsize!(1024))
                .await
                .expect("unable to setup replay");
            pin_mut!(stream);
            let mut items = Vec::<(u64, u64)>::new();
            while let Some(result) = stream.next().await {
                match result {
                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                    Err(err) => panic!("Failed to read item: {err}"),
                }
            }
            assert!(items.is_empty());
        });
    }

    #[test_traced]
    fn test_journal_read_item_missing() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            let section = 1u64;
            let blob_name = section.to_be_bytes();
            let (blob, _) = context
                .open(&cfg.partition, &blob_name)
                .await
                .expect("Failed to create blob");

            // Write a size prefix that promises more data than is present
            let item_size: u32 = 10;
            let mut buf = Vec::new();
            buf.put_u32(item_size);
            let data = [2u8; 5];
            BufMut::put_slice(&mut buf, &data);
            blob.write_at(buf, 0)
                .await
                .expect("Failed to write item size");
            blob.sync().await.expect("Failed to sync blob");

            let journal = Journal::init(context, cfg)
                .await
                .expect("Failed to initialize journal");

            let stream = journal
                .replay(0, 0, NZUsize!(1024))
                .await
                .expect("unable to setup replay");
            pin_mut!(stream);
            let mut items = Vec::<(u64, u64)>::new();
            while let Some(result) = stream.next().await {
                match result {
                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                    Err(err) => panic!("Failed to read item: {err}"),
                }
            }
            assert!(items.is_empty());
        });
    }

    #[test_traced]
    fn test_journal_read_checksum_missing() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            let section = 1u64;
            let blob_name = section.to_be_bytes();
            let (blob, _) = context
                .open(&cfg.partition, &blob_name)
                .await
                .expect("Failed to create blob");

            let item_data = b"Test data";
            let item_size = item_data.len() as u32;

            // Write the size prefix and item data, but omit the checksum
            let mut offset = 0;
            blob.write_at(item_size.to_be_bytes().to_vec(), offset)
                .await
                .expect("Failed to write item size");
            offset += 4;

            blob.write_at(item_data.to_vec(), offset)
                .await
                .expect("Failed to write item data");
            blob.sync().await.expect("Failed to sync blob");

            let journal = Journal::init(context, cfg)
                .await
                .expect("Failed to initialize journal");

            let stream = journal
                .replay(0, 0, NZUsize!(1024))
                .await
                .expect("unable to setup replay");
            pin_mut!(stream);
            let mut items = Vec::<(u64, u64)>::new();
            while let Some(result) = stream.next().await {
                match result {
                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                    Err(err) => panic!("Failed to read item: {err}"),
                }
            }
            assert!(items.is_empty());
        });
    }

    #[test_traced]
    fn test_journal_read_checksum_mismatch() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            let section = 1u64;
            let blob_name = section.to_be_bytes();
            let (blob, _) = context
                .open(&cfg.partition, &blob_name)
                .await
                .expect("Failed to create blob");

            let item_data = b"Test data";
            let item_size = item_data.len() as u32;
            let incorrect_checksum: u32 = 0xDEADBEEF;

            // Write a full entry with a deliberately wrong checksum
            let mut offset = 0;
            blob.write_at(item_size.to_be_bytes().to_vec(), offset)
                .await
                .expect("Failed to write item size");
            offset += 4;

            blob.write_at(item_data.to_vec(), offset)
                .await
                .expect("Failed to write item data");
            offset += item_data.len() as u64;

            blob.write_at(incorrect_checksum.to_be_bytes().to_vec(), offset)
                .await
                .expect("Failed to write incorrect checksum");

            blob.sync().await.expect("Failed to sync blob");

            let journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            {
                let stream = journal
                    .replay(0, 0, NZUsize!(1024))
                    .await
                    .expect("unable to setup replay");
                pin_mut!(stream);
                let mut items = Vec::<(u64, u64)>::new();
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                        Err(err) => panic!("Failed to read item: {err}"),
                    }
                }
                assert!(items.is_empty());
            }
            journal.close().await.expect("Failed to close journal");

            // The corrupted blob should have been truncated to zero
            let (_, blob_size) = context
                .open(&cfg.partition, &section.to_be_bytes())
                .await
                .expect("Failed to open blob");
            assert_eq!(blob_size, 0);
        });
    }

    #[test_traced]
    fn test_journal_handling_unaligned_truncated_data() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            journal.append(1, 1).await.expect("Failed to append data");

            let data_items = vec![(2u64, 2), (2u64, 3), (2u64, 4)];
            for (index, data) in &data_items {
                journal
                    .append(*index, *data)
                    .await
                    .expect("Failed to append data");
                journal.sync(*index).await.expect("Failed to sync blob");
            }

            journal.close().await.expect("Failed to close journal");

            // Truncate the last entry mid-checksum (unaligned corruption)
            let (blob, blob_size) = context
                .open(&cfg.partition, &2u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            blob.resize(blob_size - 4)
                .await
                .expect("Failed to corrupt blob");
            blob.sync().await.expect("Failed to sync blob");

            let journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");

            let mut items = Vec::<(u64, u32)>::new();
            {
                let stream = journal
                    .replay(0, 0, NZUsize!(1024))
                    .await
                    .expect("unable to setup replay");
                pin_mut!(stream);
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                        Err(err) => panic!("Failed to read item: {err}"),
                    }
                }
            }
            journal.close().await.expect("Failed to close journal");

            // The truncated entry should have been dropped
            assert_eq!(items.len(), 3);
            assert_eq!(items[0].0, 1);
            assert_eq!(items[0].1, 1);
            assert_eq!(items[1].0, data_items[0].0);
            assert_eq!(items[1].1, data_items[0].1);
            assert_eq!(items[2].0, data_items[1].0);
            assert_eq!(items[2].1, data_items[1].1);

            let (_, blob_size) = context
                .open(&cfg.partition, &2u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            assert_eq!(blob_size, 28);

            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");

            let mut items = Vec::<(u64, u32)>::new();
            {
                let stream = journal
                    .replay(0, 0, NZUsize!(1024))
                    .await
                    .expect("unable to setup replay");
                pin_mut!(stream);
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                        Err(err) => panic!("Failed to read item: {err}"),
                    }
                }
            }

            assert_eq!(items.len(), 3);
            assert_eq!(items[0].0, 1);
            assert_eq!(items[0].1, 1);
            assert_eq!(items[1].0, data_items[0].0);
            assert_eq!(items[1].1, data_items[0].1);
            assert_eq!(items[2].0, data_items[1].0);
            assert_eq!(items[2].1, data_items[1].1);

            // Appending after truncation should land on the next aligned slot
            journal.append(2, 5).await.expect("Failed to append data");
            journal.sync(2).await.expect("Failed to sync blob");

            let item = journal
                .get(2, 2)
                .await
                .expect("Failed to get item")
                .expect("Failed to get item");
            assert_eq!(item, 5);

            journal.close().await.expect("Failed to close journal");

            let (_, blob_size) = context
                .open(&cfg.partition, &2u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            assert_eq!(blob_size, 44);

            let journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");

            let mut items = Vec::<(u64, u32)>::new();
            {
                let stream = journal
                    .replay(0, 0, NZUsize!(1024))
                    .await
                    .expect("unable to setup replay");
                pin_mut!(stream);
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                        Err(err) => panic!("Failed to read item: {err}"),
                    }
                }
            }

            assert_eq!(items.len(), 4);
            assert_eq!(items[0].0, 1);
            assert_eq!(items[0].1, 1);
            assert_eq!(items[1].0, data_items[0].0);
            assert_eq!(items[1].1, data_items[0].1);
            assert_eq!(items[2].0, data_items[1].0);
            assert_eq!(items[2].1, data_items[1].1);
            assert_eq!(items[3].0, 2);
            assert_eq!(items[3].1, 5);
        });
    }

    #[test_traced]
    fn test_journal_handling_aligned_truncated_data() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            journal.append(1, 1).await.expect("Failed to append data");

            let data_items = vec![(2u64, 2), (2u64, 3), (2u64, 4)];
            for (index, data) in &data_items {
                journal
                    .append(*index, *data)
                    .await
                    .expect("Failed to append data");
                journal.sync(*index).await.expect("Failed to sync blob");
            }

            journal.close().await.expect("Failed to close journal");

            let (blob, blob_size) = context
                .open(&cfg.partition, &2u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            blob.resize(blob_size - 4)
                .await
                .expect("Failed to corrupt blob");
            blob.sync().await.expect("Failed to sync blob");

            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");

            let mut items = Vec::<(u64, u64)>::new();
            {
                let stream = journal
                    .replay(0, 0, NZUsize!(1024))
                    .await
                    .expect("unable to setup replay");
                pin_mut!(stream);
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                        Err(err) => panic!("Failed to read item: {err}"),
                    }
                }
            }

            assert_eq!(items.len(), 3);
            assert_eq!(items[0].0, 1);
            assert_eq!(items[0].1, 1);
            assert_eq!(items[1].0, data_items[0].0);
            assert_eq!(items[1].1, data_items[0].1);
            assert_eq!(items[2].0, data_items[1].0);
            assert_eq!(items[2].1, data_items[1].1);

            journal.append(2, 5).await.expect("Failed to append data");
            journal.sync(2).await.expect("Failed to sync blob");

            let item = journal
                .get(2, 2)
                .await
                .expect("Failed to get item")
                .expect("Failed to get item");
            assert_eq!(item, 5);

            journal.close().await.expect("Failed to close journal");

            let (_, blob_size) = context
                .open(&cfg.partition, &2u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            assert_eq!(blob_size, 48);

            let journal = Journal::init(context, cfg)
                .await
                .expect("Failed to re-initialize journal");

            let mut items = Vec::<(u64, u64)>::new();
            {
                let stream = journal
                    .replay(0, 0, NZUsize!(1024))
                    .await
                    .expect("unable to setup replay");
                pin_mut!(stream);
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                        Err(err) => panic!("Failed to read item: {err}"),
                    }
                }
            }
            journal.close().await.expect("Failed to close journal");

            assert_eq!(items.len(), 4);
            assert_eq!(items[0].0, 1);
            assert_eq!(items[0].1, 1);
            assert_eq!(items[1].0, data_items[0].0);
            assert_eq!(items[1].1, data_items[0].1);
            assert_eq!(items[2].0, data_items[1].0);
            assert_eq!(items[2].1, data_items[1].1);
            assert_eq!(items[3].0, 2);
            assert_eq!(items[3].1, 5);
        });
    }

    #[test_traced]
    fn test_journal_handling_extra_data() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            journal.append(1, 1).await.expect("Failed to append data");

            let data_items = vec![(2u64, 2), (2u64, 3), (2u64, 4)];
            for (index, data) in &data_items {
                journal
                    .append(*index, *data)
                    .await
                    .expect("Failed to append data");
                journal.sync(*index).await.expect("Failed to sync blob");
            }

            journal.close().await.expect("Failed to close journal");

            // Append zeroed bytes past the end of the last entry
            let (blob, blob_size) = context
                .open(&cfg.partition, &2u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            blob.write_at(vec![0u8; 16], blob_size)
                .await
                .expect("Failed to add extra data");
            blob.sync().await.expect("Failed to sync blob");

            let journal = Journal::init(context, cfg)
                .await
                .expect("Failed to re-initialize journal");

            // Replay should tolerate the trailing zeroed bytes
            let mut items = Vec::<(u64, i32)>::new();
            let stream = journal
                .replay(0, 0, NZUsize!(1024))
                .await
                .expect("unable to setup replay");
            pin_mut!(stream);
            while let Some(result) = stream.next().await {
                match result {
                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                    Err(err) => panic!("Failed to read item: {err}"),
                }
            }
        });
    }

    /// A no-op blob that accepts all operations, used to simulate blobs of
    /// arbitrary size without allocating storage.
    #[derive(Clone)]
    struct MockBlob {}

    impl Blob for MockBlob {
        async fn read_at(
            &self,
            buf: impl Into<StableBuf> + Send,
            _offset: u64,
        ) -> Result<StableBuf, RError> {
            Ok(buf.into())
        }

        async fn write_at(
            &self,
            _buf: impl Into<StableBuf> + Send,
            _offset: u64,
        ) -> Result<(), RError> {
            Ok(())
        }

        async fn resize(&self, _len: u64) -> Result<(), RError> {
            Ok(())
        }

        async fn sync(&self) -> Result<(), RError> {
            Ok(())
        }
    }

    /// Storage that reports a fixed blob length, used to exercise offset
    /// overflow handling.
    #[derive(Clone)]
    struct MockStorage {
        len: u64,
    }

    impl Storage for MockStorage {
        type Blob = MockBlob;

        async fn open(&self, _partition: &str, _name: &[u8]) -> Result<(MockBlob, u64), RError> {
            Ok((MockBlob {}, self.len))
        }

        async fn remove(&self, _partition: &str, _name: Option<&[u8]>) -> Result<(), RError> {
            Ok(())
        }

        async fn scan(&self, _partition: &str) -> Result<Vec<Vec<u8>>, RError> {
            Ok(vec![])
        }
    }

    impl Metrics for MockStorage {
        fn with_label(&self, _: &str) -> Self {
            self.clone()
        }

        fn label(&self) -> String {
            String::new()
        }

        fn register<N: Into<String>, H: Into<String>>(&self, _: N, _: H, _: impl Metric) {}

        fn encode(&self) -> String {
            String::new()
        }
    }

    /// Mirrors [ITEM_ALIGNMENT]; used to size mock blobs in the offset tests
    /// below.
    const INDEX_ALIGNMENT: u64 = 16;

    #[test_traced]
    fn test_journal_large_offset() {
        let executor = deterministic::Runner::default();
        executor.start(|_| async move {
            let cfg = Config {
                partition: "partition".to_string(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };
            // Blob size maps exactly to the maximum representable offset
            let context = MockStorage {
                len: u32::MAX as u64 * INDEX_ALIGNMENT,
            };
            let mut journal = Journal::init(context, cfg).await.unwrap();

            let data = 1;
            let (result, _) = journal
                .append(1, data)
                .await
                .expect("Failed to append data");
            assert_eq!(result, u32::MAX);
        });
    }

    #[test_traced]
    fn test_journal_offset_overflow() {
        let executor = deterministic::Runner::default();
        executor.start(|_| async move {
            let cfg = Config {
                partition: "partition".to_string(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };
            let context = MockStorage {
                len: u32::MAX as u64 * INDEX_ALIGNMENT + 1,
            };
            let mut journal = Journal::init(context, cfg).await.unwrap();

            let data = 1;
            let result = journal.append(1, data).await;
            assert!(matches!(result, Err(Error::OffsetOverflow)));
        });
    }

    #[test_traced]
    fn test_journal_rewind() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".to_string(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };
            let mut journal = Journal::init(context, cfg).await.unwrap();

            let size = journal.size(1).await.unwrap();
            assert_eq!(size, 0);

            journal.append(1, 42i32).await.unwrap();

            let size = journal.size(1).await.unwrap();
            assert!(size > 0);

            journal.append(1, 43i32).await.unwrap();
            let new_size = journal.size(1).await.unwrap();
            assert!(new_size > size);

            let size = journal.size(2).await.unwrap();
            assert_eq!(size, 0);

            journal.append(2, 44i32).await.unwrap();

            let size = journal.size(2).await.unwrap();
            assert!(size > 0);

            // Rewinding section 1 to zero also removes the later section 2
            journal.rewind(1, 0).await.unwrap();

            let size = journal.size(1).await.unwrap();
            assert_eq!(size, 0);

            let size = journal.size(2).await.unwrap();
            assert_eq!(size, 0);
        });
    }

    #[test_traced]
    fn test_journal_rewind_section() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".to_string(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };
            let mut journal = Journal::init(context, cfg).await.unwrap();

            let size = journal.size(1).await.unwrap();
            assert_eq!(size, 0);

            journal.append(1, 42i32).await.unwrap();

            let size = journal.size(1).await.unwrap();
            assert!(size > 0);

            journal.append(1, 43i32).await.unwrap();
            let new_size = journal.size(1).await.unwrap();
            assert!(new_size > size);

            let size = journal.size(2).await.unwrap();
            assert_eq!(size, 0);

            journal.append(2, 44i32).await.unwrap();

            let size = journal.size(2).await.unwrap();
            assert!(size > 0);

            // Unlike rewind, rewind_section leaves section 2 untouched
            journal.rewind_section(1, 0).await.unwrap();

            let size = journal.size(1).await.unwrap();
            assert_eq!(size, 0);

            let size = journal.size(2).await.unwrap();
            assert!(size > 0);
        });
    }

    #[test_traced]
    fn test_journal_conformance() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            for i in 0..100 {
                journal.append(1, i).await.expect("Failed to append data");
            }
            journal.sync(1).await.expect("Failed to sync blob");

            journal.close().await.expect("Failed to close journal");

            // Hash the raw blob to detect any change in the on-disk format
            let (blob, size) = context
                .open(&cfg.partition, &1u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            assert!(size > 0);
            let buf = blob
                .read_at(vec![0u8; size as usize], 0)
                .await
                .expect("Failed to read blob");
            let digest = Sha256::hash(buf.as_ref());
            assert_eq!(
                hex(&digest),
                "ca3845fa7fabd4d2855ab72ed21226d1d6eb30cb895ea9ec5e5a14201f3f25d8",
            );
        });
    }
}