1use super::Error;
104use bytes::{BufMut, Bytes};
105use commonware_runtime::{Blob, Error as RError, Storage};
106use commonware_utils::hex;
107use futures::stream::{self, Stream, StreamExt};
108use prometheus_client::metrics::{counter::Counter, gauge::Gauge};
109use prometheus_client::registry::Registry;
110use std::collections::{btree_map::Entry, BTreeMap};
111use std::sync::{Arc, Mutex};
112use tracing::{debug, trace, warn};
113
/// Configuration for a [`Journal`] instance.
#[derive(Clone)]
pub struct Config {
    /// Registry into which the journal's metrics (`tracked`, `synced`,
    /// `pruned`) are registered at initialization.
    pub registry: Arc<Mutex<Registry>>,

    /// Name of the storage partition that holds the journal's blobs
    /// (one blob per section).
    pub partition: String,
}
124
/// Byte alignment of item start offsets within a blob. Items are addressed by
/// a `u32` multiple of this alignment, extending the addressable range of a
/// blob well beyond `u32::MAX` bytes.
const ITEM_ALIGNMENT: u64 = 16;
126
/// An append-only log, partitioned into numbered sections, persisted via a
/// [`Storage`] runtime (one [`Blob`] per section).
pub struct Journal<B: Blob, E: Storage<B>> {
    // Runtime used to open/remove blobs in the configured partition.
    runtime: E,
    cfg: Config,

    // Smallest section that may still be accessed; `None` until `prune` is
    // first called. Guards against use of pruned sections.
    oldest_allowed: Option<u64>,

    // Open blobs keyed by section number (sorted, enabling ordered pruning).
    blobs: BTreeMap<u64, B>,

