use super::Error;
use bytes::BufMut;
use commonware_codec::Codec;
use commonware_runtime::{
    buffer::{Read, Write},
    Blob, Error as RError, Metrics, Storage,
};
use commonware_utils::hex;
use futures::stream::{self, Stream, StreamExt};
use prometheus_client::metrics::{counter::Counter, gauge::Gauge};
use std::{
    collections::{btree_map::Entry, BTreeMap},
    io::Cursor,
    marker::PhantomData,
};
use tracing::{debug, trace, warn};
use zstd::{bulk::compress, decode_all};

/// Configuration for a [Journal] instance.
#[derive(Clone)]
pub struct Config<C> {
    /// The storage partition to use for the journal's blobs.
    pub partition: String,

    /// Optional zstd compression level to apply to items before writing.
    pub compression: Option<u8>,

    /// Configuration for the codec used to encode and decode items.
    pub codec_config: C,

    /// Size of the write buffer used for each blob.
    pub write_buffer: usize,
}

/// Each item starts at a position aligned to this many bytes, allowing items
/// to be addressed by a `u32` aligned offset rather than a full `u64` position.
const ITEM_ALIGNMENT: u64 = 16;

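/// Computes the aligned offset of the next item given a raw byte `offset`,
/// failing with [Error::OffsetOverflow] if the result does not fit in a
/// `u32`. For example, a byte offset of 20 rounds up to 32 and maps to an
/// aligned offset of 2.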
#[inline]
fn compute_next_offset(mut offset: u64) -> Result<u32, Error> {
    let overage = offset % ITEM_ALIGNMENT;
    if overage != 0 {
        offset += ITEM_ALIGNMENT - overage;
    }
    let offset = offset / ITEM_ALIGNMENT;
    let aligned_offset = offset.try_into().map_err(|_| Error::OffsetOverflow)?;
    Ok(aligned_offset)
}

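/// An append-only journal of codec-encoded items, keyed by `section`, with
/// one underlying blob per section.
///
/// A minimal usage sketch (assumes a `commonware_runtime` context is
/// available as `context`; error handling elided):
///
/// ```ignore
/// let cfg = Config {
///     partition: "journal".into(),
///     compression: None,
///     codec_config: (),
///     write_buffer: 1024,
/// };
/// // Append an item, flush it, and read it back by its aligned offset.
/// let mut journal = Journal::<_, u64>::init(context, cfg).await?;
/// let (offset, _len) = journal.append(1, 42u64).await?;
/// journal.sync(1).await?;
/// assert_eq!(journal.get(1, offset).await?, Some(42));
/// journal.close().await?;
/// ```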
pub struct Journal<E: Storage + Metrics, V: Codec> {
    context: E,
    cfg: Config<V::Cfg>,

    /// The oldest section that may still be accessed, if any pruning has occurred.
    oldest_allowed: Option<u64>,

    /// One open blob per section, keyed by section number.
    blobs: BTreeMap<u64, Write<E::Blob>>,

    tracked: Gauge,
    synced: Counter,
    pruned: Counter,

    _phantom: PhantomData<V>,
}

impl<E: Storage + Metrics, V: Codec> Journal<E, V> {
    /// Initializes a journal from the given `context` and `cfg`, opening any
    /// blobs already present in the configured partition.
    pub async fn init(context: E, cfg: Config<V::Cfg>) -> Result<Self, Error> {
        // Open all blobs in the partition, parsing each name as a section number.
        let mut blobs = BTreeMap::new();
        let stored_blobs = match context.scan(&cfg.partition).await {
            Ok(blobs) => blobs,
            Err(RError::PartitionMissing(_)) => Vec::new(),
            Err(err) => return Err(Error::Runtime(err)),
        };
        for name in stored_blobs {
            let (blob, size) = context.open(&cfg.partition, &name).await?;
            let hex_name = hex(&name);
            let section = match name.try_into() {
                Ok(section) => u64::from_be_bytes(section),
                Err(_) => return Err(Error::InvalidBlobName(hex_name)),
            };
            debug!(section, blob = hex_name, size, "loaded section");
            let blob = Write::new(blob, size, cfg.write_buffer);
            blobs.insert(section, blob);
        }

        // Initialize and register metrics.
        let tracked = Gauge::default();
        let synced = Counter::default();
        let pruned = Counter::default();
        context.register("tracked", "Number of blobs", tracked.clone());
        context.register("synced", "Number of syncs", synced.clone());
        context.register("pruned", "Number of blobs pruned", pruned.clone());
        tracked.set(blobs.len() as i64);

        Ok(Self {
            context,
            cfg,

            oldest_allowed: None,

            blobs,
            tracked,
            synced,
            pruned,

            _phantom: PhantomData,
        })
    }

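    /// Returns [Error::AlreadyPrunedToSection] if `section` is older than the
    /// oldest allowed section (or equal to it, when `inclusive` is set).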
    fn prune_guard(&self, section: u64, inclusive: bool) -> Result<(), Error> {
        if let Some(oldest_allowed) = self.oldest_allowed {
            if section < oldest_allowed || (inclusive && section <= oldest_allowed) {
                return Err(Error::AlreadyPrunedToSection(oldest_allowed));
            }
        }
        Ok(())
    }

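    /// Reads an item from `blob` at the given aligned `offset`, returning the
    /// aligned offset of the next item, the item's length, and the item.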
    async fn read(
        compressed: bool,
        cfg: &V::Cfg,
        blob: &Write<E::Blob>,
        offset: u32,
    ) -> Result<(u32, u32, V), Error> {
        // Read the item's length prefix.
        let mut hasher = crc32fast::Hasher::new();
        let offset = offset as u64 * ITEM_ALIGNMENT;
        let size = blob.read_at(vec![0; 4], offset).await?;
        hasher.update(size.as_ref());
        let size = u32::from_be_bytes(size.as_ref().try_into().unwrap()) as usize;
        let offset = offset.checked_add(4).ok_or(Error::OffsetOverflow)?;

        // Read the item and its trailing checksum in one call.
        let buf_size = size.checked_add(4).ok_or(Error::OffsetOverflow)?;
        let buf = blob.read_at(vec![0u8; buf_size], offset).await?;
        let buf = buf.as_ref();
        let offset = offset
            .checked_add(buf_size as u64)
            .ok_or(Error::OffsetOverflow)?;

        let item = &buf[..size];
        hasher.update(item);

        // Verify the stored checksum.
        let checksum = hasher.finalize();
        let stored_checksum = u32::from_be_bytes(buf[size..].try_into().unwrap());
        if checksum != stored_checksum {
            return Err(Error::ChecksumMismatch(stored_checksum, checksum));
        }

        // Compute the aligned offset of the next item.
        let aligned_offset = compute_next_offset(offset)?;

        // Decompress (if configured) and decode the item.
        let item = if compressed {
            let decompressed =
                decode_all(Cursor::new(&item)).map_err(|_| Error::DecompressionFailed)?;
            V::decode_cfg(decompressed.as_ref(), cfg).map_err(Error::Codec)?
        } else {
            V::decode_cfg(item, cfg).map_err(Error::Codec)?
        };

        Ok((aligned_offset, size as u32, item))
    }

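    /// Reads an item through the buffered `reader`, returning the aligned
    /// offset of the next item, the reader's current byte position, the
    /// item's length, and the item. Used by [Journal::replay] to avoid one
    /// blob read per item.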
    async fn read_buffered(
        reader: &mut Read<Write<E::Blob>>,
        offset: u32,
        cfg: &V::Cfg,
        compressed: bool,
    ) -> Result<(u32, u64, u32, V), Error> {
        let file_offset = offset as u64 * ITEM_ALIGNMENT;

        // Seek only if the reader is not already at the target offset.
        if reader.position() != file_offset {
            reader.seek_to(file_offset).map_err(Error::Runtime)?;
        }

        // Read the item's length prefix.
        let mut hasher = crc32fast::Hasher::new();
        let mut size_buf = [0u8; 4];
        reader
            .read_exact(&mut size_buf, 4)
            .await
            .map_err(Error::Runtime)?;
        hasher.update(&size_buf);

        // Read the item and its trailing checksum.
        let size = u32::from_be_bytes(size_buf) as usize;
        let buf_size = size.checked_add(4).ok_or(Error::OffsetOverflow)?;
        let mut buf = vec![0u8; buf_size];
        reader
            .read_exact(&mut buf, buf_size)
            .await
            .map_err(Error::Runtime)?;

        let item = &buf[..size];
        hasher.update(item);

        // Verify the stored checksum.
        let checksum = hasher.finalize();
        let stored_checksum = u32::from_be_bytes(buf[size..].try_into().unwrap());
        if checksum != stored_checksum {
            return Err(Error::ChecksumMismatch(stored_checksum, checksum));
        }

        // Decompress (if configured) and decode the item.
        let item = if compressed {
            let decompressed =
                decode_all(Cursor::new(&item)).map_err(|_| Error::DecompressionFailed)?;
            V::decode_cfg(decompressed.as_ref(), cfg).map_err(Error::Codec)?
        } else {
            V::decode_cfg(item, cfg).map_err(Error::Codec)?
        };

        let current_pos = reader.position();
        let aligned_offset = compute_next_offset(current_pos)?;
        Ok((aligned_offset, current_pos, size as u32, item))
    }

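    /// Reads an item of known length `len` at the given aligned `offset`,
    /// fetching the entire entry in a single read and verifying both the
    /// stored length and the checksum.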
    async fn read_exact(
        compressed: bool,
        cfg: &V::Cfg,
        blob: &Write<E::Blob>,
        offset: u32,
        len: u32,
    ) -> Result<V, Error> {
        // Read the entire entry (length prefix, item, and checksum) at once.
        let offset = offset as u64 * ITEM_ALIGNMENT;
        let entry_size = 4 + len as usize + 4;
        let buf = blob.read_at(vec![0u8; entry_size], offset).await?;

        // Verify that the stored length matches the expected length.
        let mut hasher = crc32fast::Hasher::new();
        let disk_size = u32::from_be_bytes(buf.as_ref()[..4].try_into().unwrap());
        hasher.update(&buf.as_ref()[..4]);
        if disk_size != len {
            return Err(Error::UnexpectedSize(disk_size, len));
        }

        // Verify the stored checksum.
        let item = &buf.as_ref()[4..4 + len as usize];
        hasher.update(item);
        let checksum = hasher.finalize();
        let stored_checksum =
            u32::from_be_bytes(buf.as_ref()[4 + len as usize..].try_into().unwrap());
        if checksum != stored_checksum {
            return Err(Error::ChecksumMismatch(stored_checksum, checksum));
        }

        // Decompress the item (if configured).
        let item = if compressed {
            decode_all(Cursor::new(item)).map_err(|_| Error::DecompressionFailed)?
        } else {
            item.to_vec()
        };

        // Decode the item.
        let item = V::decode_cfg(item.as_ref(), cfg).map_err(Error::Codec)?;
        Ok(item)
    }

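    /// Returns a stream over all items in the journal (ordered by section),
    /// yielding `(section, offset, len, item)` tuples.
    ///
    /// If a checksum mismatch or a trailing partial write is encountered, the
    /// offending blob is truncated to its last valid size and replay of that
    /// section stops; any other error is yielded to the caller.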
    pub async fn replay(
        &self,
        buffer: usize,
    ) -> Result<impl Stream<Item = Result<(u64, u32, u32, V), Error>> + '_, Error> {
        // Collect all blobs to replay, along with their sizes and max offsets.
        let codec_config = self.cfg.codec_config.clone();
        let compressed = self.cfg.compression.is_some();
        let mut blobs = Vec::with_capacity(self.blobs.len());
        for (section, blob) in self.blobs.iter() {
            let blob_size = blob.size().await;
            let max_offset = compute_next_offset(blob_size)?;
            blobs.push((
                *section,
                blob.clone(),
                max_offset,
                blob_size,
                codec_config.clone(),
                compressed,
            ));
        }

        // Replay each blob in order, reading items through a buffered reader.
        Ok(
            stream::iter(blobs).flat_map(
                move |(section, blob, max_offset, blob_size, codec_config, compressed)| {
                    let reader = Read::new(blob, blob_size, buffer);

                    stream::unfold(
                        (section, reader, 0u32, 0u64, codec_config, compressed),
                        move |(
                            section,
                            mut reader,
                            offset,
                            valid_size,
                            codec_config,
                            compressed,
                        )| async move {
                            // Stop when the last valid offset is reached.
                            if offset >= max_offset {
                                return None;
                            }

                            match Self::read_buffered(
                                &mut reader,
                                offset,
                                &codec_config,
                                compressed,
                            )
                            .await
                            {
                                Ok((next_offset, next_valid_size, size, item)) => {
                                    trace!(blob = section, cursor = offset, "replayed item");
                                    Some((
                                        Ok((section, offset, size, item)),
                                        (
                                            section,
                                            reader,
                                            next_offset,
                                            next_valid_size,
                                            codec_config,
                                            compressed,
                                        ),
                                    ))
                                }
                                Err(Error::ChecksumMismatch(expected, found)) => {
                                    // Truncate the blob to the last valid size
                                    // and stop replaying this section.
                                    warn!(
                                        blob = section,
                                        new_offset = offset,
                                        new_size = valid_size,
                                        expected,
                                        found,
                                        "corruption detected: truncating"
                                    );
                                    reader.resize(valid_size).await.ok()?;
                                    None
                                }
                                Err(Error::Runtime(RError::BlobInsufficientLength)) => {
                                    // Truncate a trailing partial write and
                                    // stop replaying this section.
                                    warn!(
                                        blob = section,
                                        new_offset = offset,
                                        new_size = valid_size,
                                        "trailing bytes detected: truncating"
                                    );
                                    reader.resize(valid_size).await.ok()?;
                                    None
                                }
                                Err(err) => {
                                    // Surface any other error to the caller.
                                    warn!(
                                        blob = section,
                                        cursor = offset,
                                        ?err,
                                        "unexpected error"
                                    );
                                    Some((
                                        Err(err),
                                        (
                                            section,
                                            reader,
                                            offset,
                                            valid_size,
                                            codec_config,
                                            compressed,
                                        ),
                                    ))
                                }
                            }
                        },
                    )
                },
            ),
        )
    }

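    /// Appends an item to `section`, returning its aligned offset and encoded
    /// (possibly compressed) length.
    ///
    /// On disk, each entry is laid out as a 4-byte big-endian length, the
    /// encoded item, and a 4-byte crc32 checksum over both, starting at the
    /// next 16-byte-aligned position.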
    pub async fn append(&mut self, section: u64, item: V) -> Result<(u32, u32), Error> {
        self.prune_guard(section, false)?;

        // Encode the item, compressing it if configured.
        let encoded = item.encode();
        let encoded = if let Some(compression) = self.cfg.compression {
            compress(&encoded, compression as i32).map_err(|_| Error::CompressionFailed)?
        } else {
            encoded.into()
        };

        // Ensure the item's length fits in a u32.
        let item_len = encoded.len();
        let entry_len = 4 + item_len + 4;
        let item_len = match item_len.try_into() {
            Ok(len) => len,
            Err(_) => return Err(Error::ItemTooLarge(item_len)),
        };

        // Get the blob for the section, creating it if it does not exist.
        let blob = match self.blobs.entry(section) {
            Entry::Occupied(entry) => entry.into_mut(),
            Entry::Vacant(entry) => {
                let name = section.to_be_bytes();
                let (blob, size) = self.context.open(&self.cfg.partition, &name).await?;
                let blob = Write::new(blob, size, self.cfg.write_buffer);
                self.tracked.inc();
                entry.insert(blob)
            }
        };

        // Assemble the entry: length prefix, item, and checksum.
        let mut buf = Vec::with_capacity(entry_len);
        buf.put_u32(item_len);
        buf.put_slice(&encoded);
        let checksum = crc32fast::hash(&buf);
        buf.put_u32(checksum);
        assert_eq!(buf.len(), entry_len);

        // Write the entry at the next aligned offset.
        let cursor = blob.size().await;
        let offset = compute_next_offset(cursor)?;
        let aligned_cursor = offset as u64 * ITEM_ALIGNMENT;
        blob.write_at(buf, aligned_cursor).await?;
        trace!(blob = section, offset, "appended item");
        Ok((offset, item_len))
    }

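    /// Retrieves the item in `section` at the given aligned `offset`, or
    /// `None` if the section does not exist.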
    pub async fn get(&self, section: u64, offset: u32) -> Result<Option<V>, Error> {
        self.prune_guard(section, false)?;
        let blob = match self.blobs.get(&section) {
            Some(blob) => blob,
            None => return Ok(None),
        };

        let (_, _, item) = Self::read(
            self.cfg.compression.is_some(),
            &self.cfg.codec_config,
            blob,
            offset,
        )
        .await?;
        Ok(Some(item))
    }

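    /// Retrieves the item in `section` at the given aligned `offset` when its
    /// encoded `size` is already known, or `None` if the section does not
    /// exist. Knowing the size allows the entry to be fetched in one read.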
    pub async fn get_exact(
        &self,
        section: u64,
        offset: u32,
        size: u32,
    ) -> Result<Option<V>, Error> {
        self.prune_guard(section, false)?;
        let blob = match self.blobs.get(&section) {
            Some(blob) => blob,
            None => return Ok(None),
        };

        let item = Self::read_exact(
            self.cfg.compression.is_some(),
            &self.cfg.codec_config,
            blob,
            offset,
            size,
        )
        .await?;
        Ok(Some(item))
    }

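    /// Returns the size in bytes of the blob backing `section`, or 0 if the
    /// section does not exist.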
    pub async fn size(&self, section: u64) -> Result<u64, Error> {
        self.prune_guard(section, false)?;
        match self.blobs.get(&section) {
            Some(blob) => Ok(blob.size().await),
            None => Ok(0),
        }
    }

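    /// Rewinds the journal so the next append to `section` lands at the given
    /// aligned `offset` (see [Journal::rewind]).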
    pub async fn rewind_to_offset(&mut self, section: u64, offset: u32) -> Result<(), Error> {
        self.rewind(section, offset as u64 * ITEM_ALIGNMENT).await
    }

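    /// Rewinds the journal to `size` bytes in `section`: all sections after
    /// `section` are removed, and the blob backing `section` is resized down
    /// (a no-op if it is already `size` bytes or smaller).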
    pub async fn rewind(&mut self, section: u64, size: u64) -> Result<(), Error> {
        self.prune_guard(section, false)?;

        // Remove all sections after the target section.
        let trailing: Vec<u64> = self
            .blobs
            .range((
                std::ops::Bound::Excluded(section),
                std::ops::Bound::Unbounded,
            ))
            .map(|(&section, _)| section)
            .collect();
        for index in &trailing {
            let blob = self.blobs.remove(index).unwrap();

            blob.close().await?;
            self.context
                .remove(&self.cfg.partition, Some(&index.to_be_bytes()))
                .await?;
            debug!(section = index, "removed section");
            self.tracked.dec();
        }

        // Resize the target section if it is larger than the target size.
        let blob = match self.blobs.get_mut(&section) {
            Some(blob) => blob,
            None => return Ok(()),
        };
        let current = blob.size().await;
        if size >= current {
            return Ok(());
        }
        blob.resize(size).await?;
        debug!(
            section,
            from = current,
            to = size,
            ?trailing,
            "rewound journal"
        );
        Ok(())
    }

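    /// Rewinds a single `section` to `size` bytes without touching any later
    /// sections (a no-op if the blob is already `size` bytes or smaller).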
    pub async fn rewind_section(&mut self, section: u64, size: u64) -> Result<(), Error> {
        self.prune_guard(section, false)?;

        let blob = match self.blobs.get_mut(&section) {
            Some(blob) => blob,
            None => return Ok(()),
        };

        let current = blob.size().await;
        if size >= current {
            return Ok(());
        }
        blob.resize(size).await?;
        debug!(section, from = current, to = size, "rewound section");
        Ok(())
    }

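    /// Flushes any pending writes for `section` to durable storage.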
    pub async fn sync(&self, section: u64) -> Result<(), Error> {
        self.prune_guard(section, false)?;
        let blob = match self.blobs.get(&section) {
            Some(blob) => blob,
            None => return Ok(()),
        };
        self.synced.inc();
        blob.sync().await.map_err(Error::Runtime)
    }

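    /// Removes all sections older than `min`. After pruning, any operation on
    /// an older section fails with [Error::AlreadyPrunedToSection].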
    pub async fn prune(&mut self, min: u64) -> Result<(), Error> {
        self.prune_guard(min, true)?;

        // Remove blobs in order until we reach the minimum section.
        while let Some((&section, _)) = self.blobs.first_key_value() {
            if section >= min {
                break;
            }

            // Close the blob and remove it from storage.
            let blob = self.blobs.remove(&section).unwrap();
            let size = blob.size().await;
            blob.close().await?;

            self.context
                .remove(&self.cfg.partition, Some(&section.to_be_bytes()))
                .await?;
            debug!(blob = section, size, "pruned blob");
            self.tracked.dec();
            self.pruned.inc();
        }

        // Record the new minimum allowed section.
        self.oldest_allowed = Some(min);
        Ok(())
    }

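    /// Closes all open blobs, flushing any pending writes.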
    pub async fn close(self) -> Result<(), Error> {
        for (section, blob) in self.blobs.into_iter() {
            let size = blob.size().await;
            blob.close().await?;
            debug!(blob = section, size, "closed blob");
        }
        Ok(())
    }

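    /// Closes and removes all blobs, then removes the partition itself
    /// (tolerating a partition that is already missing).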
    pub async fn destroy(self) -> Result<(), Error> {
        for (i, blob) in self.blobs.into_iter() {
            let size = blob.size().await;
            blob.close().await?;
            debug!(blob = i, size, "destroyed blob");
            self.context
                .remove(&self.cfg.partition, Some(&i.to_be_bytes()))
                .await?;
        }
        match self.context.remove(&self.cfg.partition, None).await {
            Ok(()) => {}
            // The partition may already be missing if no blobs were created.
            Err(RError::PartitionMissing(_)) => {}
            Err(err) => return Err(Error::Runtime(err)),
        }
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use bytes::BufMut;
    use commonware_cryptography::hash;
    use commonware_macros::test_traced;
    use commonware_runtime::{deterministic, Blob, Error as RError, Runner, Storage};
    use commonware_utils::StableBuf;
    use futures::{pin_mut, StreamExt};
    use prometheus_client::registry::Metric;

    #[test_traced]
    fn test_journal_append_and_read() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            // Initialize the journal.
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                write_buffer: 1024,
            };
            let index = 1u64;
            let data = 10;
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            // Append an item to the journal.
            journal
                .append(index, data)
                .await
                .expect("Failed to append data");

            // Check metrics.
            let buffer = context.encode();
            assert!(buffer.contains("tracked 1"));

            // Close the journal.
            journal.close().await.expect("Failed to close journal");

            // Re-initialize the journal to simulate a restart.
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                write_buffer: 1024,
            };
            let journal = Journal::<_, i32>::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");

            // Replay the journal and collect its items.
            let mut items = Vec::new();
            let stream = journal.replay(1024).await.expect("unable to setup replay");
            pin_mut!(stream);
            while let Some(result) = stream.next().await {
                match result {
                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                    Err(err) => panic!("Failed to read item: {err}"),
                }
            }

            // Verify that the item was replayed.
            assert_eq!(items.len(), 1);
            assert_eq!(items[0].0, index);
            assert_eq!(items[0].1, data);

            // Check metrics.
            let buffer = context.encode();
            assert!(buffer.contains("tracked 1"));
        });
    }

    #[test_traced]
    fn test_journal_multiple_appends_and_reads() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                write_buffer: 1024,
            };

            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            // Append items across multiple sections.
            let data_items = vec![(1u64, 1), (1u64, 2), (2u64, 3), (3u64, 4)];
            for (index, data) in &data_items {
                journal
                    .append(*index, *data)
                    .await
                    .expect("Failed to append data");
                journal.sync(*index).await.expect("Failed to sync blob");
            }

            // Check metrics.
            let buffer = context.encode();
            assert!(buffer.contains("tracked 3"));
            assert!(buffer.contains("synced_total 4"));

            journal.close().await.expect("Failed to close journal");

            let journal = Journal::init(context, cfg)
                .await
                .expect("Failed to re-initialize journal");

            // Replay the journal and collect its items.
            let mut items = Vec::<(u64, u32)>::new();
            {
                let stream = journal.replay(1024).await.expect("unable to setup replay");
                pin_mut!(stream);
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                        Err(err) => panic!("Failed to read item: {err}"),
                    }
                }
            }

            // Verify that all items were replayed in order.
            assert_eq!(items.len(), data_items.len());
            for ((expected_index, expected_data), (actual_index, actual_data)) in
                data_items.iter().zip(items.iter())
            {
                assert_eq!(actual_index, expected_index);
                assert_eq!(actual_data, expected_data);
            }

            journal.destroy().await.expect("Failed to destroy journal");
        });
    }

    #[test_traced]
    fn test_journal_prune_blobs() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                write_buffer: 1024,
            };

            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            // Append one item to each of five sections.
            for index in 1u64..=5u64 {
                journal
                    .append(index, index)
                    .await
                    .expect("Failed to append data");
                journal.sync(index).await.expect("Failed to sync blob");
            }

            // Add one more item to an existing section.
            let data = 99;
            journal
                .append(2u64, data)
                .await
                .expect("Failed to append data");
            journal.sync(2u64).await.expect("Failed to sync blob");

            // Prune sections older than 3.
            journal.prune(3).await.expect("Failed to prune blobs");

            // Pruning to an older section should fail.
            let result = journal.prune(2).await;
            assert!(matches!(result, Err(Error::AlreadyPrunedToSection(3))));

            // Pruning to the same section should also fail.
            let result = journal.prune(3).await;
            assert!(matches!(result, Err(Error::AlreadyPrunedToSection(3))));

            // Check metrics.
            let buffer = context.encode();
            assert!(buffer.contains("pruned_total 2"));

            journal.close().await.expect("Failed to close journal");

            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");

            // Replay the journal and collect its items.
            let mut items = Vec::<(u64, u64)>::new();
            {
                let stream = journal.replay(1024).await.expect("unable to setup replay");
                pin_mut!(stream);
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                        Err(err) => panic!("Failed to read item: {err}"),
                    }
                }
            }

            // Only the unpruned sections should remain.
            assert_eq!(items.len(), 3);
            let expected_indices = [3u64, 4u64, 5u64];
            for (item, expected_index) in items.iter().zip(expected_indices.iter()) {
                assert_eq!(item.0, *expected_index);
            }

            // Prune everything that remains.
            journal.prune(6).await.expect("Failed to prune blobs");

            journal.close().await.expect("Failed to close journal");

            // Ensure no blobs remain in the partition.
            assert!(context
                .scan(&cfg.partition)
                .await
                .expect("Failed to list blobs")
                .is_empty());
        });
    }

    #[test_traced]
    fn test_journal_with_invalid_blob_name() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                write_buffer: 1024,
            };

            // Create a blob whose name cannot be parsed as a u64 section.
            let invalid_blob_name = b"invalid";
            let (blob, _) = context
                .open(&cfg.partition, invalid_blob_name)
                .await
                .expect("Failed to create blob with invalid name");
            blob.close().await.expect("Failed to close blob");

            // Initialization should fail with InvalidBlobName.
            let result = Journal::<_, u64>::init(context, cfg).await;
            assert!(matches!(result, Err(Error::InvalidBlobName(_))));
        });
    }

    #[test_traced]
    fn test_journal_read_size_missing() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                write_buffer: 1024,
            };

            // Manually create a blob with an incomplete length prefix.
            let section = 1u64;
            let blob_name = section.to_be_bytes();
            let (blob, _) = context
                .open(&cfg.partition, &blob_name)
                .await
                .expect("Failed to create blob");

            // Write fewer than the four bytes needed for the size.
            let incomplete_data = vec![0x00, 0x01];
            blob.write_at(incomplete_data, 0)
                .await
                .expect("Failed to write incomplete data");
            blob.close().await.expect("Failed to close blob");

            let journal = Journal::init(context, cfg)
                .await
                .expect("Failed to initialize journal");

            // Replay should yield no items (the truncated entry is discarded).
            let stream = journal.replay(1024).await.expect("unable to setup replay");
            pin_mut!(stream);
            let mut items = Vec::<(u64, u64)>::new();
            while let Some(result) = stream.next().await {
                match result {
                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                    Err(err) => panic!("Failed to read item: {err}"),
                }
            }
            assert!(items.is_empty());
        });
    }

    #[test_traced]
    fn test_journal_read_item_missing() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                write_buffer: 1024,
            };

            // Manually create a blob with a length prefix but a truncated item.
            let section = 1u64;
            let blob_name = section.to_be_bytes();
            let (blob, _) = context
                .open(&cfg.partition, &blob_name)
                .await
                .expect("Failed to create blob");

            // Claim a 10-byte item but write only 5 bytes of data.
            let item_size: u32 = 10;
            let mut buf = Vec::new();
            buf.put_u32(item_size);
            let data = [2u8; 5];
            BufMut::put_slice(&mut buf, &data);
            blob.write_at(buf, 0)
                .await
                .expect("Failed to write item size");
            blob.close().await.expect("Failed to close blob");

            let journal = Journal::init(context, cfg)
                .await
                .expect("Failed to initialize journal");

            // Replay should yield no items (the truncated entry is discarded).
            let stream = journal.replay(1024).await.expect("unable to setup replay");
            pin_mut!(stream);
            let mut items = Vec::<(u64, u64)>::new();
            while let Some(result) = stream.next().await {
                match result {
                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                    Err(err) => panic!("Failed to read item: {err}"),
                }
            }
            assert!(items.is_empty());
        });
    }

    #[test_traced]
    fn test_journal_read_checksum_missing() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                write_buffer: 1024,
            };

            // Manually create a blob with an item but no trailing checksum.
            let section = 1u64;
            let blob_name = section.to_be_bytes();
            let (blob, _) = context
                .open(&cfg.partition, &blob_name)
                .await
                .expect("Failed to create blob");

            let item_data = b"Test data";
            let item_size = item_data.len() as u32;

            // Write the item size.
            let mut offset = 0;
            blob.write_at(item_size.to_be_bytes().to_vec(), offset)
                .await
                .expect("Failed to write item size");
            offset += 4;

            // Write the item data, but omit the checksum.
            blob.write_at(item_data.to_vec(), offset)
                .await
                .expect("Failed to write item data");
            blob.close().await.expect("Failed to close blob");

            let journal = Journal::init(context, cfg)
                .await
                .expect("Failed to initialize journal");

            // Replay should yield no items (the incomplete entry is discarded).
            let stream = journal.replay(1024).await.expect("unable to setup replay");
            pin_mut!(stream);
            let mut items = Vec::<(u64, u64)>::new();
            while let Some(result) = stream.next().await {
                match result {
                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                    Err(err) => panic!("Failed to read item: {err}"),
                }
            }
            assert!(items.is_empty());
        });
    }

    #[test_traced]
    fn test_journal_read_checksum_mismatch() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                write_buffer: 1024,
            };

            // Manually create a blob with an entry whose checksum is wrong.
            let section = 1u64;
            let blob_name = section.to_be_bytes();
            let (blob, _) = context
                .open(&cfg.partition, &blob_name)
                .await
                .expect("Failed to create blob");

            let item_data = b"Test data";
            let item_size = item_data.len() as u32;
            let incorrect_checksum: u32 = 0xDEADBEEF;

            // Write the item size.
            let mut offset = 0;
            blob.write_at(item_size.to_be_bytes().to_vec(), offset)
                .await
                .expect("Failed to write item size");
            offset += 4;

            // Write the item data.
            blob.write_at(item_data.to_vec(), offset)
                .await
                .expect("Failed to write item data");
            offset += item_data.len() as u64;

            // Write an incorrect checksum.
            blob.write_at(incorrect_checksum.to_be_bytes().to_vec(), offset)
                .await
                .expect("Failed to write incorrect checksum");

            blob.close().await.expect("Failed to close blob");

            let journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            // Replay should yield no items (the corrupt entry is discarded).
            {
                let stream = journal.replay(1024).await.expect("unable to setup replay");
                pin_mut!(stream);
                let mut items = Vec::<(u64, u64)>::new();
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                        Err(err) => panic!("Failed to read item: {err}"),
                    }
                }
                assert!(items.is_empty());
            }
            journal.close().await.expect("Failed to close journal");

            // The corrupt blob should have been truncated to zero.
            let (_, blob_size) = context
                .open(&cfg.partition, &section.to_be_bytes())
                .await
                .expect("Failed to open blob");
            assert_eq!(blob_size, 0);
        });
    }

    #[test_traced]
    fn test_journal_handling_unaligned_truncated_data() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                write_buffer: 1024,
            };

            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            journal.append(1, 1).await.expect("Failed to append data");

            // Append multiple items to section 2.
            let data_items = vec![(2u64, 2), (2u64, 3), (2u64, 4)];
            for (index, data) in &data_items {
                journal
                    .append(*index, *data)
                    .await
                    .expect("Failed to append data");
                journal.sync(*index).await.expect("Failed to sync blob");
            }

            journal.close().await.expect("Failed to close journal");

            // Truncate the blob for section 2 at an unaligned boundary.
            let (blob, blob_size) = context
                .open(&cfg.partition, &2u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            blob.resize(blob_size - 4)
                .await
                .expect("Failed to corrupt blob");
            blob.close().await.expect("Failed to close blob");

            let journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");

            // Replay the journal and collect its items.
            let mut items = Vec::<(u64, u32)>::new();
            {
                let stream = journal.replay(1024).await.expect("unable to setup replay");
                pin_mut!(stream);
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                        Err(err) => panic!("Failed to read item: {err}"),
                    }
                }
            }
            journal.close().await.expect("Failed to close journal");

            // The truncated item should have been dropped.
            assert_eq!(items.len(), 3);
            assert_eq!(items[0].0, 1);
            assert_eq!(items[0].1, 1);
            assert_eq!(items[1].0, data_items[0].0);
            assert_eq!(items[1].1, data_items[0].1);
            assert_eq!(items[2].0, data_items[1].0);
            assert_eq!(items[2].1, data_items[1].1);

            // The blob should have been truncated to the last valid entry.
            let (_, blob_size) = context
                .open(&cfg.partition, &2u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            assert_eq!(blob_size, 28);

            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");

            // Replay again to confirm the truncation persisted.
            let mut items = Vec::<(u64, u32)>::new();
            {
                let stream = journal.replay(1024).await.expect("unable to setup replay");
                pin_mut!(stream);
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                        Err(err) => panic!("Failed to read item: {err}"),
                    }
                }
            }

            assert_eq!(items.len(), 3);
            assert_eq!(items[0].0, 1);
            assert_eq!(items[0].1, 1);
            assert_eq!(items[1].0, data_items[0].0);
            assert_eq!(items[1].1, data_items[0].1);
            assert_eq!(items[2].0, data_items[1].0);
            assert_eq!(items[2].1, data_items[1].1);

            // Appending should resume at the next aligned offset.
            journal.append(2, 5).await.expect("Failed to append data");
            journal.sync(2).await.expect("Failed to sync blob");

            // The new item should be readable at aligned offset 2.
            let item = journal
                .get(2, 2)
                .await
                .expect("Failed to get item")
                .expect("Failed to get item");
            assert_eq!(item, 5);

            journal.close().await.expect("Failed to close journal");

            // Confirm the blob size after the new append.
            let (_, blob_size) = context
                .open(&cfg.partition, &2u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            assert_eq!(blob_size, 44);

            let journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");

            // Replay one final time to confirm all four items survive.
            let mut items = Vec::<(u64, u32)>::new();
            {
                let stream = journal.replay(1024).await.expect("unable to setup replay");
                pin_mut!(stream);
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                        Err(err) => panic!("Failed to read item: {err}"),
                    }
                }
            }

            assert_eq!(items.len(), 4);
            assert_eq!(items[0].0, 1);
            assert_eq!(items[0].1, 1);
            assert_eq!(items[1].0, data_items[0].0);
            assert_eq!(items[1].1, data_items[0].1);
            assert_eq!(items[2].0, data_items[1].0);
            assert_eq!(items[2].1, data_items[1].1);
            assert_eq!(items[3].0, 2);
            assert_eq!(items[3].1, 5);
        });
    }

    #[test_traced]
    fn test_journal_handling_aligned_truncated_data() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                write_buffer: 1024,
            };

            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            journal.append(1, 1).await.expect("Failed to append data");

            // Append multiple items to section 2.
            let data_items = vec![(2u64, 2), (2u64, 3), (2u64, 4)];
            for (index, data) in &data_items {
                journal
                    .append(*index, *data)
                    .await
                    .expect("Failed to append data");
                journal.sync(*index).await.expect("Failed to sync blob");
            }

            journal.close().await.expect("Failed to close journal");

            // Truncate the blob for section 2.
            let (blob, blob_size) = context
                .open(&cfg.partition, &2u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            blob.resize(blob_size - 4)
                .await
                .expect("Failed to corrupt blob");
            blob.close().await.expect("Failed to close blob");

            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");

            // Replay the journal and collect its items.
            let mut items = Vec::<(u64, u64)>::new();
            {
                let stream = journal.replay(1024).await.expect("unable to setup replay");
                pin_mut!(stream);
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                        Err(err) => panic!("Failed to read item: {err}"),
                    }
                }
            }

            // The truncated item should have been dropped.
            assert_eq!(items.len(), 3);
            assert_eq!(items[0].0, 1);
            assert_eq!(items[0].1, 1);
            assert_eq!(items[1].0, data_items[0].0);
            assert_eq!(items[1].1, data_items[0].1);
            assert_eq!(items[2].0, data_items[1].0);
            assert_eq!(items[2].1, data_items[1].1);

            // Appending should resume at the next aligned offset.
            journal.append(2, 5).await.expect("Failed to append data");
            journal.sync(2).await.expect("Failed to sync blob");

            // The new item should be readable at aligned offset 2.
            let item = journal
                .get(2, 2)
                .await
                .expect("Failed to get item")
                .expect("Failed to get item");
            assert_eq!(item, 5);

            journal.close().await.expect("Failed to close journal");

            // Confirm the blob size after the new append.
            let (_, blob_size) = context
                .open(&cfg.partition, &2u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            assert_eq!(blob_size, 48);

            let journal = Journal::init(context, cfg)
                .await
                .expect("Failed to re-initialize journal");

            // Replay again to confirm all four items survive.
            let mut items = Vec::<(u64, u64)>::new();
            {
                let stream = journal.replay(1024).await.expect("unable to setup replay");
                pin_mut!(stream);
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                        Err(err) => panic!("Failed to read item: {err}"),
                    }
                }
            }
            journal.close().await.expect("Failed to close journal");

            assert_eq!(items.len(), 4);
            assert_eq!(items[0].0, 1);
            assert_eq!(items[0].1, 1);
            assert_eq!(items[1].0, data_items[0].0);
            assert_eq!(items[1].1, data_items[0].1);
            assert_eq!(items[2].0, data_items[1].0);
            assert_eq!(items[2].1, data_items[1].1);
            assert_eq!(items[3].0, 2);
            assert_eq!(items[3].1, 5);
        });
    }

    #[test_traced]
    fn test_journal_handling_extra_data() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                write_buffer: 1024,
            };

            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            journal.append(1, 1).await.expect("Failed to append data");

            // Append multiple items to section 2.
            let data_items = vec![(2u64, 2), (2u64, 3), (2u64, 4)];
            for (index, data) in &data_items {
                journal
                    .append(*index, *data)
                    .await
                    .expect("Failed to append data");
                journal.sync(*index).await.expect("Failed to sync blob");
            }

            journal.close().await.expect("Failed to close journal");

            // Write zero-filled bytes past the end of the blob for section 2.
            let (blob, blob_size) = context
                .open(&cfg.partition, &2u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            blob.write_at(vec![0u8; 16], blob_size)
                .await
                .expect("Failed to add extra data");
            blob.close().await.expect("Failed to close blob");

            let journal = Journal::init(context, cfg)
                .await
                .expect("Failed to re-initialize journal");

            // Replay should not error on the trailing zero bytes.
            let mut items = Vec::<(u64, i32)>::new();
            let stream = journal.replay(1024).await.expect("unable to setup replay");
            pin_mut!(stream);
            while let Some(result) = stream.next().await {
                match result {
                    Ok((blob_index, _, _, item)) => items.push((blob_index, item)),
                    Err(err) => panic!("Failed to read item: {err}"),
                }
            }
        });
    }

    /// A no-op blob used to simulate very large files without allocating them.
    #[derive(Clone)]
    struct MockBlob {}

    impl Blob for MockBlob {
        async fn read_at(
            &self,
            buf: impl Into<StableBuf> + Send,
            _offset: u64,
        ) -> Result<StableBuf, RError> {
            Ok(buf.into())
        }

        async fn write_at(
            &self,
            _buf: impl Into<StableBuf> + Send,
            _offset: u64,
        ) -> Result<(), RError> {
            Ok(())
        }

        async fn resize(&self, _len: u64) -> Result<(), RError> {
            Ok(())
        }

        async fn sync(&self) -> Result<(), RError> {
            Ok(())
        }

        async fn close(self) -> Result<(), RError> {
            Ok(())
        }
    }

    /// A storage backend that reports a fixed blob length.
    #[derive(Clone)]
    struct MockStorage {
        len: u64,
    }

    impl Storage for MockStorage {
        type Blob = MockBlob;

        async fn open(&self, _partition: &str, _name: &[u8]) -> Result<(MockBlob, u64), RError> {
            Ok((MockBlob {}, self.len))
        }

        async fn remove(&self, _partition: &str, _name: Option<&[u8]>) -> Result<(), RError> {
            Ok(())
        }

        async fn scan(&self, _partition: &str) -> Result<Vec<Vec<u8>>, RError> {
            Ok(vec![])
        }
    }

    impl Metrics for MockStorage {
        fn with_label(&self, _: &str) -> Self {
            self.clone()
        }

        fn label(&self) -> String {
            String::new()
        }

        fn register<N: Into<String>, H: Into<String>>(&self, _: N, _: H, _: impl Metric) {}

        fn encode(&self) -> String {
            String::new()
        }
    }

    /// Mirrors ITEM_ALIGNMENT in the parent module.
    const INDEX_ALIGNMENT: u64 = 16;

    #[test_traced]
    fn test_journal_large_offset() {
        let executor = deterministic::Runner::default();
        executor.start(|_| async move {
            let cfg = Config {
                partition: "partition".to_string(),
                compression: None,
                codec_config: (),
                write_buffer: 1024,
            };
            let context = MockStorage {
                // Maximum length that still fits in a u32 aligned offset.
                len: u32::MAX as u64 * INDEX_ALIGNMENT,
            };
            let mut journal = Journal::init(context, cfg).await.unwrap();

            // Appending at the maximum offset should succeed.
            let data = 1;
            let (result, _) = journal
                .append(1, data)
                .await
                .expect("Failed to append data");
            assert_eq!(result, u32::MAX);
        });
    }

    #[test_traced]
    fn test_journal_offset_overflow() {
        let executor = deterministic::Runner::default();
        executor.start(|_| async move {
            let cfg = Config {
                partition: "partition".to_string(),
                compression: None,
                codec_config: (),
                write_buffer: 1024,
            };
            let context = MockStorage {
                // One byte past the maximum length representable by a u32 offset.
                len: u32::MAX as u64 * INDEX_ALIGNMENT + 1,
            };
            let mut journal = Journal::init(context, cfg).await.unwrap();

            // Appending past the maximum offset should overflow.
            let data = 1;
            let result = journal.append(1, data).await;
            assert!(matches!(result, Err(Error::OffsetOverflow)));
        });
    }

    #[test_traced]
    fn test_journal_rewind() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".to_string(),
                compression: None,
                codec_config: (),
                write_buffer: 1024,
            };
            let mut journal = Journal::init(context, cfg).await.unwrap();

            // An empty section reports a size of zero.
            let size = journal.size(1).await.unwrap();
            assert_eq!(size, 0);

            journal.append(1, 42i32).await.unwrap();

            let size = journal.size(1).await.unwrap();
            assert!(size > 0);

            journal.append(1, 43i32).await.unwrap();
            let new_size = journal.size(1).await.unwrap();
            assert!(new_size > size);

            let size = journal.size(2).await.unwrap();
            assert_eq!(size, 0);

            journal.append(2, 44i32).await.unwrap();

            let size = journal.size(2).await.unwrap();
            assert!(size > 0);

            // Rewinding section 1 to zero also removes the later section 2.
            journal.rewind(1, 0).await.unwrap();

            let size = journal.size(1).await.unwrap();
            assert_eq!(size, 0);

            let size = journal.size(2).await.unwrap();
            assert_eq!(size, 0);
        });
    }

    #[test_traced]
    fn test_journal_rewind_section() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".to_string(),
                compression: None,
                codec_config: (),
                write_buffer: 1024,
            };
            let mut journal = Journal::init(context, cfg).await.unwrap();

            let size = journal.size(1).await.unwrap();
            assert_eq!(size, 0);

            journal.append(1, 42i32).await.unwrap();

            let size = journal.size(1).await.unwrap();
            assert!(size > 0);

            journal.append(1, 43i32).await.unwrap();
            let new_size = journal.size(1).await.unwrap();
            assert!(new_size > size);

            let size = journal.size(2).await.unwrap();
            assert_eq!(size, 0);

            journal.append(2, 44i32).await.unwrap();

            let size = journal.size(2).await.unwrap();
            assert!(size > 0);

            // Rewinding only section 1 leaves section 2 untouched.
            journal.rewind_section(1, 0).await.unwrap();

            let size = journal.size(1).await.unwrap();
            assert_eq!(size, 0);

            let size = journal.size(2).await.unwrap();
            assert!(size > 0);
        });
    }

    #[test_traced]
    fn test_journal_conformance() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".into(),
                compression: None,
                codec_config: (),
                write_buffer: 1024,
            };

            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to initialize journal");

            // Append 100 items to a single section.
            for i in 0..100 {
                journal.append(1, i).await.expect("Failed to append data");
            }
            journal.sync(1).await.expect("Failed to sync blob");

            journal.close().await.expect("Failed to close journal");

            // Hash the raw blob contents to catch any change to the on-disk format.
            let (blob, size) = context
                .open(&cfg.partition, &1u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            assert!(size > 0);
            let buf = blob
                .read_at(vec![0u8; size as usize], 0)
                .await
                .expect("Failed to read blob");
            let digest = hash(buf.as_ref());
            assert_eq!(
                hex(&digest),
                "ca3845fa7fabd4d2855ab72ed21226d1d6eb30cb895ea9ec5e5a14201f3f25d8",
            );
        });
    }
}