use super::Error;
use bytes::BufMut;
use commonware_codec::Codec;
use commonware_runtime::{
    buffer::{Append, PoolRef, Read},
    Blob, Error as RError, Metrics, Storage,
};
use commonware_utils::hex;
use futures::stream::{self, Stream, StreamExt};
use prometheus_client::metrics::{counter::Counter, gauge::Gauge};
use std::{
    collections::{btree_map::Entry, BTreeMap},
    io::Cursor,
    marker::PhantomData,
    num::NonZeroUsize,
};
use tracing::{debug, trace, warn};
use zstd::{bulk::compress, decode_all};

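/// Configuration for [Journal] storage.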
#[derive(Clone)]
pub struct Config<C> {
    /// The storage partition in which section blobs are stored.
    pub partition: String,

    /// Optional zstd compression level applied to encoded items.
    pub compression: Option<u8>,

    /// Configuration passed to the codec when decoding items.
    pub codec_config: C,

    /// Buffer pool shared by readers of this journal.
    pub buffer_pool: PoolRef,

    /// Size of the write buffer used for each section blob.
    pub write_buffer: NonZeroUsize,
}

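/// Items are written aligned to multiples of this many bytes, with zero
/// padding inserted before an item when the previous entry ends unaligned.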
const ITEM_ALIGNMENT: u64 = 16;

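/// Computes the offset (in units of [ITEM_ALIGNMENT]) of the next item slot
/// at or after the given byte offset, failing if it would not fit in a `u32`.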
#[inline]
fn compute_next_offset(mut offset: u64) -> Result<u32, Error> {
    let overage = offset % ITEM_ALIGNMENT;
    if overage != 0 {
        offset += ITEM_ALIGNMENT - overage;
    }
    let offset = offset / ITEM_ALIGNMENT;
    let aligned_offset = offset.try_into().map_err(|_| Error::OffsetOverflow)?;
    Ok(aligned_offset)
}

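/// A section-keyed journal of variable-length items backed by one blob per section.
///
/// Each entry is laid out as `[u32 length][payload][u32 crc32]` and padded so that every item
/// starts at a multiple of [ITEM_ALIGNMENT]; offsets handed back to callers are expressed in
/// those aligned units.
///
/// A minimal usage sketch (error handling elided; assumes a runtime `context` and a `cfg` like
/// the ones constructed in the tests below):
///
/// ```ignore
/// let mut journal = Journal::<_, i32>::init(context, cfg).await?;
/// let (offset, _len) = journal.append(1, 42).await?; // append to section 1
/// journal.sync(1).await?;                            // make the write durable
/// assert_eq!(journal.get(1, offset).await?, Some(42));
/// ```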
pub struct Journal<E: Storage + Metrics, V: Codec> {
    context: E,
    cfg: Config<V::Cfg>,

    /// The oldest section that is still readable, if pruning has occurred.
    oldest_allowed: Option<u64>,

    /// Open section blobs, keyed by section number.
    blobs: BTreeMap<u64, Append<E::Blob>>,

    tracked: Gauge,
    synced: Counter,
    pruned: Counter,

    _phantom: PhantomData<V>,
}

impl<E: Storage + Metrics, V: Codec> Journal<E, V> {
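    /// Initializes the journal by scanning `cfg.partition` for existing section blobs.
    ///
    /// Each blob name must be the big-endian encoding of its section number; anything else fails
    /// with [Error::InvalidBlobName]. A missing partition is treated as an empty journal.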
    pub async fn init(context: E, cfg: Config<V::Cfg>) -> Result<Self, Error> {
        // Open all blobs in the partition, parsing each name as a section number.
        let mut blobs = BTreeMap::new();
        let stored_blobs = match context.scan(&cfg.partition).await {
            Ok(blobs) => blobs,
            Err(RError::PartitionMissing(_)) => Vec::new(),
            Err(err) => return Err(Error::Runtime(err)),
        };
        for name in stored_blobs {
            let (blob, size) = context.open(&cfg.partition, &name).await?;
            let hex_name = hex(&name);
            let section = match name.try_into() {
                Ok(section) => u64::from_be_bytes(section),
                Err(_) => return Err(Error::InvalidBlobName(hex_name)),
            };
            debug!(section, blob = hex_name, size, "loaded section");
            let blob = Append::new(blob, size, cfg.write_buffer, cfg.buffer_pool.clone()).await?;
            blobs.insert(section, blob);
        }

        // Initialize metrics.
        let tracked = Gauge::default();
        let synced = Counter::default();
        let pruned = Counter::default();
        context.register("tracked", "Number of blobs", tracked.clone());
        context.register("synced", "Number of syncs", synced.clone());
        context.register("pruned", "Number of blobs pruned", pruned.clone());
        tracked.set(blobs.len() as i64);

        Ok(Self {
            context,
            cfg,

            oldest_allowed: None,

            blobs,
            tracked,
            synced,
            pruned,

            _phantom: PhantomData,
        })
    }

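    /// Fails with [Error::AlreadyPrunedToSection] if `section` is below the oldest allowed
    /// section (or equal to it, when `inclusive` is set).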
    fn prune_guard(&self, section: u64, inclusive: bool) -> Result<(), Error> {
        if let Some(oldest_allowed) = self.oldest_allowed {
            if section < oldest_allowed || (inclusive && section <= oldest_allowed) {
                return Err(Error::AlreadyPrunedToSection(oldest_allowed));
            }
        }
        Ok(())
    }

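    /// Reads and verifies the entry at `offset` in `blob`, returning the aligned offset of the
    /// next item, the length of the stored item, and the decoded item.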
    async fn read(
        compressed: bool,
        cfg: &V::Cfg,
        blob: &Append<E::Blob>,
        offset: u32,
    ) -> Result<(u32, u32, V), Error> {
        // Read the item's length prefix.
        let mut hasher = crc32fast::Hasher::new();
        let offset = offset as u64 * ITEM_ALIGNMENT;
        let size = blob.read_at(vec![0; 4], offset).await?;
        hasher.update(size.as_ref());
        let size = u32::from_be_bytes(size.as_ref().try_into().unwrap()) as usize;
        let offset = offset.checked_add(4).ok_or(Error::OffsetOverflow)?;

        // Read the item and its trailing checksum.
        let buf_size = size.checked_add(4).ok_or(Error::OffsetOverflow)?;
        let buf = blob.read_at(vec![0u8; buf_size], offset).await?;
        let buf = buf.as_ref();
        let offset = offset
            .checked_add(buf_size as u64)
            .ok_or(Error::OffsetOverflow)?;

        let item = &buf[..size];
        hasher.update(item);

        // Verify the stored checksum.
        let checksum = hasher.finalize();
        let stored_checksum = u32::from_be_bytes(buf[size..].try_into().unwrap());
        if checksum != stored_checksum {
            return Err(Error::ChecksumMismatch(stored_checksum, checksum));
        }

        // Compute the aligned offset of the next item.
        let aligned_offset = compute_next_offset(offset)?;

        // Decompress (if configured) and decode the item.
        let item = if compressed {
            let decompressed =
                decode_all(Cursor::new(&item)).map_err(|_| Error::DecompressionFailed)?;
            V::decode_cfg(decompressed.as_ref(), cfg).map_err(Error::Codec)?
        } else {
            V::decode_cfg(item, cfg).map_err(Error::Codec)?
        };

        Ok((aligned_offset, size as u32, item))
    }

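    /// Like [Self::read], but pulls bytes through the buffered `reader` used during replay. In
    /// addition to the next aligned offset, returns the exact byte position at the end of the
    /// entry so callers can truncate the blob there if later data turns out to be corrupt.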
    async fn read_buffered(
        reader: &mut Read<Append<E::Blob>>,
        offset: u32,
        cfg: &V::Cfg,
        compressed: bool,
    ) -> Result<(u32, u64, u32, V), Error> {
        let file_offset = offset as u64 * ITEM_ALIGNMENT;

        // Seek only when the reader is not already positioned at the item.
        if reader.position() != file_offset {
            reader.seek_to(file_offset).map_err(Error::Runtime)?;
        }

        // Read the item's length prefix.
        let mut hasher = crc32fast::Hasher::new();
        let mut size_buf = [0u8; 4];
        reader
            .read_exact(&mut size_buf, 4)
            .await
            .map_err(Error::Runtime)?;
        hasher.update(&size_buf);

        // Read the item and its trailing checksum.
        let size = u32::from_be_bytes(size_buf) as usize;
        let buf_size = size.checked_add(4).ok_or(Error::OffsetOverflow)?;
        let mut buf = vec![0u8; buf_size];
        reader
            .read_exact(&mut buf, buf_size)
            .await
            .map_err(Error::Runtime)?;

        let item = &buf[..size];
        hasher.update(item);

        // Verify the stored checksum.
        let checksum = hasher.finalize();
        let stored_checksum = u32::from_be_bytes(buf[size..].try_into().unwrap());
        if checksum != stored_checksum {
            return Err(Error::ChecksumMismatch(stored_checksum, checksum));
        }

        // Decompress (if configured) and decode the item.
        let item = if compressed {
            let decompressed =
                decode_all(Cursor::new(&item)).map_err(|_| Error::DecompressionFailed)?;
            V::decode_cfg(decompressed.as_ref(), cfg).map_err(Error::Codec)?
        } else {
            V::decode_cfg(item, cfg).map_err(Error::Codec)?
        };

        // Return both the next aligned offset and the exact end position of this entry.
        let current_pos = reader.position();
        let aligned_offset = compute_next_offset(current_pos)?;
        Ok((aligned_offset, current_pos, size as u32, item))
    }

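    /// Reads an item whose encoded length `len` is already known, fetching the whole entry in a
    /// single read and verifying both the stored size and the checksum before decoding.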
    async fn read_exact(
        compressed: bool,
        cfg: &V::Cfg,
        blob: &Append<E::Blob>,
        offset: u32,
        len: u32,
    ) -> Result<V, Error> {
        // Read the full entry (length prefix + item + checksum) in one call.
        let offset = offset as u64 * ITEM_ALIGNMENT;
        let entry_size = 4 + len as usize + 4;
        let buf = blob.read_at(vec![0u8; entry_size], offset).await?;

        // Verify that the stored length matches the expected length.
        let mut hasher = crc32fast::Hasher::new();
        let disk_size = u32::from_be_bytes(buf.as_ref()[..4].try_into().unwrap());
        hasher.update(&buf.as_ref()[..4]);
        if disk_size != len {
            return Err(Error::UnexpectedSize(disk_size, len));
        }

        // Verify the stored checksum.
        let item = &buf.as_ref()[4..4 + len as usize];
        hasher.update(item);
        let checksum = hasher.finalize();
        let stored_checksum =
            u32::from_be_bytes(buf.as_ref()[4 + len as usize..].try_into().unwrap());
        if checksum != stored_checksum {
            return Err(Error::ChecksumMismatch(stored_checksum, checksum));
        }

        // Decompress (if configured) and decode the item.
        let item = if compressed {
            decode_all(Cursor::new(item)).map_err(|_| Error::DecompressionFailed)?
        } else {
            item.to_vec()
        };

        let item = V::decode_cfg(item.as_ref(), cfg).map_err(Error::Codec)?;
        Ok(item)
    }

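    /// Returns a stream of all items in the journal, in order of section and offset, as
    /// `(section, offset, length, item)` tuples.
    ///
    /// Each section is read sequentially through a buffer of `buffer` bytes. If a checksum
    /// mismatch or an incomplete trailing entry is encountered, the blob is truncated to the end
    /// of the last valid entry and replay of that section stops; any other error is yielded to
    /// the caller through the stream.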
    pub async fn replay(
        &self,
        buffer: NonZeroUsize,
    ) -> Result<impl Stream<Item = Result<(u64, u32, u32, V), Error>> + '_, Error> {
        // Collect the blobs to replay, along with their sizes and max offsets.
        let codec_config = self.cfg.codec_config.clone();
        let compressed = self.cfg.compression.is_some();
        let mut blobs = Vec::with_capacity(self.blobs.len());
        for (section, blob) in self.blobs.iter() {
            let blob_size = blob.size().await;
            let max_offset = compute_next_offset(blob_size)?;
            blobs.push((
                *section,
                blob.clone(),
                max_offset,
                blob_size,
                codec_config.clone(),
                compressed,
            ));
        }

        // Replay each blob in order, stopping a section early if corruption is found.
        Ok(
            stream::iter(blobs).flat_map(
                move |(section, blob, max_offset, blob_size, codec_config, compressed)| {
                    // Wrap the blob in a buffered reader for sequential access.
                    let reader = Read::new(blob, blob_size, buffer);

                    stream::unfold(
                        (section, reader, 0u32, 0u64, codec_config, compressed),
                        move |(
                            section,
                            mut reader,
                            offset,
                            valid_size,
                            codec_config,
                            compressed,
                        )| async move {
                            // Stop when all items in the blob have been replayed.
                            if offset >= max_offset {
                                return None;
                            }

                            // Read the next item from the buffered reader.
                            match Self::read_buffered(
                                &mut reader,
                                offset,
                                &codec_config,
                                compressed,
                            )
                            .await
                            {
                                Ok((next_offset, next_valid_size, size, item)) => {
                                    trace!(blob = section, cursor = offset, "replayed item");
                                    Some((
                                        Ok((section, offset, size, item)),
                                        (
                                            section,
                                            reader,
                                            next_offset,
                                            next_valid_size,
                                            codec_config,
                                            compressed,
                                        ),
                                    ))
                                }
                                Err(Error::ChecksumMismatch(expected, found)) => {
                                    // Truncate the blob to the last valid entry and stop.
                                    warn!(
                                        blob = section,
                                        new_offset = offset,
                                        new_size = valid_size,
                                        expected,
                                        found,
                                        "corruption detected: truncating"
                                    );
                                    reader.resize(valid_size).await.ok()?;
                                    None
                                }
                                Err(Error::Runtime(RError::BlobInsufficientLength)) => {
                                    // Truncate a partially written trailing entry and stop.
                                    warn!(
                                        blob = section,
                                        new_offset = offset,
                                        new_size = valid_size,
                                        "trailing bytes detected: truncating"
                                    );
                                    reader.resize(valid_size).await.ok()?;
                                    None
                                }
                                Err(err) => {
                                    // Surface any other error to the stream.
                                    warn!(
                                        blob = section,
                                        cursor = offset,
                                        ?err,
                                        "unexpected error"
                                    );
                                    Some((
                                        Err(err),
                                        (
                                            section,
                                            reader,
                                            offset,
                                            valid_size,
                                            codec_config,
                                            compressed,
                                        ),
                                    ))
                                }
                            }
                        },
                    )
                },
            ),
        )
    }

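    /// Appends an item to `section`, creating the backing blob if needed, and returns the item's
    /// aligned offset along with its encoded (possibly compressed) length.
    ///
    /// The write is buffered; call [Self::sync] to make it durable.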
    pub async fn append(&mut self, section: u64, item: V) -> Result<(u32, u32), Error> {
        self.prune_guard(section, false)?;

        // Encode the item, compressing it if configured.
        let encoded = item.encode();
        let encoded = if let Some(compression) = self.cfg.compression {
            compress(&encoded, compression as i32).map_err(|_| Error::CompressionFailed)?
        } else {
            encoded.into()
        };

        // Ensure the encoded length fits in a u32.
        let item_len = encoded.len();
        let entry_len = 4 + item_len + 4;
        let item_len = match item_len.try_into() {
            Ok(len) => len,
            Err(_) => return Err(Error::ItemTooLarge(item_len)),
        };

        // Get the section blob, creating it if it does not yet exist.
        let blob = match self.blobs.entry(section) {
            Entry::Occupied(entry) => entry.into_mut(),
            Entry::Vacant(entry) => {
                let name = section.to_be_bytes();
                let (blob, size) = self.context.open(&self.cfg.partition, &name).await?;
                let blob = Append::new(
                    blob,
                    size,
                    self.cfg.write_buffer,
                    self.cfg.buffer_pool.clone(),
                )
                .await?;
                self.tracked.inc();
                entry.insert(blob)
            }
        };

        // Compute the next aligned offset and any padding needed to reach it.
        let cursor = blob.size().await;
        let offset = compute_next_offset(cursor)?;
        let aligned_cursor = offset as u64 * ITEM_ALIGNMENT;
        let padding = (aligned_cursor - cursor) as usize;

        // Build the full entry (padding + length prefix + item + checksum).
        let mut buf = Vec::with_capacity(padding + entry_len);

        if padding > 0 {
            buf.resize(padding, 0);
        }

        let entry_start = buf.len();
        buf.put_u32(item_len);
        buf.put_slice(&encoded);

        let checksum = crc32fast::hash(&buf[entry_start..]);
        buf.put_u32(checksum);
        assert_eq!(buf[entry_start..].len(), entry_len);

        // Append the entry to the blob (buffered until the next sync).
        blob.append(buf).await?;
        trace!(blob = section, offset, "appended item");
        Ok((offset, item_len))
    }

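    /// Retrieves the item at `offset` in `section`, or `None` if the section does not exist.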
    pub async fn get(&self, section: u64, offset: u32) -> Result<Option<V>, Error> {
        self.prune_guard(section, false)?;
        let blob = match self.blobs.get(&section) {
            Some(blob) => blob,
            None => return Ok(None),
        };

        let (_, _, item) = Self::read(
            self.cfg.compression.is_some(),
            &self.cfg.codec_config,
            blob,
            offset,
        )
        .await?;
        Ok(Some(item))
    }

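    /// Retrieves the item at `offset` in `section` using its known encoded length `size`,
    /// fetching the entire entry in one read instead of the two performed by [Self::get].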
    pub async fn get_exact(
        &self,
        section: u64,
        offset: u32,
        size: u32,
    ) -> Result<Option<V>, Error> {
        self.prune_guard(section, false)?;
        let blob = match self.blobs.get(&section) {
            Some(blob) => blob,
            None => return Ok(None),
        };

        let item = Self::read_exact(
            self.cfg.compression.is_some(),
            &self.cfg.codec_config,
            blob,
            offset,
            size,
        )
        .await?;
        Ok(Some(item))
    }

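    /// Returns the size of `section` in bytes, or 0 if the section does not exist.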
    pub async fn size(&self, section: u64) -> Result<u64, Error> {
        self.prune_guard(section, false)?;
        match self.blobs.get(&section) {
            Some(blob) => Ok(blob.size().await),
            None => Ok(0),
        }
    }

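    /// Rewinds the journal so that `offset` becomes the next item slot in `section`, converting
    /// the aligned offset to a byte size and delegating to [Self::rewind].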
    pub async fn rewind_to_offset(&mut self, section: u64, offset: u32) -> Result<(), Error> {
        self.rewind(section, offset as u64 * ITEM_ALIGNMENT).await
    }

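    /// Rewinds the journal to `size` bytes in `section`: all sections after `section` are
    /// removed, and `section` itself is truncated to `size` (a no-op if already smaller).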
    pub async fn rewind(&mut self, section: u64, size: u64) -> Result<(), Error> {
        self.prune_guard(section, false)?;

        // Remove any sections after the target section.
        let trailing: Vec<u64> = self
            .blobs
            .range((
                std::ops::Bound::Excluded(section),
                std::ops::Bound::Unbounded,
            ))
            .map(|(&section, _)| section)
            .collect();
        for index in trailing.iter().rev() {
            let blob = self.blobs.remove(index).unwrap();

            drop(blob);
            self.context
                .remove(&self.cfg.partition, Some(&index.to_be_bytes()))
                .await?;
            debug!(section = index, "removed section");
            self.tracked.dec();
        }

        // Truncate the target section (if it exists and is larger than `size`).
        let blob = match self.blobs.get_mut(&section) {
            Some(blob) => blob,
            None => return Ok(()),
        };
        let current = blob.size().await;
        if size >= current {
            return Ok(());
        }
        blob.resize(size).await?;
        debug!(
            section,
            from = current,
            to = size,
            ?trailing,
            "rewound journal"
        );
        Ok(())
    }

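    /// Truncates `section` to `size` bytes without touching any other section.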
    pub async fn rewind_section(&mut self, section: u64, size: u64) -> Result<(), Error> {
        self.prune_guard(section, false)?;

        let blob = match self.blobs.get_mut(&section) {
            Some(blob) => blob,
            None => return Ok(()),
        };

        // Truncate the section (if it is larger than `size`).
        let current = blob.size().await;
        if size >= current {
            return Ok(());
        }
        blob.resize(size).await?;
        debug!(section, from = current, to = size, "rewound section");
        Ok(())
    }

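    /// Flushes all buffered writes for `section` to durable storage.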
    pub async fn sync(&self, section: u64) -> Result<(), Error> {
        self.prune_guard(section, false)?;
        let blob = match self.blobs.get(&section) {
            Some(blob) => blob,
            None => return Ok(()),
        };
        self.synced.inc();
        blob.sync().await.map_err(Error::Runtime)
    }

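    /// Removes all sections (and their backing blobs) strictly below `min`, then records `min`
    /// as the oldest allowed section.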
    pub async fn prune(&mut self, min: u64) -> Result<(), Error> {
        // Remove all sections below `min`, oldest first.
        while let Some((&section, _)) = self.blobs.first_key_value() {
            if section >= min {
                break;
            }

            // Close the blob, then remove its backing storage.
            let blob = self.blobs.remove(&section).unwrap();
            let size = blob.size().await;
            drop(blob);

            self.context
                .remove(&self.cfg.partition, Some(&section.to_be_bytes()))
                .await?;
            debug!(blob = section, size, "pruned blob");
            self.tracked.dec();
            self.pruned.inc();
        }

        // Record the new oldest allowed section.
        self.oldest_allowed = Some(min);
        Ok(())
    }

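    /// Syncs all open sections and consumes the journal.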
    pub async fn close(self) -> Result<(), Error> {
        for (section, blob) in self.blobs.into_iter() {
            let size = blob.size().await;
            blob.sync().await?;
            debug!(blob = section, size, "synced blob");
        }
        Ok(())
    }

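    /// Removes all section blobs and then the partition itself, tolerating a partition that no
    /// longer exists.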
    pub async fn destroy(self) -> Result<(), Error> {
        // Remove each section blob individually.
        for (i, blob) in self.blobs.into_iter() {
            let size = blob.size().await;
            drop(blob);
            debug!(blob = i, size, "destroyed blob");
            self.context
                .remove(&self.cfg.partition, Some(&i.to_be_bytes()))
                .await?;
        }
        // Remove the partition itself (it may not exist if nothing was ever written).
        match self.context.remove(&self.cfg.partition, None).await {
            Ok(()) => {}
            Err(RError::PartitionMissing(_)) => {}
            Err(err) => return Err(Error::Runtime(err)),
        }
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use bytes::BufMut;
    use commonware_cryptography::hash;
    use commonware_macros::test_traced;
    use commonware_runtime::{
        buffer::PoolRef, deterministic, Blob, Error as RError, Runner, Storage,
    };
    use commonware_utils::{NZUsize, StableBuf};
    use futures::{pin_mut, StreamExt};
    use prometheus_client::registry::Metric;

    const PAGE_SIZE: NonZeroUsize = NZUsize!(1024);
    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);

    #[test_traced]
    fn test_journal_append_and_read() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            // Initialize the journal.
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };
            let index = 1u64;
            let data = 10;
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            // Append an item.
            journal
                .append(index, data)
                .await
                .expect("Failed to append data");

            // Check metrics.
            let buffer = context.encode();
            assert!(buffer.contains("tracked 1"));

            // Close the journal and re-initialize it.
            journal.close().await.expect("Failed to close journal");

            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };
            let journal = Journal::<_, i32>::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");

            // Replay the journal and verify the item was recovered.
            let mut items = Vec::new();
            let stream = journal
                .replay(NZUsize!(1024))
                .await
                .expect("unable to setup replay");
            pin_mut!(stream);
            while let Some(result) = stream.next().await {
                match result {
                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                    Err(err) => panic!("Failed to read item: {err}"),
                }
            }

            assert_eq!(items.len(), 1);
            assert_eq!(items[0].0, index);
            assert_eq!(items[0].1, data);

            // Check metrics again after replay.
            let buffer = context.encode();
            assert!(buffer.contains("tracked 1"));
        });
    }

    #[test_traced]
    fn test_journal_multiple_appends_and_reads() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            // Append items to multiple sections, syncing after each.
            let data_items = vec![(1u64, 1), (1u64, 2), (2u64, 3), (3u64, 4)];
            for (index, data) in &data_items {
                journal
                    .append(*index, *data)
                    .await
                    .expect("Failed to append data");
                journal.sync(*index).await.expect("Failed to sync blob");
            }

            // Check metrics.
            let buffer = context.encode();
            assert!(buffer.contains("tracked 3"));
            assert!(buffer.contains("synced_total 4"));

            journal.close().await.expect("Failed to close journal");

            let journal = Journal::init(context, cfg)
                .await
                .expect("Failed to re-initialize journal");

            // Replay the journal and verify all items were recovered in order.
            let mut items = Vec::<(u64, u32)>::new();
            {
                let stream = journal
                    .replay(NZUsize!(1024))
                    .await
                    .expect("unable to setup replay");
                pin_mut!(stream);
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                        Err(err) => panic!("Failed to read item: {err}"),
                    }
                }
            }

            assert_eq!(items.len(), data_items.len());
            for ((expected_index, expected_data), (actual_index, actual_data)) in
                data_items.iter().zip(items.iter())
            {
                assert_eq!(actual_index, expected_index);
                assert_eq!(actual_data, expected_data);
            }

            journal.destroy().await.expect("Failed to destroy journal");
        });
    }

    #[test_traced]
    fn test_journal_prune_blobs() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            // Append items to sections 1 through 5.
            for index in 1u64..=5u64 {
                journal
                    .append(index, index)
                    .await
                    .expect("Failed to append data");
                journal.sync(index).await.expect("Failed to sync blob");
            }

            // Add one more item to section 2.
            let data = 99;
            journal
                .append(2u64, data)
                .await
                .expect("Failed to append data");
            journal.sync(2u64).await.expect("Failed to sync blob");

            // Prune sections below 3.
            journal.prune(3).await.expect("Failed to prune blobs");

            let buffer = context.encode();
            assert!(buffer.contains("pruned_total 2"));

            // Pruning to a lower min should be a no-op.
            journal.prune(2).await.expect("Failed to no-op prune");
            let buffer = context.encode();
            assert!(buffer.contains("pruned_total 2"));

            journal.close().await.expect("Failed to close journal");

            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");

            // Replay the journal; only sections 3..=5 should remain.
            let mut items = Vec::<(u64, u64)>::new();
            {
                let stream = journal
                    .replay(NZUsize!(1024))
                    .await
                    .expect("unable to setup replay");
                pin_mut!(stream);
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                        Err(err) => panic!("Failed to read item: {err}"),
                    }
                }
            }

            assert_eq!(items.len(), 3);
            let expected_indices = [3u64, 4u64, 5u64];
            for (item, expected_index) in items.iter().zip(expected_indices.iter()) {
                assert_eq!(item.0, *expected_index);
            }

            // Prune everything and verify the partition is empty.
            journal.prune(6).await.expect("Failed to prune blobs");

            journal.close().await.expect("Failed to close journal");

            assert!(context
                .scan(&cfg.partition)
                .await
                .expect("Failed to list blobs")
                .is_empty());
        });
    }

    #[test_traced]
    fn test_journal_with_invalid_blob_name() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            // Create a blob whose name is not a valid section number (not 8 bytes).
            let invalid_blob_name = b"invalid";
            let (blob, _) = context
                .open(&cfg.partition, invalid_blob_name)
                .await
                .expect("Failed to create blob with invalid name");
            blob.sync().await.expect("Failed to sync blob");

            // Initialization should fail with InvalidBlobName.
            let result = Journal::<_, u64>::init(context, cfg).await;

            assert!(matches!(result, Err(Error::InvalidBlobName(_))));
        });
    }

    #[test_traced]
    fn test_journal_read_size_missing() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            // Manually create a blob with an incomplete size prefix (2 of 4 bytes).
            let section = 1u64;
            let blob_name = section.to_be_bytes();
            let (blob, _) = context
                .open(&cfg.partition, &blob_name)
                .await
                .expect("Failed to create blob");

            let incomplete_data = vec![0x00, 0x01];
            blob.write_at(incomplete_data, 0)
                .await
                .expect("Failed to write incomplete data");
            blob.sync().await.expect("Failed to sync blob");

            let journal = Journal::init(context, cfg)
                .await
                .expect("Failed to initialize journal");

            // Replay should truncate the partial entry and yield no items.
            let stream = journal
                .replay(NZUsize!(1024))
                .await
                .expect("unable to setup replay");
            pin_mut!(stream);
            let mut items = Vec::<(u64, u64)>::new();
            while let Some(result) = stream.next().await {
                match result {
                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                    Err(err) => panic!("Failed to read item: {err}"),
                }
            }
            assert!(items.is_empty());
        });
    }

    #[test_traced]
    fn test_journal_read_item_missing() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            // Manually create a blob with a size prefix but a truncated item.
            let section = 1u64;
            let blob_name = section.to_be_bytes();
            let (blob, _) = context
                .open(&cfg.partition, &blob_name)
                .await
                .expect("Failed to create blob");

            // The prefix claims 10 bytes, but only 5 bytes of data follow.
            let item_size: u32 = 10;
            let mut buf = Vec::new();
            buf.put_u32(item_size);
            let data = [2u8; 5];
            BufMut::put_slice(&mut buf, &data);
            blob.write_at(buf, 0)
                .await
                .expect("Failed to write item size");
            blob.sync().await.expect("Failed to sync blob");

            let journal = Journal::init(context, cfg)
                .await
                .expect("Failed to initialize journal");

            // Replay should truncate the partial entry and yield no items.
            let stream = journal
                .replay(NZUsize!(1024))
                .await
                .expect("unable to setup replay");
            pin_mut!(stream);
            let mut items = Vec::<(u64, u64)>::new();
            while let Some(result) = stream.next().await {
                match result {
                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                    Err(err) => panic!("Failed to read item: {err}"),
                }
            }
            assert!(items.is_empty());
        });
    }

    #[test_traced]
    fn test_journal_read_checksum_missing() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            let section = 1u64;
            let blob_name = section.to_be_bytes();
            let (blob, _) = context
                .open(&cfg.partition, &blob_name)
                .await
                .expect("Failed to create blob");

            // Write a size prefix and item data, but omit the trailing checksum.
            let item_data = b"Test data";
            let item_size = item_data.len() as u32;

            let mut offset = 0;
            blob.write_at(item_size.to_be_bytes().to_vec(), offset)
                .await
                .expect("Failed to write item size");
            offset += 4;

            blob.write_at(item_data.to_vec(), offset)
                .await
                .expect("Failed to write item data");
            blob.sync().await.expect("Failed to sync blob");

            let journal = Journal::init(context, cfg)
                .await
                .expect("Failed to initialize journal");

            // Replay should truncate the incomplete entry and yield no items.
            let stream = journal
                .replay(NZUsize!(1024))
                .await
                .expect("unable to setup replay");
            pin_mut!(stream);
            let mut items = Vec::<(u64, u64)>::new();
            while let Some(result) = stream.next().await {
                match result {
                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                    Err(err) => panic!("Failed to read item: {err}"),
                }
            }
            assert!(items.is_empty());
        });
    }

    #[test_traced]
    fn test_journal_read_checksum_mismatch() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            let section = 1u64;
            let blob_name = section.to_be_bytes();
            let (blob, _) = context
                .open(&cfg.partition, &blob_name)
                .await
                .expect("Failed to create blob");

            // Write a complete entry, but with an incorrect checksum.
            let item_data = b"Test data";
            let item_size = item_data.len() as u32;
            let incorrect_checksum: u32 = 0xDEADBEEF;

            let mut offset = 0;
            blob.write_at(item_size.to_be_bytes().to_vec(), offset)
                .await
                .expect("Failed to write item size");
            offset += 4;

            blob.write_at(item_data.to_vec(), offset)
                .await
                .expect("Failed to write item data");
            offset += item_data.len() as u64;

            blob.write_at(incorrect_checksum.to_be_bytes().to_vec(), offset)
                .await
                .expect("Failed to write incorrect checksum");

            blob.sync().await.expect("Failed to sync blob");

            let journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            // Replay should detect the corruption and yield no items.
            {
                let stream = journal
                    .replay(NZUsize!(1024))
                    .await
                    .expect("unable to setup replay");
                pin_mut!(stream);
                let mut items = Vec::<(u64, u64)>::new();
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                        Err(err) => panic!("Failed to read item: {err}"),
                    }
                }
                assert!(items.is_empty());
            }
            journal.close().await.expect("Failed to close journal");

            // The corrupted blob should have been truncated to zero.
            let (_, blob_size) = context
                .open(&cfg.partition, &section.to_be_bytes())
                .await
                .expect("Failed to open blob");
            assert_eq!(blob_size, 0);
        });
    }

    #[test_traced]
    fn test_journal_handling_unaligned_truncated_data() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            journal.append(1, 1).await.expect("Failed to append data");

            // Append multiple items to section 2.
            let data_items = vec![(2u64, 2), (2u64, 3), (2u64, 4)];
            for (index, data) in &data_items {
                journal
                    .append(*index, *data)
                    .await
                    .expect("Failed to append data");
                journal.sync(*index).await.expect("Failed to sync blob");
            }

            journal.close().await.expect("Failed to close journal");

            // Corrupt section 2 by truncating it to an unaligned size.
            let (blob, blob_size) = context
                .open(&cfg.partition, &2u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            blob.resize(blob_size - 4)
                .await
                .expect("Failed to corrupt blob");
            blob.sync().await.expect("Failed to sync blob");

            let journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");

            // Replay: the truncated third item in section 2 should be dropped.
            let mut items = Vec::<(u64, u32)>::new();
            {
                let stream = journal
                    .replay(NZUsize!(1024))
                    .await
                    .expect("unable to setup replay");
                pin_mut!(stream);
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                        Err(err) => panic!("Failed to read item: {err}"),
                    }
                }
            }
            journal.close().await.expect("Failed to close journal");

            assert_eq!(items.len(), 3);
            assert_eq!(items[0].0, 1);
            assert_eq!(items[0].1, 1);
            assert_eq!(items[1].0, data_items[0].0);
            assert_eq!(items[1].1, data_items[0].1);
            assert_eq!(items[2].0, data_items[1].0);
            assert_eq!(items[2].1, data_items[1].1);

            // The blob should have been truncated to the end of the second entry.
            let (_, blob_size) = context
                .open(&cfg.partition, &2u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            assert_eq!(blob_size, 28);

            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");

            // Replay again to confirm the truncation is stable.
            let mut items = Vec::<(u64, u32)>::new();
            {
                let stream = journal
                    .replay(NZUsize!(1024))
                    .await
                    .expect("unable to setup replay");
                pin_mut!(stream);
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                        Err(err) => panic!("Failed to read item: {err}"),
                    }
                }
            }

            assert_eq!(items.len(), 3);
            assert_eq!(items[0].0, 1);
            assert_eq!(items[0].1, 1);
            assert_eq!(items[1].0, data_items[0].0);
            assert_eq!(items[1].1, data_items[0].1);
            assert_eq!(items[2].0, data_items[1].0);
            assert_eq!(items[2].1, data_items[1].1);

            // Appending after truncation should re-align the next item.
            journal.append(2, 5).await.expect("Failed to append data");
            journal.sync(2).await.expect("Failed to sync blob");

            let item = journal
                .get(2, 2)
                .await
                .expect("Failed to get item")
                .expect("Failed to get item");
            assert_eq!(item, 5);

            journal.close().await.expect("Failed to close journal");

            let (_, blob_size) = context
                .open(&cfg.partition, &2u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            assert_eq!(blob_size, 44);

            let journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");

            // Replay should now include the newly appended item.
            let mut items = Vec::<(u64, u32)>::new();
            {
                let stream = journal
                    .replay(NZUsize!(1024))
                    .await
                    .expect("unable to setup replay");
                pin_mut!(stream);
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                        Err(err) => panic!("Failed to read item: {err}"),
                    }
                }
            }

            assert_eq!(items.len(), 4);
            assert_eq!(items[0].0, 1);
            assert_eq!(items[0].1, 1);
            assert_eq!(items[1].0, data_items[0].0);
            assert_eq!(items[1].1, data_items[0].1);
            assert_eq!(items[2].0, data_items[1].0);
            assert_eq!(items[2].1, data_items[1].1);
            assert_eq!(items[3].0, 2);
            assert_eq!(items[3].1, 5);
        });
    }

    #[test_traced]
    fn test_journal_handling_aligned_truncated_data() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            journal.append(1, 1).await.expect("Failed to append data");

            let data_items = vec![(2u64, 2), (2u64, 3), (2u64, 4)];
            for (index, data) in &data_items {
                journal
                    .append(*index, *data)
                    .await
                    .expect("Failed to append data");
                journal.sync(*index).await.expect("Failed to sync blob");
            }

            journal.close().await.expect("Failed to close journal");

            let (blob, blob_size) = context
                .open(&cfg.partition, &2u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            blob.resize(blob_size - 4)
                .await
                .expect("Failed to corrupt blob");
            blob.sync().await.expect("Failed to sync blob");

            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");

            let mut items = Vec::<(u64, u64)>::new();
            {
                let stream = journal
                    .replay(NZUsize!(1024))
                    .await
                    .expect("unable to setup replay");
                pin_mut!(stream);
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                        Err(err) => panic!("Failed to read item: {err}"),
                    }
                }
            }

            assert_eq!(items.len(), 3);
            assert_eq!(items[0].0, 1);
            assert_eq!(items[0].1, 1);
            assert_eq!(items[1].0, data_items[0].0);
            assert_eq!(items[1].1, data_items[0].1);
            assert_eq!(items[2].0, data_items[1].0);
            assert_eq!(items[2].1, data_items[1].1);

            journal.append(2, 5).await.expect("Failed to append data");
            journal.sync(2).await.expect("Failed to sync blob");

            let item = journal
                .get(2, 2)
                .await
                .expect("Failed to get item")
                .expect("Failed to get item");
            assert_eq!(item, 5);

            journal.close().await.expect("Failed to close journal");

            let (_, blob_size) = context
                .open(&cfg.partition, &2u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            assert_eq!(blob_size, 48);

            let journal = Journal::init(context, cfg)
                .await
                .expect("Failed to re-initialize journal");

            let mut items = Vec::<(u64, u64)>::new();
            {
                let stream = journal
                    .replay(NZUsize!(1024))
                    .await
                    .expect("unable to setup replay");
                pin_mut!(stream);
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                        Err(err) => panic!("Failed to read item: {err}"),
                    }
                }
            }
            journal.close().await.expect("Failed to close journal");

            assert_eq!(items.len(), 4);
            assert_eq!(items[0].0, 1);
            assert_eq!(items[0].1, 1);
            assert_eq!(items[1].0, data_items[0].0);
            assert_eq!(items[1].1, data_items[0].1);
            assert_eq!(items[2].0, data_items[1].0);
            assert_eq!(items[2].1, data_items[1].1);
            assert_eq!(items[3].0, 2);
            assert_eq!(items[3].1, 5);
        });
    }

    #[test_traced]
    fn test_journal_handling_extra_data() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            journal.append(1, 1).await.expect("Failed to append data");

            let data_items = vec![(2u64, 2), (2u64, 3), (2u64, 4)];
            for (index, data) in &data_items {
                journal
                    .append(*index, *data)
                    .await
                    .expect("Failed to append data");
                journal.sync(*index).await.expect("Failed to sync blob");
            }

            journal.close().await.expect("Failed to close journal");

            // Write zero-filled extra data past the end of section 2.
            let (blob, blob_size) = context
                .open(&cfg.partition, &2u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            blob.write_at(vec![0u8; 16], blob_size)
                .await
                .expect("Failed to add extra data");
            blob.sync().await.expect("Failed to sync blob");

            let journal = Journal::init(context, cfg)
                .await
                .expect("Failed to re-initialize journal");

            // Replay should tolerate the trailing zeros without erroring.
            let mut items = Vec::<(u64, i32)>::new();
            let stream = journal
                .replay(NZUsize!(1024))
                .await
                .expect("unable to setup replay");
            pin_mut!(stream);
            while let Some(result) = stream.next().await {
                match result {
                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                    Err(err) => panic!("Failed to read item: {err}"),
                }
            }
        });
    }

    #[derive(Clone)]
    struct MockBlob {}

    impl Blob for MockBlob {
        async fn read_at(
            &self,
            buf: impl Into<StableBuf> + Send,
            _offset: u64,
        ) -> Result<StableBuf, RError> {
            Ok(buf.into())
        }

        async fn write_at(
            &self,
            _buf: impl Into<StableBuf> + Send,
            _offset: u64,
        ) -> Result<(), RError> {
            Ok(())
        }

        async fn resize(&self, _len: u64) -> Result<(), RError> {
            Ok(())
        }

        async fn sync(&self) -> Result<(), RError> {
            Ok(())
        }
    }

    #[derive(Clone)]
    struct MockStorage {
        len: u64,
    }

    impl Storage for MockStorage {
        type Blob = MockBlob;

        async fn open(&self, _partition: &str, _name: &[u8]) -> Result<(MockBlob, u64), RError> {
            Ok((MockBlob {}, self.len))
        }

        async fn remove(&self, _partition: &str, _name: Option<&[u8]>) -> Result<(), RError> {
            Ok(())
        }

        async fn scan(&self, _partition: &str) -> Result<Vec<Vec<u8>>, RError> {
            Ok(vec![])
        }
    }

    impl Metrics for MockStorage {
        fn with_label(&self, _: &str) -> Self {
            self.clone()
        }

        fn label(&self) -> String {
            String::new()
        }

        fn register<N: Into<String>, H: Into<String>>(&self, _: N, _: H, _: impl Metric) {}

        fn encode(&self) -> String {
            String::new()
        }
    }

    // Must match ITEM_ALIGNMENT in the parent module.
    const INDEX_ALIGNMENT: u64 = 16;

    #[test_traced]
    fn test_journal_large_offset() {
        let executor = deterministic::Runner::default();
        executor.start(|_| async move {
            let cfg = Config {
                partition: "partition".to_string(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };
            let context = MockStorage {
                // Size the blob so the next item lands exactly at offset u32::MAX.
                len: u32::MAX as u64 * INDEX_ALIGNMENT,
            };
            let mut journal = Journal::init(context, cfg).await.unwrap();

            // Appending should succeed at the maximum representable offset.
            let data = 1;
            let (result, _) = journal
                .append(1, data)
                .await
                .expect("Failed to append data");
            assert_eq!(result, u32::MAX);
        });
    }

    #[test_traced]
    fn test_journal_offset_overflow() {
        let executor = deterministic::Runner::default();
        executor.start(|_| async move {
            let cfg = Config {
                partition: "partition".to_string(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };
            let context = MockStorage {
                // One byte past the maximum representable offset.
                len: u32::MAX as u64 * INDEX_ALIGNMENT + 1,
            };
            let mut journal = Journal::init(context, cfg).await.unwrap();

            // Appending should fail with OffsetOverflow.
            let data = 1;
            let result = journal.append(1, data).await;
            assert!(matches!(result, Err(Error::OffsetOverflow)));
        });
    }

    #[test_traced]
    fn test_journal_rewind() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".to_string(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };
            let mut journal = Journal::init(context, cfg).await.unwrap();

            let size = journal.size(1).await.unwrap();
            assert_eq!(size, 0);

            journal.append(1, 42i32).await.unwrap();

            let size = journal.size(1).await.unwrap();
            assert!(size > 0);

            journal.append(1, 43i32).await.unwrap();
            let new_size = journal.size(1).await.unwrap();
            assert!(new_size > size);

            let size = journal.size(2).await.unwrap();
            assert_eq!(size, 0);

            journal.append(2, 44i32).await.unwrap();

            let size = journal.size(2).await.unwrap();
            assert!(size > 0);

            // Rewinding section 1 to zero also removes the later section 2.
            journal.rewind(1, 0).await.unwrap();

            let size = journal.size(1).await.unwrap();
            assert_eq!(size, 0);

            let size = journal.size(2).await.unwrap();
            assert_eq!(size, 0);
        });
    }

    #[test_traced]
    fn test_journal_rewind_section() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".to_string(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };
            let mut journal = Journal::init(context, cfg).await.unwrap();

            let size = journal.size(1).await.unwrap();
            assert_eq!(size, 0);

            journal.append(1, 42i32).await.unwrap();

            let size = journal.size(1).await.unwrap();
            assert!(size > 0);

            journal.append(1, 43i32).await.unwrap();
            let new_size = journal.size(1).await.unwrap();
            assert!(new_size > size);

            let size = journal.size(2).await.unwrap();
            assert_eq!(size, 0);

            journal.append(2, 44i32).await.unwrap();

            let size = journal.size(2).await.unwrap();
            assert!(size > 0);

            // Rewinding only section 1 leaves section 2 untouched.
            journal.rewind_section(1, 0).await.unwrap();

            let size = journal.size(1).await.unwrap();
            assert_eq!(size, 0);

            let size = journal.size(2).await.unwrap();
            assert!(size > 0);
        });
    }

    #[test_traced]
    fn test_journal_conformance() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                write_buffer: NZUsize!(1024),
            };

            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            // Append 100 items to a single section.
            for i in 0..100 {
                journal.append(1, i).await.expect("Failed to append data");
            }
            journal.sync(1).await.expect("Failed to sync blob");

            journal.close().await.expect("Failed to close journal");

            // Hash the raw blob contents and compare against the known digest.
            let (blob, size) = context
                .open(&cfg.partition, &1u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            assert!(size > 0);
            let buf = blob
                .read_at(vec![0u8; size as usize], 0)
                .await
                .expect("Failed to read blob");
            let digest = hash(buf.as_ref());
            assert_eq!(
                hex(&digest),
                "ca3845fa7fabd4d2855ab72ed21226d1d6eb30cb895ea9ec5e5a14201f3f25d8",
            );
        });
    }
}