    // Metrics: number of open blobs, sync calls, and pruned blobs.
    tracked: Gauge,
    synced: Counter,
    pruned: Counter,
}
140
141fn compute_next_offset(mut offset: u64) -> Result<u32, Error> {
144 let overage = offset % ITEM_ALIGNMENT;
145 if overage != 0 {
146 offset += ITEM_ALIGNMENT - overage;
147 }
148 let offset = offset / ITEM_ALIGNMENT;
149 let aligned_offset = offset.try_into().map_err(|_| Error::OffsetOverflow)?;
150 Ok(aligned_offset)
151}
152
153impl<B: Blob, E: Storage<B>> Journal<B, E> {
154 pub async fn init(runtime: E, cfg: Config) -> Result<Self, Error> {
160 let mut blobs = BTreeMap::new();
162 let stored_blobs = match runtime.scan(&cfg.partition).await {
163 Ok(blobs) => blobs,
164 Err(RError::PartitionMissing(_)) => Vec::new(),
165 Err(err) => return Err(Error::Runtime(err)),
166 };
167 for name in stored_blobs {
168 let blob = runtime.open(&cfg.partition, &name).await?;
169 let hex_name = hex(&name);
170 let section = match name.try_into() {
171 Ok(section) => u64::from_be_bytes(section),
172 Err(_) => return Err(Error::InvalidBlobName(hex_name)),
173 };
174 debug!(section, blob = hex_name, "loaded section");
175 blobs.insert(section, blob);
176 }
177
178 let tracked = Gauge::default();
180 let synced = Counter::default();
181 let pruned = Counter::default();
182 {
183 let mut registry = cfg.registry.lock().unwrap();
184 registry.register("tracked", "Number of blobs", tracked.clone());
185 registry.register("synced", "Number of syncs", synced.clone());
186 registry.register("pruned", "Number of blobs pruned", pruned.clone());
187 }
188 tracked.set(blobs.len() as i64);
189
190 Ok(Self {
192 runtime,
193 cfg,
194
195 oldest_allowed: None,
196
197 blobs,
198 tracked,
199 synced,
200 pruned,
201 })
202 }
203
204 fn prune_guard(&self, section: u64, inclusive: bool) -> Result<(), Error> {
206 if let Some(oldest_allowed) = self.oldest_allowed {
207 if section < oldest_allowed || (inclusive && section <= oldest_allowed) {
208 return Err(Error::AlreadyPrunedToSection(oldest_allowed));
209 }
210 }
211 Ok(())
212 }
213
214 async fn read(blob: &B, offset: u32) -> Result<(u32, u32, Bytes), Error> {
216 let offset = offset as u64 * ITEM_ALIGNMENT;
218 let mut size = [0u8; 4];
219 blob.read_at(&mut size, offset).await?;
220 let size = u32::from_be_bytes(size);
221 let offset = offset.checked_add(4).ok_or(Error::OffsetOverflow)?;
222
223 let mut item = vec![0u8; size as usize];
225 blob.read_at(&mut item, offset).await?;
226 let offset = offset
227 .checked_add(size as u64)
228 .ok_or(Error::OffsetOverflow)?;
229
230 let mut stored_checksum = [0u8; 4];
232 blob.read_at(&mut stored_checksum, offset).await?;
233 let stored_checksum = u32::from_be_bytes(stored_checksum);
234 let checksum = crc32fast::hash(&item);
235 if checksum != stored_checksum {
236 return Err(Error::ChecksumMismatch(stored_checksum, checksum));
237 }
238 let offset = offset.checked_add(4).ok_or(Error::OffsetOverflow)?;
239
240 let aligned_offset = compute_next_offset(offset)?;
242
243 Ok((aligned_offset, size, Bytes::from(item)))
245 }
246
247 async fn read_prefix(blob: &B, offset: u32, prefix: u32) -> Result<(u32, u32, Bytes), Error> {
255 let offset = offset as u64 * ITEM_ALIGNMENT;
257 let mut buf = vec![0u8; 4 + prefix as usize];
258 blob.read_at(&mut buf, offset).await?;
259
260 let size = u32::from_be_bytes(buf[..4].try_into().unwrap());
262
263 let item_prefix = Bytes::from(buf[4..].to_vec());
268
269 let offset = offset
271 .checked_add(4)
272 .ok_or(Error::OffsetOverflow)?
273 .checked_add(size as u64)
274 .ok_or(Error::OffsetOverflow)?
275 .checked_add(4)
276 .ok_or(Error::OffsetOverflow)?;
277 let aligned_offset = compute_next_offset(offset)?;
278
279 Ok((aligned_offset, size, item_prefix))
281 }
282
283 async fn read_exact(blob: &B, offset: u32, exact: u32) -> Result<(u32, Bytes), Error> {
292 let offset = offset as u64 * ITEM_ALIGNMENT;
294 let mut buf = vec![0u8; 4 + exact as usize + 4];
295 blob.read_at(&mut buf, offset).await?;
296
297 let size = u32::from_be_bytes(buf[..4].try_into().unwrap());
299 if size != exact {
300 return Err(Error::UnexpectedSize(size, exact));
301 }
302
303 let item = Bytes::from(buf[4..4 + exact as usize].to_vec());
305
306 let stored_checksum = u32::from_be_bytes(buf[4 + exact as usize..].try_into().unwrap());
308 let checksum = crc32fast::hash(&item);
309 if checksum != stored_checksum {
310 return Err(Error::ChecksumMismatch(stored_checksum, checksum));
311 }
312
313 let offset = offset
315 .checked_add(4)
316 .ok_or(Error::OffsetOverflow)?
317 .checked_add(exact as u64)
318 .ok_or(Error::OffsetOverflow)?
319 .checked_add(4)
320 .ok_or(Error::OffsetOverflow)?;
321 let aligned_offset = compute_next_offset(offset)?;
322
323 Ok((aligned_offset, item))
325 }
326
327 pub async fn replay(
354 &mut self,
355 concurrency: usize,
356 prefix: Option<u32>,
357 ) -> Result<impl Stream<Item = Result<(u64, u32, u32, Bytes), Error>> + '_, Error> {
358 let mut blobs = Vec::with_capacity(self.blobs.len());
360 for (section, blob) in self.blobs.iter() {
361 let len = blob.len().await?;
362 let aligned_len = compute_next_offset(len)?;
363 blobs.push((*section, blob, aligned_len));
364 }
365
366 Ok(stream::iter(blobs)
369 .map(move |(section, blob, len)| async move {
370 stream::unfold(
371 (section, blob, 0u32),
372 move |(section, blob, offset)| async move {
373 if offset == len {
375 return None;
376 }
377
378 let mut read = match prefix {
380 Some(prefix) => Self::read_prefix(blob, offset, prefix).await,
381 None => Self::read(blob, offset).await,
382 };
383
384 if let Ok((next_offset, _, _)) = read {
386 if next_offset > len {
387 read = Err(Error::Runtime(RError::BlobInsufficientLength));
388 }
389 };
390
391 match read {
393 Ok((next_offset, item_size, item)) => {
394 trace!(blob = section, cursor = offset, len, "replayed item");
395 Some((
396 Ok((section, offset, item_size, item)),
397 (section, blob, next_offset),
398 ))
399 }
400 Err(Error::ChecksumMismatch(expected, found)) => {
401 warn!(
403 blob = section,
404 cursor = offset,
405 expected,
406 found,
407 "corruption detected"
408 );
409 Some((
410 Err(Error::ChecksumMismatch(expected, found)),
411 (section, blob, len),
412 ))
413 }
414 Err(Error::Runtime(RError::BlobInsufficientLength)) => {
415 warn!(
419 blob = section,
420 new_size = offset,
421 old_size = len,
422 "trailing bytes detected: truncating"
423 );
424 blob.truncate(offset as u64 * ITEM_ALIGNMENT).await.ok()?;
425 blob.sync().await.ok()?;
426 None
427 }
428 Err(err) => Some((Err(err), (section, blob, len))),
429 }
430 },
431 )
432 })
433 .buffer_unordered(concurrency)
434 .flatten())
435 }
436
437 pub async fn append(&mut self, section: u64, item: Bytes) -> Result<u32, Error> {
447 self.prune_guard(section, false)?;
449
450 let item_len = item.len();
452 let len = 4 + item_len + 4;
453 let item_len = match item_len.try_into() {
454 Ok(len) => len,
455 Err(_) => return Err(Error::ItemTooLarge(item_len)),
456 };
457
458 let blob = match self.blobs.entry(section) {
460 Entry::Occupied(entry) => entry.into_mut(),
461 Entry::Vacant(entry) => {
462 let name = section.to_be_bytes();
463 let blob = self.runtime.open(&self.cfg.partition, &name).await?;
464 self.tracked.inc();
465 entry.insert(blob)
466 }
467 };
468
469 let mut buf = Vec::with_capacity(len);
471 buf.put_u32(item_len);
472 let checksum = crc32fast::hash(&item);
473 buf.put(item);
474 buf.put_u32(checksum);
475
476 let cursor = blob.len().await?;
478 let offset = compute_next_offset(cursor)?;
479 blob.write_at(&buf, offset as u64 * ITEM_ALIGNMENT).await?;
480 trace!(blob = section, previous_len = len, offset, "appended item");
481 Ok(offset)
482 }
483
484 pub async fn get_prefix(
489 &self,
490 section: u64,
491 offset: u32,
492 prefix: u32,
493 ) -> Result<Option<Bytes>, Error> {
494 self.prune_guard(section, false)?;
495 let blob = match self.blobs.get(§ion) {
496 Some(blob) => blob,
497 None => return Ok(None),
498 };
499 let (_, _, item) = Self::read_prefix(blob, offset, prefix).await?;
500 Ok(Some(item))
501 }
502
503 pub async fn get(
509 &self,
510 section: u64,
511 offset: u32,
512 exact: Option<u32>,
513 ) -> Result<Option<Bytes>, Error> {
514 self.prune_guard(section, false)?;
515 let blob = match self.blobs.get(§ion) {
516 Some(blob) => blob,
517 None => return Ok(None),
518 };
519
520 if let Some(exact) = exact {
522 let (_, item) = Self::read_exact(blob, offset, exact).await?;
523 return Ok(Some(item));
524 }
525
526 let (_, _, item) = Self::read(blob, offset).await?;
528 Ok(Some(item))
529 }
530
531 pub async fn sync(&self, section: u64) -> Result<(), Error> {
535 self.prune_guard(section, false)?;
536 let blob = match self.blobs.get(§ion) {
537 Some(blob) => blob,
538 None => return Ok(()),
539 };
540 self.synced.inc();
541 blob.sync().await.map_err(Error::Runtime)
542 }
543
544 pub async fn prune(&mut self, min: u64) -> Result<(), Error> {
546 self.prune_guard(min, true)?;
548
549 while let Some((§ion, _)) = self.blobs.first_key_value() {
551 if section >= min {
553 break;
554 }
555
556 let blob = self.blobs.remove(§ion).unwrap();
558 blob.close().await?;
559
560 self.runtime
562 .remove(&self.cfg.partition, Some(§ion.to_be_bytes()))
563 .await?;
564 debug!(blob = section, "pruned blob");
565 self.tracked.dec();
566 self.pruned.inc();
567 }
568
569 self.oldest_allowed = Some(min);
571 Ok(())
572 }
573
574 pub async fn close(self) -> Result<(), Error> {
576 for (section, blob) in self.blobs.into_iter() {
577 blob.close().await?;
578 debug!(blob = section, "closed blob");
579 }
580 Ok(())
581 }
582}
583
#[cfg(test)]
mod tests {
    //! Tests exercise the journal against the deterministic runtime, plus a
    //! mock storage backend for offset-overflow edge cases.

