use super::Error;
use bytes::{BufMut, Bytes};
use commonware_runtime::{Blob, Error as RError, Metrics, Storage};
use commonware_utils::hex;
use futures::stream::{self, Stream, StreamExt};
use prometheus_client::metrics::{counter::Counter, gauge::Gauge};
use std::collections::{btree_map::Entry, BTreeMap};
use tracing::{debug, trace, warn};

/// Configuration for a [Journal].
#[derive(Clone)]
pub struct Config {
    /// The storage partition to use for maintaining journal blobs.
    pub partition: String,
}

/// Items are written at offsets aligned to this boundary, so a `u32` offset
/// can address up to `u32::MAX * ITEM_ALIGNMENT` bytes of a blob.
const ITEM_ALIGNMENT: u64 = 16;

/// Computes the aligned offset of the next item, given an unaligned byte
/// offset (rounding up to the next [ITEM_ALIGNMENT] boundary).
fn compute_next_offset(mut offset: u64) -> Result<u32, Error> {
    let overage = offset % ITEM_ALIGNMENT;
    if overage != 0 {
        offset += ITEM_ALIGNMENT - overage;
    }
    let offset = offset / ITEM_ALIGNMENT;
    let aligned_offset = offset.try_into().map_err(|_| Error::OffsetOverflow)?;
    Ok(aligned_offset)
}

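// A sketch of the arithmetic above: a byte offset of 17 has an overage of 1,
// is rounded up to 32, and maps to aligned offset 2; a byte offset of 32 maps
// to aligned offset 2 directly. Only when the aligned offset no longer fits
// in a u32 does this return Error::OffsetOverflow:
//
//   assert_eq!(compute_next_offset(0).unwrap(), 0);
//   assert_eq!(compute_next_offset(17).unwrap(), 2);
//   assert_eq!(compute_next_offset(32).unwrap(), 2);
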
/// An append-only log for storing arbitrary data, divided into sections.
pub struct Journal<B: Blob, E: Storage<B> + Metrics> {
    context: E,
    cfg: Config,

    /// The lowest section that may still be written or read, once pruning
    /// has occurred.
    oldest_allowed: Option<u64>,

    /// One open blob per section, keyed by section number.
    blobs: BTreeMap<u64, B>,

    tracked: Gauge,
    synced: Counter,
    pruned: Counter,
}

impl<B: Blob, E: Storage<B> + Metrics> Journal<B, E> {
    /// Initializes the journal, opening any blobs that already exist in the
    /// configured partition.
    pub async fn init(context: E, cfg: Config) -> Result<Self, Error> {
        // Iterate over any blobs already stored in the partition
        let mut blobs = BTreeMap::new();
        let stored_blobs = match context.scan(&cfg.partition).await {
            Ok(blobs) => blobs,
            Err(RError::PartitionMissing(_)) => Vec::new(),
            Err(err) => return Err(Error::Runtime(err)),
        };
        for name in stored_blobs {
            let blob = context.open(&cfg.partition, &name).await?;
            let hex_name = hex(&name);
            // Blob names are section numbers encoded as big-endian bytes
            let section = match name.try_into() {
                Ok(section) => u64::from_be_bytes(section),
                Err(_) => return Err(Error::InvalidBlobName(hex_name)),
            };
            debug!(section, blob = hex_name, "loaded section");
            blobs.insert(section, blob);
        }

        // Initialize metrics
        let tracked = Gauge::default();
        let synced = Counter::default();
        let pruned = Counter::default();
        context.register("tracked", "Number of blobs", tracked.clone());
        context.register("synced", "Number of syncs", synced.clone());
        context.register("pruned", "Number of blobs pruned", pruned.clone());
        tracked.set(blobs.len() as i64);

        Ok(Self {
            context,
            cfg,

            oldest_allowed: None,

            blobs,
            tracked,
            synced,
            pruned,
        })
    }

    /// Returns an error if `section` has already been pruned. With
    /// `inclusive`, `section` must be strictly greater than the prune point.
    fn prune_guard(&self, section: u64, inclusive: bool) -> Result<(), Error> {
        if let Some(oldest_allowed) = self.oldest_allowed {
            if section < oldest_allowed || (inclusive && section <= oldest_allowed) {
                return Err(Error::AlreadyPrunedToSection(oldest_allowed));
            }
        }
        Ok(())
    }

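    // On-disk layout of each item, as reconstructed from the read and append
    // paths below (sizes in bytes; integers are big-endian):
    //
    //   +--------+-------------+-------+- - - - - - - - - - -+
    //   | size:4 | item: size  | crc:4 | pad to 16-byte slot |
    //   +--------+-------------+-------+- - - - - - - - - - -+
    //
    // The checksum is crc32fast::hash over the item bytes only; padding is
    // implicit because the next item is written at the next aligned offset.
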
    /// Reads an item from the blob at the given aligned offset, verifying its
    /// checksum. Returns the next aligned offset, the item size, and the item.
    async fn read(blob: &B, offset: u32) -> Result<(u32, u32, Bytes), Error> {
        // Read item size
        let offset = offset as u64 * ITEM_ALIGNMENT;
        let mut size = [0u8; 4];
        blob.read_at(&mut size, offset).await?;
        let size = u32::from_be_bytes(size);
        let offset = offset.checked_add(4).ok_or(Error::OffsetOverflow)?;

        // Read item
        let mut item = vec![0u8; size as usize];
        blob.read_at(&mut item, offset).await?;
        let offset = offset
            .checked_add(size as u64)
            .ok_or(Error::OffsetOverflow)?;

        // Read and verify checksum
        let mut stored_checksum = [0u8; 4];
        blob.read_at(&mut stored_checksum, offset).await?;
        let stored_checksum = u32::from_be_bytes(stored_checksum);
        let checksum = crc32fast::hash(&item);
        if checksum != stored_checksum {
            return Err(Error::ChecksumMismatch(stored_checksum, checksum));
        }
        let offset = offset.checked_add(4).ok_or(Error::OffsetOverflow)?;

        // Compute next aligned offset
        let aligned_offset = compute_next_offset(offset)?;

        Ok((aligned_offset, size, Bytes::from(item)))
    }

    /// Reads only the first `prefix` bytes of an item. Because the full item
    /// is not read, the checksum cannot be verified.
    async fn read_prefix(blob: &B, offset: u32, prefix: u32) -> Result<(u32, u32, Bytes), Error> {
        // Read item size and prefix in a single read
        let offset = offset as u64 * ITEM_ALIGNMENT;
        let mut buf = vec![0u8; 4 + prefix as usize];
        blob.read_at(&mut buf, offset).await?;

        // Get item size to compute the next offset
        let size = u32::from_be_bytes(buf[..4].try_into().unwrap());

        // Get item prefix
        let item_prefix = Bytes::from(buf[4..].to_vec());

        // Compute next aligned offset, skipping the rest of the item and the checksum
        let offset = offset
            .checked_add(4)
            .ok_or(Error::OffsetOverflow)?
            .checked_add(size as u64)
            .ok_or(Error::OffsetOverflow)?
            .checked_add(4)
            .ok_or(Error::OffsetOverflow)?;
        let aligned_offset = compute_next_offset(offset)?;

        Ok((aligned_offset, size, item_prefix))
    }

    /// Reads an item of known size in a single read, verifying its checksum.
    async fn read_exact(blob: &B, offset: u32, exact: u32) -> Result<(u32, Bytes), Error> {
        // Read size, item, and checksum in a single read
        let offset = offset as u64 * ITEM_ALIGNMENT;
        let mut buf = vec![0u8; 4 + exact as usize + 4];
        blob.read_at(&mut buf, offset).await?;

        // Check that the stored size matches the expected size
        let size = u32::from_be_bytes(buf[..4].try_into().unwrap());
        if size != exact {
            return Err(Error::UnexpectedSize(size, exact));
        }

        // Get item
        let item = Bytes::from(buf[4..4 + exact as usize].to_vec());

        // Verify checksum
        let stored_checksum = u32::from_be_bytes(buf[4 + exact as usize..].try_into().unwrap());
        let checksum = crc32fast::hash(&item);
        if checksum != stored_checksum {
            return Err(Error::ChecksumMismatch(stored_checksum, checksum));
        }

        // Compute next aligned offset
        let offset = offset
            .checked_add(4)
            .ok_or(Error::OffsetOverflow)?
            .checked_add(exact as u64)
            .ok_or(Error::OffsetOverflow)?
            .checked_add(4)
            .ok_or(Error::OffsetOverflow)?;
        let aligned_offset = compute_next_offset(offset)?;

        Ok((aligned_offset, item))
    }

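    /// Returns a stream that replays all items across all sections, reading
    /// up to `concurrency` blobs at once. If `prefix` is provided, only the
    /// first `prefix` bytes of each item are read and checksums are not
    /// verified. Trailing bytes that do not form a complete item are assumed
    /// to be the result of a partial write and are truncated; a checksum
    /// mismatch is surfaced as an error and ends replay of that blob.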
    pub async fn replay(
        &mut self,
        concurrency: usize,
        prefix: Option<u32>,
    ) -> Result<impl Stream<Item = Result<(u64, u32, u32, Bytes), Error>> + '_, Error> {
        // Collect all blobs to replay
        let mut blobs = Vec::with_capacity(self.blobs.len());
        for (section, blob) in self.blobs.iter() {
            let len = blob.len().await?;
            let aligned_len = compute_next_offset(len)?;
            blobs.push((*section, blob, aligned_len));
        }

        // Replay blobs concurrently, flattening items into a single stream
        Ok(stream::iter(blobs)
            .map(move |(section, blob, len)| async move {
                stream::unfold(
                    (section, blob, 0u32),
                    move |(section, blob, offset)| async move {
                        // Stop when the end of the blob is reached
                        if offset == len {
                            return None;
                        }

                        // Read the next item (or just its prefix)
                        let mut read = match prefix {
                            Some(prefix) => Self::read_prefix(blob, offset, prefix).await,
                            None => Self::read(blob, offset).await,
                        };

                        // A read that runs past the blob length indicates
                        // trailing bytes from a partial write
                        if let Ok((next_offset, _, _)) = read {
                            if next_offset > len {
                                read = Err(Error::Runtime(RError::BlobInsufficientLength));
                            }
                        };

                        match read {
                            Ok((next_offset, item_size, item)) => {
                                trace!(blob = section, cursor = offset, len, "replayed item");
                                Some((
                                    Ok((section, offset, item_size, item)),
                                    (section, blob, next_offset),
                                ))
                            }
                            Err(Error::ChecksumMismatch(expected, found)) => {
                                // Surface corruption and end replay of this blob
                                warn!(
                                    blob = section,
                                    cursor = offset,
                                    expected,
                                    found,
                                    "corruption detected"
                                );
                                Some((
                                    Err(Error::ChecksumMismatch(expected, found)),
                                    (section, blob, len),
                                ))
                            }
                            Err(Error::Runtime(RError::BlobInsufficientLength)) => {
                                // Truncate trailing bytes left by a partial write
                                warn!(
                                    blob = section,
                                    new_size = offset,
                                    old_size = len,
                                    "trailing bytes detected: truncating"
                                );
                                blob.truncate(offset as u64 * ITEM_ALIGNMENT).await.ok()?;
                                blob.sync().await.ok()?;
                                None
                            }
                            Err(err) => Some((Err(err), (section, blob, len))),
                        }
                    },
                )
            })
            .buffer_unordered(concurrency)
            .flatten())
    }

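    /// Appends an item to `section`, returning the aligned offset at which it
    /// was written. The write is not durable until [Self::sync] is called. A
    /// minimal usage sketch (assuming an initialized `journal` and a fallible
    /// caller):
    ///
    /// ```text
    /// let offset = journal.append(1, Bytes::from("hello")).await?;
    /// journal.sync(1).await?; // durability is only guaranteed after sync
    /// let item = journal.get(1, offset, None).await?.expect("item missing");
    /// assert_eq!(item, Bytes::from("hello"));
    /// ```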
    pub async fn append(&mut self, section: u64, item: Bytes) -> Result<u32, Error> {
        // Check that the section has not been pruned
        self.prune_guard(section, false)?;

        // Ensure the item is not too large
        let item_len = item.len();
        let len = 4 + item_len + 4;
        let item_len = match item_len.try_into() {
            Ok(len) => len,
            Err(_) => return Err(Error::ItemTooLarge(item_len)),
        };

        // Get the existing blob or create a new one
        let blob = match self.blobs.entry(section) {
            Entry::Occupied(entry) => entry.into_mut(),
            Entry::Vacant(entry) => {
                let name = section.to_be_bytes();
                let blob = self.context.open(&self.cfg.partition, &name).await?;
                self.tracked.inc();
                entry.insert(blob)
            }
        };

        // Encode the item as size || item || checksum
        let mut buf = Vec::with_capacity(len);
        buf.put_u32(item_len);
        let checksum = crc32fast::hash(&item);
        buf.put(item);
        buf.put_u32(checksum);

        // Append the item at the next aligned offset
        let cursor = blob.len().await?;
        let offset = compute_next_offset(cursor)?;
        blob.write_at(&buf, offset as u64 * ITEM_ALIGNMENT).await?;
        trace!(blob = section, previous_len = len, offset, "appended item");
        Ok(offset)
    }

    /// Retrieves the first `prefix` bytes of the item at (`section`, `offset`).
    /// The checksum is not verified, so the caller must be able to tolerate
    /// reading corrupted data.
    pub async fn get_prefix(
        &self,
        section: u64,
        offset: u32,
        prefix: u32,
    ) -> Result<Option<Bytes>, Error> {
        self.prune_guard(section, false)?;
        let blob = match self.blobs.get(&section) {
            Some(blob) => blob,
            None => return Ok(None),
        };
        let (_, _, item) = Self::read_prefix(blob, offset, prefix).await?;
        Ok(Some(item))
    }

    /// Retrieves the item at (`section`, `offset`). If `exact` is provided,
    /// the item is fetched in a single read of `exact` bytes.
    pub async fn get(
        &self,
        section: u64,
        offset: u32,
        exact: Option<u32>,
    ) -> Result<Option<Bytes>, Error> {
        self.prune_guard(section, false)?;
        let blob = match self.blobs.get(&section) {
            Some(blob) => blob,
            None => return Ok(None),
        };

        // If the exact size is known, read the item in a single read
        if let Some(exact) = exact {
            let (_, item) = Self::read_exact(blob, offset, exact).await?;
            return Ok(Some(item));
        }

        // Otherwise, perform a multi-read fetch
        let (_, _, item) = Self::read(blob, offset).await?;
        Ok(Some(item))
    }

    /// Flushes pending writes in `section` to durable storage.
    pub async fn sync(&self, section: u64) -> Result<(), Error> {
        self.prune_guard(section, false)?;
        let blob = match self.blobs.get(&section) {
            Some(blob) => blob,
            None => return Ok(()),
        };
        self.synced.inc();
        blob.sync().await.map_err(Error::Runtime)
    }

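    /// Removes all sections strictly less than `min` and records `min` as the
    /// prune point. Subsequent calls with a value at or below a prior `min`,
    /// and any operation on a pruned section, fail with
    /// [Error::AlreadyPrunedToSection].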
    pub async fn prune(&mut self, min: u64) -> Result<(), Error> {
        // Check that min is above the last prune point
        self.prune_guard(min, true)?;

        // Remove all blobs below min
        while let Some((&section, _)) = self.blobs.first_key_value() {
            // Stop once the lowest remaining section is at or above min
            if section >= min {
                break;
            }

            // Remove and close the blob
            let blob = self.blobs.remove(&section).unwrap();
            blob.close().await?;

            // Remove the blob from storage
            self.context
                .remove(&self.cfg.partition, Some(&section.to_be_bytes()))
                .await?;
            debug!(blob = section, "pruned blob");
            self.tracked.dec();
            self.pruned.inc();
        }

        // Update the oldest allowed section
        self.oldest_allowed = Some(min);
        Ok(())
    }

    /// Closes all open blobs, consuming the journal.
    pub async fn close(self) -> Result<(), Error> {
        for (section, blob) in self.blobs.into_iter() {
            blob.close().await?;
            debug!(blob = section, "closed blob");
        }
        Ok(())
    }
}

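// A minimal end-to-end sketch of the lifecycle above, assuming a `context`
// implementing `Storage + Metrics` (such as the deterministic runtime used in
// the tests below):
//
//   let mut journal = Journal::init(context, Config { partition: "p".into() }).await?;
//   let offset = journal.append(1, Bytes::from("item")).await?;
//   journal.sync(1).await?;
//   journal.prune(1).await?; // removes nothing: no section is < 1
//   journal.close().await?;
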
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::{BufMut, Bytes};
    use commonware_macros::test_traced;
    use commonware_runtime::{deterministic::Executor, Blob, Error as RError, Runner, Storage};
    use futures::{pin_mut, StreamExt};
    use prometheus_client::registry::Metric;

    #[test_traced]
    fn test_journal_append_and_read() {
        let (executor, context, _) = Executor::default();

        executor.start(async move {
            let cfg = Config {
                partition: "test_partition".into(),
            };
            let index = 1u64;
            let data = Bytes::from("Test data");
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            journal
                .append(index, data.clone())
                .await
                .expect("Failed to append data");

            let buffer = context.encode();
            assert!(buffer.contains("tracked 1"));

            journal.close().await.expect("Failed to close journal");

            let cfg = Config {
                partition: "test_partition".into(),
            };
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");

            let mut items = Vec::new();
            let stream = journal
                .replay(1, None)
                .await
                .expect("unable to setup replay");
            pin_mut!(stream);
            while let Some(result) = stream.next().await {
                match result {
                    Ok((blob_index, _, full_len, item)) => {
                        assert_eq!(full_len as usize, item.len());
                        items.push((blob_index, item))
                    }
                    Err(err) => panic!("Failed to read item: {}", err),
                }
            }

            assert_eq!(items.len(), 1);
            assert_eq!(items[0].0, index);
            assert_eq!(items[0].1, data);

            let buffer = context.encode();
            assert!(buffer.contains("tracked 1"));
        });
    }

    #[test_traced]
    fn test_journal_multiple_appends_and_reads() {
        let (executor, context, _) = Executor::default();

        executor.start(async move {
            let cfg = Config {
                partition: "test_partition".into(),
            };

            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            let data_items = vec![
                (1u64, Bytes::from("Data for blob 1")),
                (1u64, Bytes::from("Data for blob 1, second item")),
                (2u64, Bytes::from("Data for blob 2")),
                (3u64, Bytes::from("Data for blob 3")),
            ];
            for (index, data) in &data_items {
                journal
                    .append(*index, data.clone())
                    .await
                    .expect("Failed to append data");
                journal.sync(*index).await.expect("Failed to sync blob");
            }

            let buffer = context.encode();
            assert!(buffer.contains("tracked 3"));
            assert!(buffer.contains("synced_total 4"));

            journal.close().await.expect("Failed to close journal");

            let mut journal = Journal::init(context, cfg)
                .await
                .expect("Failed to re-initialize journal");

            let mut items = Vec::new();
            {
                let stream = journal
                    .replay(2, None)
                    .await
                    .expect("unable to setup replay");
                pin_mut!(stream);
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((blob_index, _, full_len, item)) => {
                            assert_eq!(full_len as usize, item.len());
                            items.push((blob_index, item))
                        }
                        Err(err) => panic!("Failed to read item: {}", err),
                    }
                }
            }

            assert_eq!(items.len(), data_items.len());
            for ((expected_index, expected_data), (actual_index, actual_data)) in
                data_items.iter().zip(items.iter())
            {
                assert_eq!(actual_index, expected_index);
                assert_eq!(actual_data, expected_data);
            }

            // Replay again, reading only a 4-byte prefix of each item
            {
                let stream = journal
                    .replay(2, Some(4))
                    .await
                    .expect("unable to setup replay");
                pin_mut!(stream);
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((_, _, full_len, item)) => {
                            assert_eq!(item, Bytes::from("Data"));
                            assert!(full_len as usize > item.len());
                        }
                        Err(err) => panic!("Failed to read item: {}", err),
                    }
                }
            }
        });
    }

    #[test_traced]
    fn test_journal_prune_blobs() {
        let (executor, context, _) = Executor::default();

        executor.start(async move {
            let cfg = Config {
                partition: "test_partition".into(),
            };

            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            // Append one item to each of sections 1 through 5
            for index in 1u64..=5u64 {
                let data = Bytes::from(format!("Data for blob {}", index));
                journal
                    .append(index, data)
                    .await
                    .expect("Failed to append data");
                journal.sync(index).await.expect("Failed to sync blob");
            }

            // Add one more item to section 2
            let data = Bytes::from("Data for blob 2, second item");
            journal
                .append(2u64, data)
                .await
                .expect("Failed to append data");
            journal.sync(2u64).await.expect("Failed to sync blob");

            // Prune all sections below 3
            journal.prune(3).await.expect("Failed to prune blobs");

            // Pruning below the prune point must fail
            let result = journal.prune(2).await;
            assert!(matches!(result, Err(Error::AlreadyPrunedToSection(3))));

            // Pruning to the same point again must also fail
            let result = journal.prune(3).await;
            assert!(matches!(result, Err(Error::AlreadyPrunedToSection(3))));

            let buffer = context.encode();
            assert!(buffer.contains("pruned_total 2"));

            journal.close().await.expect("Failed to close journal");

            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");

            let mut items = Vec::new();
            {
                let stream = journal
                    .replay(1, None)
                    .await
                    .expect("unable to setup replay");
                pin_mut!(stream);
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                        Err(err) => panic!("Failed to read item: {}", err),
                    }
                }
            }

            // Only sections 3, 4, and 5 should remain
            assert_eq!(items.len(), 3);
            let expected_indices = [3u64, 4u64, 5u64];
            for (item, expected_index) in items.iter().zip(expected_indices.iter()) {
                assert_eq!(item.0, *expected_index);
            }

            // Prune everything that remains
            journal.prune(6).await.expect("Failed to prune blobs");

            journal.close().await.expect("Failed to close journal");

            // The partition should now be empty
            assert!(context
                .scan(&cfg.partition)
                .await
                .expect("Failed to list blobs")
                .is_empty());
        });
    }

    #[test_traced]
    fn test_journal_with_invalid_blob_name() {
        let (executor, context, _) = Executor::default();

        executor.start(async move {
            let cfg = Config {
                partition: "test_partition".into(),
            };

            // Create a blob whose name is not a big-endian u64 section number
            let invalid_blob_name = b"invalid";
            let blob = context
                .open(&cfg.partition, invalid_blob_name)
                .await
                .expect("Failed to create blob with invalid name");
            blob.close().await.expect("Failed to close blob");

            // Initialization should reject the malformed blob name
            let result = Journal::init(context, cfg).await;
            assert!(matches!(result, Err(Error::InvalidBlobName(_))));
        });
    }

    fn journal_read_size_missing(exact: Option<u32>) {
        let (executor, context, _) = Executor::default();

        executor.start(async move {
            let cfg = Config {
                partition: "test_partition".into(),
            };

            // Manually create a blob with an incomplete size prefix
            let section = 1u64;
            let blob_name = section.to_be_bytes();
            let blob = context
                .open(&cfg.partition, &blob_name)
                .await
                .expect("Failed to create blob");

            // Write fewer than the 4 bytes needed for the size
            let incomplete_data = vec![0x00, 0x01];
            blob.write_at(&incomplete_data, 0)
                .await
                .expect("Failed to write incomplete data");
            blob.close().await.expect("Failed to close blob");

            let mut journal = Journal::init(context, cfg)
                .await
                .expect("Failed to initialize journal");

            // Replay should truncate the trailing bytes and yield no items
            let stream = journal
                .replay(1, exact)
                .await
                .expect("unable to setup replay");
            pin_mut!(stream);
            let mut items = Vec::new();
            while let Some(result) = stream.next().await {
                match result {
                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                    Err(err) => panic!("Failed to read item: {}", err),
                }
            }
            assert!(items.is_empty());
        });
    }

    #[test_traced]
    fn test_journal_read_size_missing_no_exact() {
        journal_read_size_missing(None);
    }

    #[test_traced]
    fn test_journal_read_size_missing_with_exact() {
        journal_read_size_missing(Some(1));
    }

    fn journal_read_item_missing(exact: Option<u32>) {
        let (executor, context, _) = Executor::default();

        executor.start(async move {
            let cfg = Config {
                partition: "test_partition".into(),
            };

            // Manually create a blob with a size prefix but a truncated item
            let section = 1u64;
            let blob_name = section.to_be_bytes();
            let blob = context
                .open(&cfg.partition, &blob_name)
                .await
                .expect("Failed to create blob");

            // Claim a 10-byte item but write only 5 bytes of data
            let item_size: u32 = 10;
            let mut buf = Vec::new();
            buf.put_u32(item_size);
            let data = [2u8; 5];
            buf.put_slice(&data);
            blob.write_at(&buf, 0)
                .await
                .expect("Failed to write item size");
            blob.close().await.expect("Failed to close blob");

            let mut journal = Journal::init(context, cfg)
                .await
                .expect("Failed to initialize journal");

            // Replay should truncate the partial item and yield no items
            let stream = journal
                .replay(1, exact)
                .await
                .expect("unable to setup replay");

            pin_mut!(stream);
            let mut items = Vec::new();
            while let Some(result) = stream.next().await {
                match result {
                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                    Err(err) => panic!("Failed to read item: {}", err),
                }
            }
            assert!(items.is_empty());
        });
    }

    #[test_traced]
    fn test_journal_read_item_missing_no_exact() {
        journal_read_item_missing(None);
    }

    #[test_traced]
    fn test_journal_read_item_missing_with_exact() {
        journal_read_item_missing(Some(1));
    }

    #[test_traced]
    fn test_journal_read_checksum_missing() {
        let (executor, context, _) = Executor::default();

        executor.start(async move {
            let cfg = Config {
                partition: "test_partition".into(),
            };

            // Manually create a blob with an item but no trailing checksum
            let section = 1u64;
            let blob_name = section.to_be_bytes();
            let blob = context
                .open(&cfg.partition, &blob_name)
                .await
                .expect("Failed to create blob");

            let item_data = b"Test data";
            let item_size = item_data.len() as u32;

            // Write item size
            let mut offset = 0;
            blob.write_at(&item_size.to_be_bytes(), offset)
                .await
                .expect("Failed to write item size");
            offset += 4;

            // Write item data, omitting the checksum
            blob.write_at(item_data, offset)
                .await
                .expect("Failed to write item data");
            blob.close().await.expect("Failed to close blob");

            let mut journal = Journal::init(context, cfg)
                .await
                .expect("Failed to initialize journal");

            // Replay should truncate the incomplete item and yield no items
            let stream = journal
                .replay(1, None)
                .await
                .expect("unable to setup replay");
            pin_mut!(stream);
            let mut items = Vec::new();
            while let Some(result) = stream.next().await {
                match result {
                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                    Err(err) => panic!("Failed to read item: {}", err),
                }
            }
            assert!(items.is_empty());
        });
    }

    #[test_traced]
    fn test_journal_read_checksum_mismatch() {
        let (executor, context, _) = Executor::default();

        executor.start(async move {
            let cfg = Config {
                partition: "test_partition".into(),
            };

            // Manually create a blob with an item whose checksum is wrong
            let section = 1u64;
            let blob_name = section.to_be_bytes();
            let blob = context
                .open(&cfg.partition, &blob_name)
                .await
                .expect("Failed to create blob");

            let item_data = b"Test data";
            let item_size = item_data.len() as u32;
            let incorrect_checksum: u32 = 0xDEADBEEF;

            // Write item size
            let mut offset = 0;
            blob.write_at(&item_size.to_be_bytes(), offset)
                .await
                .expect("Failed to write item size");
            offset += 4;

            // Write item data
            blob.write_at(item_data, offset)
                .await
                .expect("Failed to write item data");
            offset += item_data.len() as u64;

            // Write an incorrect checksum
            blob.write_at(&incorrect_checksum.to_be_bytes(), offset)
                .await
                .expect("Failed to write incorrect checksum");

            blob.close().await.expect("Failed to close blob");

            let mut journal = Journal::init(context, cfg)
                .await
                .expect("Failed to initialize journal");

            // Replay should surface a checksum mismatch error
            let stream = journal
                .replay(1, None)
                .await
                .expect("unable to setup replay");
            pin_mut!(stream);
            let mut items = Vec::new();
            let mut got_checksum_error = false;
            while let Some(result) = stream.next().await {
                match result {
                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                    Err(err) => {
                        assert!(matches!(err, Error::ChecksumMismatch(_, _)));
                        got_checksum_error = true;
                    }
                }
            }
            assert!(got_checksum_error, "expected checksum mismatch error");
        });
    }

    #[test_traced]
    fn test_journal_handling_truncated_data() {
        let (executor, context, _) = Executor::default();

        executor.start(async move {
            let cfg = Config {
                partition: "test_partition".into(),
            };

            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            journal
                .append(1, Bytes::from("Valid data"))
                .await
                .expect("Failed to append data");

            let data_items = vec![
                (2u64, Bytes::from("Valid data")),
                (2u64, Bytes::from("Valid data, second item")),
                (2u64, Bytes::from("Valid data, third item")),
            ];
            for (index, data) in &data_items {
                journal
                    .append(*index, data.clone())
                    .await
                    .expect("Failed to append data");
                journal.sync(*index).await.expect("Failed to sync blob");
            }

            journal.close().await.expect("Failed to close journal");

            // Truncate the last 4 bytes of section 2's blob to simulate a
            // partial write
            let blob = context
                .open(&cfg.partition, &2u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            let blob_len = blob.len().await.expect("Failed to get blob length");
            blob.truncate(blob_len - 4)
                .await
                .expect("Failed to corrupt blob");
            blob.close().await.expect("Failed to close blob");

            let mut journal = Journal::init(context, cfg)
                .await
                .expect("Failed to re-initialize journal");

            let mut items = Vec::new();
            let stream = journal
                .replay(1, None)
                .await
                .expect("unable to setup replay");
            pin_mut!(stream);
            while let Some(result) = stream.next().await {
                match result {
                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                    Err(err) => panic!("Failed to read item: {}", err),
                }
            }

            // The truncated third item in section 2 should be dropped
            assert_eq!(items.len(), 3);
            assert_eq!(items[0].0, 1);
            assert_eq!(items[0].1, Bytes::from("Valid data"));
            assert_eq!(items[1].0, data_items[0].0);
            assert_eq!(items[1].1, data_items[0].1);
            assert_eq!(items[2].0, data_items[1].0);
            assert_eq!(items[2].1, data_items[1].1);
        });
    }

    /// A blob stub that reports a fixed length and ignores all I/O.
    #[derive(Clone)]
    struct MockBlob {
        len: u64,
    }

    impl Blob for MockBlob {
        async fn len(&self) -> Result<u64, commonware_runtime::Error> {
            Ok(self.len)
        }

        async fn read_at(&self, _buf: &mut [u8], _offset: u64) -> Result<(), RError> {
            Ok(())
        }

        async fn write_at(&self, _buf: &[u8], _offset: u64) -> Result<(), RError> {
            Ok(())
        }

        async fn truncate(&self, _len: u64) -> Result<(), RError> {
            Ok(())
        }

        async fn sync(&self) -> Result<(), RError> {
            Ok(())
        }

        async fn close(self) -> Result<(), RError> {
            Ok(())
        }
    }

    /// A storage stub that always returns a [MockBlob] of the configured length.
    #[derive(Clone)]
    struct MockStorage {
        len: u64,
    }

    impl Storage<MockBlob> for MockStorage {
        async fn open(&self, _partition: &str, _name: &[u8]) -> Result<MockBlob, RError> {
            Ok(MockBlob { len: self.len })
        }

        async fn remove(&self, _partition: &str, _name: Option<&[u8]>) -> Result<(), RError> {
            Ok(())
        }

        async fn scan(&self, _partition: &str) -> Result<Vec<Vec<u8>>, RError> {
            Ok(vec![])
        }
    }

    impl Metrics for MockStorage {
        fn with_label(&self, _: &str) -> Self {
            self.clone()
        }

        fn label(&self) -> String {
            String::new()
        }

        fn register<N: Into<String>, H: Into<String>>(&self, _: N, _: H, _: impl Metric) {}

        fn encode(&self) -> String {
            String::new()
        }
    }

    /// Mirrors [ITEM_ALIGNMENT] in the parent module.
    const INDEX_ALIGNMENT: u64 = 16;

    #[test_traced]
    fn test_journal_large_offset() {
        let (executor, _, _) = Executor::default();
        executor.start(async move {
            let cfg = Config {
                partition: "partition".to_string(),
            };
            let context = MockStorage {
                // The largest blob length whose aligned offset still fits in a u32
                len: u32::MAX as u64 * INDEX_ALIGNMENT,
            };
            let mut journal = Journal::init(context, cfg).await.unwrap();

            let data = Bytes::from("Test data");
            let result = journal
                .append(1, data)
                .await
                .expect("Failed to append data");
            assert_eq!(result, u32::MAX);
        });
    }

    #[test_traced]
    fn test_journal_offset_overflow() {
        let (executor, _, _) = Executor::default();
        executor.start(async move {
            let cfg = Config {
                partition: "partition".to_string(),
            };
            let context = MockStorage {
                // One byte past the largest representable aligned offset
                len: u32::MAX as u64 * INDEX_ALIGNMENT + 1,
            };
            let mut journal = Journal::init(context, cfg).await.unwrap();

            let data = Bytes::from("Test data");
            let result = journal.append(1, data).await;
            assert!(matches!(result, Err(Error::OffsetOverflow)));
        });
    }
}