    use super::*;
    use bytes::{BufMut, Bytes};
    use commonware_macros::test_traced;
    use commonware_runtime::{deterministic::Executor, Blob, Error as RError, Runner, Storage};
    use futures::{pin_mut, StreamExt};
    use prometheus_client::encoding::text::encode;

    #[test_traced]
    fn test_journal_append_and_read() {
        let (executor, context, _) = Executor::default();

        executor.start(async move {
            let cfg = Config {
                registry: Arc::new(Mutex::new(Registry::default())),
                partition: "test_partition".into(),
            };
            let index = 1u64;
            let data = Bytes::from("Test data");
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            journal
                .append(index, data.clone())
                .await
                .expect("Failed to append data");

            // One section was created, so one blob is tracked.
            let mut buffer = String::new();
            encode(&mut buffer, &cfg.registry.lock().unwrap()).unwrap();
            assert!(buffer.contains("tracked 1"));

            journal.close().await.expect("Failed to close journal");

            // Re-initialize (fresh registry) to simulate a restart.
            let cfg = Config {
                registry: Arc::new(Mutex::new(Registry::default())),
                partition: "test_partition".into(),
            };
            let mut journal = Journal::init(context, cfg.clone())
                .await
                .expect("Failed to re-initialize journal");

            // Replay everything and collect the items.
            let mut items = Vec::new();
            let stream = journal
                .replay(1, None)
                .await
                .expect("unable to setup replay");
            pin_mut!(stream);
            while let Some(result) = stream.next().await {
                match result {
                    Ok((blob_index, _, full_len, item)) => {
                        assert_eq!(full_len as usize, item.len());
                        items.push((blob_index, item))
                    }
                    Err(err) => panic!("Failed to read item: {}", err),
                }
            }

            // The persisted item survived the restart.
            assert_eq!(items.len(), 1);
            assert_eq!(items[0].0, index);
            assert_eq!(items[0].1, data);

            let mut buffer = String::new();
            encode(&mut buffer, &cfg.registry.lock().unwrap()).unwrap();
            assert!(buffer.contains("tracked 1"));
        });
    }

    #[test_traced]
    fn test_journal_multiple_appends_and_reads() {
        let (executor, context, _) = Executor::default();

        executor.start(async move {
            let cfg = Config {
                registry: Arc::new(Mutex::new(Registry::default())),
                partition: "test_partition".into(),
            };

            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            // Four items across three sections; sync after each append.
            let data_items = vec![
                (1u64, Bytes::from("Data for blob 1")),
                (1u64, Bytes::from("Data for blob 1, second item")),
                (2u64, Bytes::from("Data for blob 2")),
                (3u64, Bytes::from("Data for blob 3")),
            ];
            for (index, data) in &data_items {
                journal
                    .append(*index, data.clone())
                    .await
                    .expect("Failed to append data");
                journal.sync(*index).await.expect("Failed to sync blob");
            }

            let mut buffer = String::new();
            encode(&mut buffer, &cfg.registry.lock().unwrap()).unwrap();
            assert!(buffer.contains("tracked 3"));
            assert!(buffer.contains("synced_total 4"));

            journal.close().await.expect("Failed to close journal");

            let mut journal = Journal::init(context, cfg)
                .await
                .expect("Failed to re-initialize journal");

            // Full replay returns all items.
            let mut items = Vec::new();
            {
                let stream = journal
                    .replay(2, None)
                    .await
                    .expect("unable to setup replay");
                pin_mut!(stream);
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((blob_index, _, full_len, item)) => {
                            assert_eq!(full_len as usize, item.len());
                            items.push((blob_index, item))
                        }
                        Err(err) => panic!("Failed to read item: {}", err),
                    }
                }
            }

            assert_eq!(items.len(), data_items.len());
            for ((expected_index, expected_data), (actual_index, actual_data)) in
                data_items.iter().zip(items.iter())
            {
                assert_eq!(actual_index, expected_index);
                assert_eq!(actual_data, expected_data);
            }

            // Prefix replay returns only the first 4 bytes of each item.
            {
                let stream = journal
                    .replay(2, Some(4))
                    .await
                    .expect("unable to setup replay");
                pin_mut!(stream);
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((_, _, full_len, item)) => {
                            assert_eq!(item, Bytes::from("Data"));
                            assert!(full_len as usize > item.len());
                        }
                        Err(err) => panic!("Failed to read item: {}", err),
                    }
                }
            }
        });
    }

    #[test_traced]
    fn test_journal_prune_blobs() {
        let (executor, context, _) = Executor::default();

        executor.start(async move {
            let cfg = Config {
                registry: Arc::new(Mutex::new(Registry::default())),
                partition: "test_partition".into(),
            };

            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            // One item in each of sections 1..=5.
            for index in 1u64..=5u64 {
                let data = Bytes::from(format!("Data for blob {}", index));
                journal
                    .append(index, data)
                    .await
                    .expect("Failed to append data");
                journal.sync(index).await.expect("Failed to sync blob");
            }

            let data = Bytes::from("Data for blob 2, second item");
            journal
                .append(2u64, data)
                .await
                .expect("Failed to append data");
            journal.sync(2u64).await.expect("Failed to sync blob");

            // Prune sections below 3; sections 1 and 2 are removed.
            journal.prune(3).await.expect("Failed to prune blobs");

            // Pruning below or at the current floor fails.
            let result = journal.prune(2).await;
            assert!(matches!(result, Err(Error::AlreadyPrunedToSection(3))));

            let result = journal.prune(3).await;
            assert!(matches!(result, Err(Error::AlreadyPrunedToSection(3))));

            let mut buffer = String::new();
            encode(&mut buffer, &cfg.registry.lock().unwrap()).unwrap();
            assert!(buffer.contains("pruned_total 2"));

            journal.close().await.expect("Failed to close journal");

            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");

            // Only sections 3..=5 survive the prune.
            let mut items = Vec::new();
            {
                let stream = journal
                    .replay(1, None)
                    .await
                    .expect("unable to setup replay");
                pin_mut!(stream);
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                        Err(err) => panic!("Failed to read item: {}", err),
                    }
                }
            }

            assert_eq!(items.len(), 3);
            let expected_indices = [3u64, 4u64, 5u64];
            for (item, expected_index) in items.iter().zip(expected_indices.iter()) {
                assert_eq!(item.0, *expected_index);
            }

            // Pruning everything leaves the partition empty.
            journal.prune(6).await.expect("Failed to prune blobs");

            journal.close().await.expect("Failed to close journal");

            assert!(context
                .scan(&cfg.partition)
                .await
                .expect("Failed to list blobs")
                .is_empty());
        });
    }

    #[test_traced]
    fn test_journal_with_invalid_blob_name() {
        let (executor, context, _) = Executor::default();

        executor.start(async move {
            let cfg = Config {
                registry: Arc::new(Mutex::new(Registry::default())),
                partition: "test_partition".into(),
            };

            // A blob whose name is not 8 big-endian bytes must be rejected.
            let invalid_blob_name = b"invalid";
            let blob = context
                .open(&cfg.partition, invalid_blob_name)
                .await
                .expect("Failed to create blob with invalid name");
            blob.close().await.expect("Failed to close blob");

            let result = Journal::init(context, cfg).await;

            assert!(matches!(result, Err(Error::InvalidBlobName(_))));
        });
    }

    // A blob too short to contain even a size prefix should be truncated
    // away during replay, yielding no items.
    fn journal_read_size_missing(exact: Option<u32>) {
        let (executor, context, _) = Executor::default();

        executor.start(async move {
            let cfg = Config {
                registry: Arc::new(Mutex::new(Registry::default())),
                partition: "test_partition".into(),
            };

            let section = 1u64;
            let blob_name = section.to_be_bytes();
            let blob = context
                .open(&cfg.partition, &blob_name)
                .await
                .expect("Failed to create blob");

            // Only 2 of the 4 size-prefix bytes are present.
            let incomplete_data = vec![0x00, 0x01];
            blob.write_at(&incomplete_data, 0)
                .await
                .expect("Failed to write incomplete data");
            blob.close().await.expect("Failed to close blob");

            let mut journal = Journal::init(context, cfg)
                .await
                .expect("Failed to initialize journal");

            let stream = journal
                .replay(1, exact)
                .await
                .expect("unable to setup replay");
            pin_mut!(stream);
            let mut items = Vec::new();
            while let Some(result) = stream.next().await {
                match result {
                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                    Err(err) => panic!("Failed to read item: {}", err),
                }
            }
            assert!(items.is_empty());
        });
    }

    #[test_traced]
    fn test_journal_read_size_missing_no_exact() {
        journal_read_size_missing(None);
    }

    #[test_traced]
    fn test_journal_read_size_missing_with_exact() {
        journal_read_size_missing(Some(1));
    }

    // A blob whose size prefix promises more payload than exists should be
    // truncated away during replay, yielding no items.
    fn journal_read_item_missing(exact: Option<u32>) {
        let (executor, context, _) = Executor::default();

        executor.start(async move {
            let cfg = Config {
                registry: Arc::new(Mutex::new(Registry::default())),
                partition: "test_partition".into(),
            };

            let section = 1u64;
            let blob_name = section.to_be_bytes();
            let blob = context
                .open(&cfg.partition, &blob_name)
                .await
                .expect("Failed to create blob");

            // Size prefix says 10 bytes but only 5 follow.
            let item_size: u32 = 10;
            let mut buf = Vec::new();
            buf.put_u32(item_size);
            let data = [2u8; 5];
            buf.put_slice(&data);
            blob.write_at(&buf, 0)
                .await
                .expect("Failed to write item size");
            blob.close().await.expect("Failed to close blob");

            let mut journal = Journal::init(context, cfg)
                .await
                .expect("Failed to initialize journal");

            let stream = journal
                .replay(1, exact)
                .await
                .expect("unable to setup replay");

            pin_mut!(stream);
            let mut items = Vec::new();
            while let Some(result) = stream.next().await {
                match result {
                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                    Err(err) => panic!("Failed to read item: {}", err),
                }
            }
            assert!(items.is_empty());
        });
    }

    #[test_traced]
    fn test_journal_read_item_missing_no_exact() {
        journal_read_item_missing(None);
    }

    #[test_traced]
    fn test_journal_read_item_missing_with_exact() {
        journal_read_item_missing(Some(1));
    }

    #[test_traced]
    fn test_journal_read_checksum_missing() {
        let (executor, context, _) = Executor::default();

        executor.start(async move {
            let cfg = Config {
                registry: Arc::new(Mutex::new(Registry::default())),
                partition: "test_partition".into(),
            };

            let section = 1u64;
            let blob_name = section.to_be_bytes();
            let blob = context
                .open(&cfg.partition, &blob_name)
                .await
                .expect("Failed to create blob");

            // Write size + payload but omit the trailing checksum.
            let item_data = b"Test data";
            let item_size = item_data.len() as u32;

            let mut offset = 0;
            blob.write_at(&item_size.to_be_bytes(), offset)
                .await
                .expect("Failed to write item size");
            offset += 4;

            blob.write_at(item_data, offset)
                .await
                .expect("Failed to write item data");
            blob.close().await.expect("Failed to close blob");

            let mut journal = Journal::init(context, cfg)
                .await
                .expect("Failed to initialize journal");

            // The incomplete entry is treated as trailing bytes and dropped.
            let stream = journal
                .replay(1, None)
                .await
                .expect("unable to setup replay");
            pin_mut!(stream);
            let mut items = Vec::new();
            while let Some(result) = stream.next().await {
                match result {
                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                    Err(err) => panic!("Failed to read item: {}", err),
                }
            }
            assert!(items.is_empty());
        });
    }

    #[test_traced]
    fn test_journal_read_checksum_mismatch() {
        let (executor, context, _) = Executor::default();

        executor.start(async move {
            let cfg = Config {
                registry: Arc::new(Mutex::new(Registry::default())),
                partition: "test_partition".into(),
            };

            let section = 1u64;
            let blob_name = section.to_be_bytes();
            let blob = context
                .open(&cfg.partition, &blob_name)
                .await
                .expect("Failed to create blob");

            // Write a complete entry but with a bogus checksum.
            let item_data = b"Test data";
            let item_size = item_data.len() as u32;
            let incorrect_checksum: u32 = 0xDEADBEEF;

            let mut offset = 0;
            blob.write_at(&item_size.to_be_bytes(), offset)
                .await
                .expect("Failed to write item size");
            offset += 4;

            blob.write_at(item_data, offset)
                .await
                .expect("Failed to write item data");
            offset += item_data.len() as u64;

            blob.write_at(&incorrect_checksum.to_be_bytes(), offset)
                .await
                .expect("Failed to write incorrect checksum");

            blob.close().await.expect("Failed to close blob");

            let mut journal = Journal::init(context, cfg)
                .await
                .expect("Failed to initialize journal");

            // Replay must surface the corruption as a ChecksumMismatch error.
            let stream = journal
                .replay(1, None)
                .await
                .expect("unable to setup replay");
            pin_mut!(stream);
            let mut items = Vec::new();
            let mut got_checksum_error = false;
            while let Some(result) = stream.next().await {
                match result {
                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                    Err(err) => {
                        assert!(matches!(err, Error::ChecksumMismatch(_, _)));
                        got_checksum_error = true;
                    }
                }
            }
            assert!(got_checksum_error, "expected checksum mismatch error");
        });
    }

    #[test_traced]
    fn test_journal_handling_truncated_data() {
        let (executor, context, _) = Executor::default();

        executor.start(async move {
            let cfg = Config {
                registry: Arc::new(Mutex::new(Registry::default())),
                partition: "test_partition".into(),
            };

            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            journal
                .append(1, Bytes::from("Valid data"))
                .await
                .expect("Failed to append data");

            let data_items = vec![
                (2u64, Bytes::from("Valid data")),
                (2u64, Bytes::from("Valid data, second item")),
                (2u64, Bytes::from("Valid data, third item")),
            ];
            for (index, data) in &data_items {
                journal
                    .append(*index, data.clone())
                    .await
                    .expect("Failed to append data");
                journal.sync(*index).await.expect("Failed to sync blob");
            }

            journal.close().await.expect("Failed to close journal");

            // Corrupt the last entry in section 2 by chopping off 4 bytes.
            let blob = context
                .open(&cfg.partition, &2u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            let blob_len = blob.len().await.expect("Failed to get blob length");
            blob.truncate(blob_len - 4)
                .await
                .expect("Failed to corrupt blob");
            blob.close().await.expect("Failed to close blob");

            let mut journal = Journal::init(context, cfg)
                .await
                .expect("Failed to re-initialize journal");

            let mut items = Vec::new();
            let stream = journal
                .replay(1, None)
                .await
                .expect("unable to setup replay");
            pin_mut!(stream);
            while let Some(result) = stream.next().await {
                match result {
                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                    Err(err) => panic!("Failed to read item: {}", err),
                }
            }

            // The truncated third item in section 2 is dropped; the rest
            // replay intact.
            assert_eq!(items.len(), 3);
            assert_eq!(items[0].0, 1);
            assert_eq!(items[0].1, Bytes::from("Valid data"));
            assert_eq!(items[1].0, data_items[0].0);
            assert_eq!(items[1].1, data_items[0].1);
            assert_eq!(items[2].0, data_items[1].0);
            assert_eq!(items[2].1, data_items[1].1);
        });
    }

    // Blob stub that reports a fixed length and no-ops all I/O.
    #[derive(Clone)]
    struct MockBlob {
        len: u64,
    }

    impl Blob for MockBlob {
        async fn len(&self) -> Result<u64, commonware_runtime::Error> {
            Ok(self.len)
        }

        async fn read_at(&self, _buf: &mut [u8], _offset: u64) -> Result<(), RError> {
            Ok(())
        }

        async fn write_at(&self, _buf: &[u8], _offset: u64) -> Result<(), RError> {
            Ok(())
        }

        async fn truncate(&self, _len: u64) -> Result<(), RError> {
            Ok(())
        }

        async fn sync(&self) -> Result<(), RError> {
            Ok(())
        }

        async fn close(self) -> Result<(), RError> {
            Ok(())
        }
    }

    // Storage stub that hands out MockBlobs of a fixed length.
    #[derive(Clone)]
    struct MockStorage {
        len: u64,
    }

    impl Storage<MockBlob> for MockStorage {
        async fn open(&self, _partition: &str, _name: &[u8]) -> Result<MockBlob, RError> {
            Ok(MockBlob { len: self.len })
        }

        async fn remove(&self, _partition: &str, _name: Option<&[u8]>) -> Result<(), RError> {
            Ok(())
        }

        async fn scan(&self, _partition: &str) -> Result<Vec<Vec<u8>>, RError> {
            Ok(vec![])
        }
    }

    // Mirrors ITEM_ALIGNMENT in the implementation under test.
    const INDEX_ALIGNMENT: u64 = 16;

    #[test_traced]
    fn test_journal_large_offset() {
        let (executor, _, _) = Executor::default();
        executor.start(async move {
            let cfg = Config {
                registry: Arc::new(Mutex::new(Registry::default())),
                partition: "partition".to_string(),
            };
            // Largest blob length whose aligned offset still fits in a u32.
            let runtime = MockStorage {
                len: u32::MAX as u64 * INDEX_ALIGNMENT,
            };
            let mut journal = Journal::init(runtime, cfg).await.unwrap();

            let data = Bytes::from("Test data");
            let result = journal
                .append(1, data)
                .await
                .expect("Failed to append data");
            assert_eq!(result, u32::MAX);
        });
    }

    #[test_traced]
    fn test_journal_offset_overflow() {
        let (executor, _, _) = Executor::default();
        executor.start(async move {
            let cfg = Config {
                registry: Arc::new(Mutex::new(Registry::default())),
                partition: "partition".to_string(),
            };
            // One byte past the largest representable aligned offset.
            let runtime = MockStorage {
                len: u32::MAX as u64 * INDEX_ALIGNMENT + 1,
            };
            let mut journal = Journal::init(runtime, cfg).await.unwrap();

            let data = Bytes::from("Test data");
            let result = journal.append(1, data).await;
            assert!(matches!(result, Err(Error::OffsetOverflow)));
        });
    }
}