use crate::journal::{
    contiguous::{MutableContiguous, PersistableContiguous},
    Error,
};
use bytes::BufMut;
use commonware_codec::{CodecFixed, DecodeExt as _, FixedSize};
use commonware_runtime::{
    buffer::{Append, PoolRef, Read},
    telemetry::metrics::status::GaugeExt,
    Blob, Error as RError, Metrics, Storage,
};
use commonware_utils::hex;
use futures::{
    future::try_join_all,
    stream::{self, Stream},
    StreamExt,
};
use prometheus_client::metrics::{counter::Counter, gauge::Gauge};
use std::{
    collections::BTreeMap,
    marker::PhantomData,
    num::{NonZeroU64, NonZeroUsize},
};
use tracing::{debug, trace, warn};

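/// Configuration for initializing a fixed-item [Journal].
///
/// A minimal construction sketch (the partition name and sizes below are
/// illustrative, not recommended defaults):
///
/// ```ignore
/// use commonware_utils::{NZUsize, NZU64};
/// let cfg = Config {
///     partition: "my_journal".into(),
///     items_per_blob: NZU64!(10_000),
///     buffer_pool: PoolRef::new(NZUsize!(4096), NZUsize!(64)),
///     write_buffer: NZUsize!(1 << 20),
/// };
/// ```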
#[derive(Clone)]
pub struct Config {
    /// The storage partition in which the journal's blobs are kept.
    pub partition: String,

    /// The maximum number of items stored in each blob.
    pub items_per_blob: NonZeroU64,

    /// The buffer pool used to cache reads.
    pub buffer_pool: PoolRef,

    /// The size of the write buffer used when appending to a blob.
    pub write_buffer: NonZeroUsize,
}

pub struct Journal<E: Storage + Metrics, A: CodecFixed> {
    pub(crate) context: E,
    pub(crate) cfg: Config,

    /// All full (historical) blobs, keyed by index. The tail blob is held
    /// separately in `tail`.
    pub(crate) blobs: BTreeMap<u64, Append<E::Blob>>,

    /// The tail blob: the only blob that accepts new appends.
    pub(crate) tail: Append<E::Blob>,

    /// The index of the tail blob.
    pub(crate) tail_index: u64,

    /// The journal's size in items: the position one past the last appended
    /// item (pruned items included).
    pub(crate) size: u64,

    /// The position of the first item that has not been pruned. Positions
    /// below this boundary are no longer readable.
    pub(crate) pruning_boundary: u64,

    // Metrics.
    pub(crate) tracked: Gauge,
    pub(crate) synced: Counter,
    pub(crate) pruned: Counter,

    pub(crate) _array: PhantomData<A>,
}

impl<E: Storage + Metrics, A: CodecFixed<Cfg = ()>> Journal<E, A> {
    /// The size of a single stored chunk: the encoded item followed by its
    /// crc32 checksum.
    ///
    ///   [ encoded item: A::SIZE bytes | crc32(item): u32::SIZE bytes ]
    pub(crate) const CHUNK_SIZE: usize = u32::SIZE + A::SIZE;
    pub(crate) const CHUNK_SIZE_U64: u64 = Self::CHUNK_SIZE as u64;

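    /// Initialize the journal by opening every blob in `cfg.partition` and
    /// validating recovered state: all blobs except the newest must be full
    /// and contiguously numbered, and any partially written or corrupted
    /// items are trimmed from the tail.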
    pub async fn init(context: E, cfg: Config) -> Result<Self, Error> {
        // Open all blobs in the partition.
        let mut blobs = BTreeMap::new();
        let stored_blobs = match context.scan(&cfg.partition).await {
            Ok(blobs) => blobs,
            Err(RError::PartitionMissing(_)) => Vec::new(),
            Err(err) => return Err(Error::Runtime(err)),
        };
        for name in stored_blobs {
            let (blob, size) = context
                .open(&cfg.partition, &name)
                .await
                .map_err(Error::Runtime)?;
            let index = match name.try_into() {
                Ok(index) => u64::from_be_bytes(index),
                Err(nm) => return Err(Error::InvalidBlobName(hex(&nm))),
            };
            debug!(blob = index, size, "loaded blob");
            blobs.insert(index, (blob, size));
        }

        // Verify that the blobs are contiguously numbered and that all but
        // the newest are full. If no blobs exist, create an empty first one.
        let full_size = cfg.items_per_blob.get() * Self::CHUNK_SIZE_U64;
        if !blobs.is_empty() {
            let mut it = blobs.keys().rev();
            let mut prev_index = *it.next().unwrap();
            for index in it {
                let (_, size) = blobs.get(index).unwrap();
                if *index != prev_index - 1 {
                    return Err(Error::MissingBlob(prev_index - 1));
                }
                prev_index = *index;
                if *size != full_size {
                    return Err(Error::InvalidBlobSize(*index, *size));
                }
            }
        } else {
            debug!("no blobs found");
            let (blob, size) = context.open(&cfg.partition, &0u64.to_be_bytes()).await?;
            assert_eq!(size, 0);
            blobs.insert(0, (blob, size));
        }

        // Initialize metrics.
        let tracked = Gauge::default();
        let synced = Counter::default();
        let pruned = Counter::default();
        context.register("tracked", "Number of blobs", tracked.clone());
        context.register("synced", "Number of syncs", synced.clone());
        context.register("pruned", "Number of blobs pruned", pruned.clone());
        let _ = tracked.try_set(blobs.len());

        // Separate out the tail blob and trim any incomplete or corrupted
        // items from its end.
        let (mut tail_index, (mut tail, mut tail_size)) = blobs.pop_last().unwrap();
        tail_size = Self::trim_tail(&tail, tail_size, tail_index).await?;
        if tail_size > full_size {
            return Err(Error::InvalidBlobSize(tail_index, tail_size));
        }

        // If the tail blob is full, move it into `blobs` and open a fresh
        // tail.
        if tail_size == full_size {
            warn!(
                blob = tail_index,
                "tail blob is full, creating a new empty one"
            );
            blobs.insert(tail_index, (tail, tail_size));
            tail_index += 1;
            (tail, tail_size) = context
                .open(&cfg.partition, &tail_index.to_be_bytes())
                .await?;
            assert_eq!(tail_size, 0);
            tracked.inc();
        }

        // Wrap each blob in an append buffer.
        let blobs = try_join_all(blobs.into_iter().map(|(index, (blob, size))| {
            let pool = cfg.buffer_pool.clone();
            async move {
                let blob = Append::new(blob, size, cfg.write_buffer, pool).await?;
                Ok::<_, Error>((index, (blob, size)))
            }
        }))
        .await?;
        let tail = Append::new(tail, tail_size, cfg.write_buffer, cfg.buffer_pool.clone()).await?;

        // Derive the journal's size and pruning boundary from the recovered
        // blobs.
        let size = tail_index * cfg.items_per_blob.get() + (tail_size / Self::CHUNK_SIZE_U64);
        let pruning_boundary = if blobs.is_empty() {
            tail_index * cfg.items_per_blob.get()
        } else {
            blobs[0].0 * cfg.items_per_blob.get()
        };
        assert!(size >= pruning_boundary);

        Ok(Self {
            context,
            cfg,
            blobs: blobs
                .into_iter()
                .map(|(index, (blob, _))| (index, blob))
                .collect(),
            tail,
            tail_index,
            size,
            pruning_boundary,
            tracked,
            synced,
            pruned,
            _array: PhantomData,
        })
    }

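    /// Trim any partially written or corrupted items from the end of the
    /// tail blob, returning its new size. Scans backward from the end until
    /// an item with a valid checksum is found, truncating as it goes; any
    /// truncation is synced before returning.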
    async fn trim_tail(
        tail: &<E as Storage>::Blob,
        mut tail_size: u64,
        tail_index: u64,
    ) -> Result<u64, Error> {
        // Truncate any trailing partial chunk.
        let mut truncated = false;
        if !tail_size.is_multiple_of(Self::CHUNK_SIZE_U64) {
            warn!(
                blob = tail_index,
                invalid_size = tail_size,
                "last blob size is not a multiple of item size, truncating"
            );
            tail_size -= tail_size % Self::CHUNK_SIZE_U64;
            tail.resize(tail_size).await?;
            truncated = true;
        }

        // Drop items from the end until one verifies.
        while tail_size > 0 {
            let offset = tail_size - Self::CHUNK_SIZE_U64;
            let read = tail.read_at(vec![0u8; Self::CHUNK_SIZE], offset).await?;
            match Self::verify_integrity(read.as_ref()) {
                Ok(_) => break,
                Err(Error::ChecksumMismatch(_, _)) => {
                    warn!(blob = tail_index, offset, "checksum mismatch: truncating");
                    tail_size -= Self::CHUNK_SIZE_U64;
                    tail.resize(tail_size).await?;
                    truncated = true;
                }
                Err(err) => return Err(err),
            }
        }

        // Persist any truncation.
        if truncated {
            tail.sync().await?;
        }

        Ok(tail_size)
    }

    /// Sync the tail blob to storage. (Historical blobs are synced before
    /// they are sealed, so only the tail can have pending writes.)
    pub async fn sync(&mut self) -> Result<(), Error> {
        self.synced.inc();
        debug!(blob = self.tail_index, "syncing blob");
        self.tail.sync().await.map_err(Error::Runtime)
    }

    /// The total size of the journal in items, including pruned items.
    pub const fn size(&self) -> u64 {
        self.size
    }

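    /// Append `item` to the journal, returning its position. The item is
    /// encoded and stored with a trailing crc32 checksum; when the append
    /// fills the tail blob, the tail is synced and a fresh blob is opened.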
    pub async fn append(&mut self, item: A) -> Result<u64, Error> {
        // Build the chunk: the encoded item followed by its checksum.
        let mut size = self.tail.size().await;
        assert!(size < self.cfg.items_per_blob.get() * Self::CHUNK_SIZE_U64);
        assert_eq!(size % Self::CHUNK_SIZE_U64, 0);
        let mut buf: Vec<u8> = Vec::with_capacity(Self::CHUNK_SIZE);
        let item = item.encode();
        let checksum = crc32fast::hash(&item);
        buf.extend_from_slice(&item);
        buf.put_u32(checksum);

        // Compute the item's absolute position and append the chunk.
        let item_pos =
            (size / Self::CHUNK_SIZE_U64) + self.cfg.items_per_blob.get() * self.tail_index;
        self.tail.append(buf).await?;
        trace!(blob = self.tail_index, pos = item_pos, "appended item");
        size += Self::CHUNK_SIZE_U64;

        // If the tail blob is now full, sync it and open the next blob.
        if size == self.cfg.items_per_blob.get() * Self::CHUNK_SIZE_U64 {
            self.tail.sync().await?;

            let next_blob_index = self.tail_index + 1;
            debug!(blob = next_blob_index, "creating next blob");
            let (next_blob, size) = self
                .context
                .open(&self.cfg.partition, &next_blob_index.to_be_bytes())
                .await?;
            assert_eq!(size, 0);
            let next_blob = Append::new(
                next_blob,
                size,
                self.cfg.write_buffer,
                self.cfg.buffer_pool.clone(),
            )
            .await?;
            self.tracked.inc();

            // Seal the old tail into the historical blob map.
            let old_tail = std::mem::replace(&mut self.tail, next_blob);
            assert!(self.blobs.insert(self.tail_index, old_tail).is_none());
            self.tail_index = next_blob_index;
        }
        self.size += 1;

        Ok(item_pos)
    }

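    /// Rewind the journal to `size` items, discarding everything after that
    /// position. Returns [Error::InvalidRewind] if `size` exceeds the
    /// current size or falls below the pruning boundary.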
    pub async fn rewind(&mut self, size: u64) -> Result<(), Error> {
        match size.cmp(&self.size()) {
            std::cmp::Ordering::Greater => return Err(Error::InvalidRewind(size)),
            std::cmp::Ordering::Equal => return Ok(()),
            std::cmp::Ordering::Less => {}
        }
        let rewind_to_blob_index = size / self.cfg.items_per_blob;
        if rewind_to_blob_index < self.oldest_blob_index() {
            return Err(Error::InvalidRewind(size));
        }
        let rewind_to_offset = (size % self.cfg.items_per_blob) * Self::CHUNK_SIZE_U64;

        // Remove any blobs past the one we are rewinding into, making the
        // newest remaining blob the tail.
        while rewind_to_blob_index < self.tail_index {
            let (blob_index, mut new_tail) = self.blobs.pop_last().unwrap();
            assert_eq!(blob_index, self.tail_index - 1);
            std::mem::swap(&mut self.tail, &mut new_tail);
            self.remove_blob(self.tail_index, new_tail).await?;
            self.tail_index -= 1;
        }

        // Truncate the tail to the rewind point.
        self.tail.resize(rewind_to_offset).await?;

        self.size = size;
        assert!(size >= self.pruning_boundary);

        Ok(())
    }

    /// The position of the oldest retained item, or `None` if no items are
    /// retained.
    pub const fn oldest_retained_pos(&self) -> Option<u64> {
        if self.pruning_boundary == self.size {
            return None;
        }

        Some(self.pruning_boundary)
    }

    /// The journal's pruning boundary: the position of the first item that
    /// has not been pruned.
    pub const fn pruning_boundary(&self) -> u64 {
        self.pruning_boundary
    }

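    /// Read the item at position `pos`, verifying its checksum. Returns
    /// [Error::ItemPruned] if the item has been pruned and
    /// [Error::ItemOutOfRange] if `pos` is beyond the end of the journal.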
    pub async fn read(&self, pos: u64) -> Result<A, Error> {
        let blob_index = pos / self.cfg.items_per_blob.get();
        if blob_index > self.tail_index {
            return Err(Error::ItemOutOfRange(pos));
        }

        let offset = (pos % self.cfg.items_per_blob.get()) * Self::CHUNK_SIZE_U64;

        let blob = if blob_index == self.tail_index {
            if offset >= self.tail.size().await {
                return Err(Error::ItemOutOfRange(pos));
            }
            &self.tail
        } else {
            self.blobs.get(&blob_index).ok_or(Error::ItemPruned(pos))?
        };

        let read = blob.read_at(vec![0u8; Self::CHUNK_SIZE], offset).await?;
        Self::verify_integrity(read.as_ref())
    }

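    /// Verify the trailing crc32 checksum of a chunk and decode the item it
    /// contains. Returns [Error::ChecksumMismatch] if the stored and
    /// computed checksums differ.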
    fn verify_integrity(buf: &[u8]) -> Result<A, Error> {
        let stored_checksum = u32::from_be_bytes(buf[A::SIZE..].try_into().unwrap());
        let checksum = crc32fast::hash(&buf[..A::SIZE]);
        if checksum != stored_checksum {
            return Err(Error::ChecksumMismatch(stored_checksum, checksum));
        }
        A::decode(&buf[..A::SIZE]).map_err(Error::Codec)
    }

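    /// Replay all items at positions `>= start_pos` as a stream of
    /// `(position, item)` pairs, using a read buffer of `buffer` bytes per
    /// blob. The tail is synced first so the stream observes every appended
    /// item.
    ///
    /// A consumption sketch (the buffer size is illustrative):
    ///
    /// ```ignore
    /// let stream = journal.replay(NZUsize!(4096), 0).await?;
    /// pin_mut!(stream);
    /// while let Some(result) = stream.next().await {
    ///     let (pos, item) = result?;
    ///     // process item...
    /// }
    /// ```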
    pub async fn replay(
        &self,
        buffer: NonZeroUsize,
        start_pos: u64,
    ) -> Result<impl Stream<Item = Result<(u64, A), Error>> + '_, Error> {
        assert!(start_pos <= self.size());

        // Collect the (index, blob, size) of every blob at or after the one
        // containing `start_pos`.
        let items_per_blob = self.cfg.items_per_blob.get();
        let start_blob = start_pos / items_per_blob;
        assert!(start_blob <= self.tail_index);
        let blobs = self.blobs.range(start_blob..).collect::<Vec<_>>();
        let full_size = items_per_blob * Self::CHUNK_SIZE_U64;
        let mut blob_plus = blobs
            .into_iter()
            .map(|(blob_index, blob)| (*blob_index, blob.clone_blob(), full_size))
            .collect::<Vec<_>>();

        // Sync the tail so the stream observes all appended items.
        self.tail.sync().await?;
        let tail_size = self.tail.size().await;
        blob_plus.push((self.tail_index, self.tail.clone_blob(), tail_size));
        let start_offset = (start_pos % items_per_blob) * Self::CHUNK_SIZE_U64;

        // Chain a buffered reader over each blob, yielding one verified item
        // per chunk.
        let stream = stream::iter(blob_plus).flat_map(move |(blob_index, blob, size)| {
            let mut reader = Read::new(blob, size, buffer);
            let buf = vec![0u8; Self::CHUNK_SIZE];
            let initial_offset = if blob_index == start_blob {
                reader.seek_to(start_offset).expect("invalid start_pos");
                start_offset
            } else {
                0
            };

            stream::unfold(
                (buf, reader, initial_offset),
                move |(mut buf, mut reader, offset)| async move {
                    if offset >= reader.blob_size() {
                        return None;
                    }

                    let item_pos = items_per_blob * blob_index + offset / Self::CHUNK_SIZE_U64;
                    match reader.read_exact(&mut buf, Self::CHUNK_SIZE).await {
                        Ok(()) => {
                            let next_offset = offset + Self::CHUNK_SIZE_U64;
                            let result = Self::verify_integrity(&buf).map(|item| (item_pos, item));
                            if result.is_err() {
                                warn!("corrupted item at {item_pos}");
                            }
                            Some((result, (buf, reader, next_offset)))
                        }
                        Err(err) => {
                            warn!(
                                item_pos,
                                err = err.to_string(),
                                "error reading item during replay"
                            );
                            // Advance the offset to the end of the blob so
                            // no further reads are attempted after an error.
                            Some((Err(Error::Runtime(err)), (buf, reader, size)))
                        }
                    }
                },
            )
        });

        Ok(stream)
    }

    /// The index of the oldest blob (the tail's index if no historical
    /// blobs are retained).
    fn oldest_blob_index(&self) -> u64 {
        if self.blobs.is_empty() {
            self.tail_index
        } else {
            *self.blobs.first_key_value().unwrap().0
        }
    }

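    /// Prune as many blobs as possible that contain only items with
    /// positions before `min_item_pos`, returning whether any blob was
    /// removed. The tail blob is never pruned.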
    pub async fn prune(&mut self, min_item_pos: u64) -> Result<bool, Error> {
        // Compute the new oldest blob, never pruning past the tail.
        let oldest_blob_index = self.oldest_blob_index();
        let new_oldest_blob =
            std::cmp::min(min_item_pos / self.cfg.items_per_blob, self.tail_index);

        // Remove every blob before the new oldest.
        let mut pruned = false;
        for index in oldest_blob_index..new_oldest_blob {
            pruned = true;
            let blob = self.blobs.remove(&index).unwrap();
            self.remove_blob(index, blob).await?;
            self.pruned.inc();
        }
        if pruned {
            self.pruning_boundary = new_oldest_blob * self.cfg.items_per_blob.get();
        }

        Ok(pruned)
    }

    /// Remove `blob` (with index `index`) from storage and update metrics.
    async fn remove_blob(&mut self, index: u64, blob: Append<E::Blob>) -> Result<(), Error> {
        drop(blob);
        self.context
            .remove(&self.cfg.partition, Some(&index.to_be_bytes()))
            .await?;
        debug!(blob = index, "removed blob");
        self.tracked.dec();

        Ok(())
    }

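    /// Close the journal, syncing all blobs to storage.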
    pub async fn close(self) -> Result<(), Error> {
        for (i, blob) in self.blobs.into_iter() {
            blob.sync().await?;
            debug!(blob = i, "synced blob");
        }
        self.tail.sync().await?;
        debug!(blob = self.tail_index, "synced tail");

        Ok(())
    }

    /// Destroy the journal, removing every blob and the partition itself.
    pub async fn destroy(self) -> Result<(), Error> {
        for (i, blob) in self.blobs.into_iter() {
            drop(blob);
            debug!(blob = i, "destroyed blob");
            self.context
                .remove(&self.cfg.partition, Some(&i.to_be_bytes()))
                .await?;
        }

        drop(self.tail);
        debug!(blob = self.tail_index, "destroyed blob");
        self.context
            .remove(&self.cfg.partition, Some(&self.tail_index.to_be_bytes()))
            .await?;

        match self.context.remove(&self.cfg.partition, None).await {
            Ok(()) => {}
            Err(RError::PartitionMissing(_)) => {
                // The partition is already gone; nothing left to remove.
            }
            Err(err) => return Err(Error::Runtime(err)),
        }

        Ok(())
    }
}

impl<E: Storage + Metrics, A: CodecFixed<Cfg = ()>> super::Contiguous for Journal<E, A> {
    type Item = A;

    fn size(&self) -> u64 {
        Self::size(self)
    }

    fn oldest_retained_pos(&self) -> Option<u64> {
        Self::oldest_retained_pos(self)
    }

    fn pruning_boundary(&self) -> u64 {
        Self::pruning_boundary(self)
    }

    async fn replay(
        &self,
        start_pos: u64,
        buffer: NonZeroUsize,
    ) -> Result<impl Stream<Item = Result<(u64, Self::Item), Error>> + '_, Error> {
        Self::replay(self, buffer, start_pos).await
    }

    async fn read(&self, position: u64) -> Result<Self::Item, Error> {
        Self::read(self, position).await
    }
}

impl<E: Storage + Metrics, A: CodecFixed<Cfg = ()>> MutableContiguous for Journal<E, A> {
    async fn append(&mut self, item: Self::Item) -> Result<u64, Error> {
        Self::append(self, item).await
    }

    async fn prune(&mut self, min_position: u64) -> Result<bool, Error> {
        Self::prune(self, min_position).await
    }

    async fn rewind(&mut self, size: u64) -> Result<(), Error> {
        Self::rewind(self, size).await
    }
}

impl<E: Storage + Metrics, A: CodecFixed<Cfg = ()>> PersistableContiguous for Journal<E, A> {
    async fn commit(&mut self) -> Result<(), Error> {
        Self::sync(self).await
    }

    async fn sync(&mut self) -> Result<(), Error> {
        Self::sync(self).await
    }

    async fn close(self) -> Result<(), Error> {
        Self::close(self).await
    }

    async fn destroy(self) -> Result<(), Error> {
        Self::destroy(self).await
    }
}


#[cfg(test)]
mod tests {
    use super::*;
    use commonware_cryptography::{sha256::Digest, Hasher as _, Sha256};
    use commonware_macros::test_traced;
    use commonware_runtime::{
        deterministic::{self, Context},
        Blob, Runner, Storage,
    };
    use commonware_utils::{NZUsize, NZU64};
    use futures::{pin_mut, StreamExt};

    const PAGE_SIZE: NonZeroUsize = NZUsize!(44);
    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(3);

    fn test_digest(value: u64) -> Digest {
        Sha256::hash(&value.to_be_bytes())
    }

    fn test_cfg(items_per_blob: NonZeroU64) -> Config {
        Config {
            partition: "test_partition".into(),
            items_per_blob,
            buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
            write_buffer: NZUsize!(2048),
        }
    }

    #[test_traced]
    fn test_fixed_journal_append_and_prune() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            // Initialize an empty journal with two items per blob.
            let cfg = test_cfg(NZU64!(2));
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to initialize journal");

            let buffer = context.encode();
            assert!(buffer.contains("tracked 1"));

            // Append one item, then close and re-open the journal.
            let mut pos = journal
                .append(test_digest(0))
                .await
                .expect("failed to append data 0");
            assert_eq!(pos, 0);

            journal.close().await.expect("Failed to close journal");

            let cfg = test_cfg(NZU64!(2));
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to re-initialize journal");
            assert_eq!(journal.size(), 1);

            // Fill the first blob and start a second.
            pos = journal
                .append(test_digest(1))
                .await
                .expect("failed to append data 1");
            assert_eq!(pos, 1);
            pos = journal
                .append(test_digest(2))
                .await
                .expect("failed to append data 2");
            assert_eq!(pos, 2);
            let buffer = context.encode();
            assert!(buffer.contains("tracked 2"));

            // Read all appended items back.
            let item0 = journal.read(0).await.expect("failed to read data 0");
            assert_eq!(item0, test_digest(0));
            let item1 = journal.read(1).await.expect("failed to read data 1");
            assert_eq!(item1, test_digest(1));
            let item2 = journal.read(2).await.expect("failed to read data 2");
            assert_eq!(item2, test_digest(2));
            let err = journal.read(3).await.expect_err("expected read to fail");
            assert!(matches!(err, Error::ItemOutOfRange(3)));

            journal.sync().await.expect("failed to sync journal");
            let buffer = context.encode();
            assert!(buffer.contains("synced_total 1"));

            // Pruning to position 1 should not remove the first blob.
            journal.prune(1).await.expect("failed to prune journal 1");
            let buffer = context.encode();
            assert!(buffer.contains("tracked 2"));

            // Pruning to position 2 should remove the first blob.
            journal.prune(2).await.expect("failed to prune journal 2");
            assert_eq!(journal.oldest_retained_pos(), Some(2));
            let buffer = context.encode();
            assert!(buffer.contains("tracked 1"));
            assert!(buffer.contains("pruned_total 1"));

            // Reads of pruned items should fail.
            let result0 = journal.read(0).await;
            assert!(matches!(result0, Err(Error::ItemPruned(0))));
            let result1 = journal.read(1).await;
            assert!(matches!(result1, Err(Error::ItemPruned(1))));

            // The first retained item should still be readable.
            let result2 = journal.read(2).await.unwrap();
            assert_eq!(result2, test_digest(2));

            // Append more items to create multiple blobs.
            for i in 3..10 {
                let pos = journal
                    .append(test_digest(i))
                    .await
                    .expect("failed to append data");
                assert_eq!(pos, i);
            }

            // Pruning to a position before the boundary is a no-op.
            journal.prune(0).await.expect("no-op pruning failed");
            assert_eq!(journal.oldest_blob_index(), 1);
            assert_eq!(journal.tail_index, 5);
            assert_eq!(journal.oldest_retained_pos(), Some(2));

            journal
                .prune(3 * cfg.items_per_blob.get())
                .await
                .expect("failed to prune journal 2");
            assert_eq!(journal.oldest_retained_pos(), Some(6));
            let buffer = context.encode();
            assert_eq!(journal.oldest_blob_index(), 3);
            assert_eq!(journal.tail_index, 5);
            assert!(buffer.contains("tracked 3"));
            assert!(buffer.contains("pruned_total 3"));

            // Pruning beyond the journal's size prunes everything but the
            // (empty) tail blob.
            journal
                .prune(10000)
                .await
                .expect("failed to max-prune journal");
            let buffer = context.encode();
            let size = journal.size();
            assert_eq!(size, 10);
            assert_eq!(journal.oldest_blob_index(), 5);
            assert_eq!(journal.tail_index, 5);
            assert!(buffer.contains("tracked 1"));
            assert!(buffer.contains("pruned_total 5"));
            assert_eq!(journal.oldest_retained_pos(), None);
            assert_eq!(journal.pruning_boundary(), size);

            // Replaying a fully pruned journal should yield no items.
            {
                let stream = journal
                    .replay(NZUsize!(1024), 0)
                    .await
                    .expect("failed to replay journal");
                pin_mut!(stream);
                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((pos, item)) => {
                            assert_eq!(test_digest(pos), item);
                            items.push(pos);
                        }
                        Err(err) => panic!("Failed to read item: {err}"),
                    }
                }
                assert_eq!(items, Vec::<u64>::new());
            }

            journal.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_fixed_journal_append_a_lot_of_data() {
        let executor = deterministic::Runner::default();
        const ITEMS_PER_BLOB: NonZeroU64 = NZU64!(10000);
        executor.start(|context| async move {
            let cfg = test_cfg(ITEMS_PER_BLOB);
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to initialize journal");
            for i in 0u64..ITEMS_PER_BLOB.get() * 2 - 1 {
                journal
                    .append(test_digest(i))
                    .await
                    .expect("failed to append data");
            }
            journal.close().await.expect("failed to close journal");
            let journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to re-initialize journal");
            for i in 0u64..10000 {
                let item: Digest = journal.read(i).await.expect("failed to read data");
                assert_eq!(item, test_digest(i));
            }
            journal.destroy().await.expect("failed to destroy journal");
        });
    }

    #[test_traced]
    fn test_fixed_journal_replay() {
        const ITEMS_PER_BLOB: NonZeroU64 = NZU64!(7);
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            // Populate 100 full blobs plus a partial tail.
            let cfg = test_cfg(ITEMS_PER_BLOB);
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to initialize journal");

            for i in 0u64..(ITEMS_PER_BLOB.get() * 100 + ITEMS_PER_BLOB.get() / 2) {
                let pos = journal
                    .append(test_digest(i))
                    .await
                    .expect("failed to append data");
                assert_eq!(pos, i);
            }

            let buffer = context.encode();
            assert!(buffer.contains("tracked 101"));

            for i in 0u64..(ITEMS_PER_BLOB.get() * 100 + ITEMS_PER_BLOB.get() / 2) {
                let item: Digest = journal.read(i).await.expect("failed to read data");
                assert_eq!(item, test_digest(i), "i={i}");
            }

            // Replay all items and confirm each position appears exactly
            // once.
            {
                let stream = journal
                    .replay(NZUsize!(1024), 0)
                    .await
                    .expect("failed to replay journal");
                let mut items = Vec::new();
                pin_mut!(stream);
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((pos, item)) => {
                            assert_eq!(test_digest(pos), item, "pos={pos}, item={item:?}");
                            items.push(pos);
                        }
                        Err(err) => panic!("Failed to read item: {err}"),
                    }
                }

                assert_eq!(
                    items.len(),
                    ITEMS_PER_BLOB.get() as usize * 100 + ITEMS_PER_BLOB.get() as usize / 2
                );
                items.sort();
                for (i, pos) in items.iter().enumerate() {
                    assert_eq!(i as u64, *pos);
                }
            }
            journal.close().await.expect("Failed to close journal");

            // Corrupt the checksum of the middle item in blob 40.
            let checksum_offset = Digest::SIZE as u64
                + (ITEMS_PER_BLOB.get() / 2) * (Digest::SIZE + u32::SIZE) as u64;
            let (blob, _) = context
                .open(&cfg.partition, &40u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            let bad_checksum = 123456789u32;
            blob.write_at(bad_checksum.to_be_bytes().to_vec(), checksum_offset)
                .await
                .expect("Failed to write incorrect checksum");
            let corrupted_item_pos = 40 * ITEMS_PER_BLOB.get() + ITEMS_PER_BLOB.get() / 2;
            blob.sync().await.expect("Failed to sync blob");

            let journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");

            // A direct read of the corrupted item should fail.
            let err = journal.read(corrupted_item_pos).await.unwrap_err();
            assert!(matches!(err, Error::ChecksumMismatch(x, _) if x == bad_checksum));

            // Replay should surface exactly one checksum error and yield all
            // other items.
            {
                let stream = journal
                    .replay(NZUsize!(1024), 0)
                    .await
                    .expect("failed to replay journal");
                let mut items = Vec::new();
                pin_mut!(stream);
                let mut error_count = 0;
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((pos, item)) => {
                            assert_eq!(test_digest(pos), item);
                            items.push(pos);
                        }
                        Err(err) => {
                            error_count += 1;
                            assert!(matches!(err, Error::ChecksumMismatch(_, _)));
                        }
                    }
                }
                assert_eq!(error_count, 1);
                assert_eq!(
                    items.len(),
                    ITEMS_PER_BLOB.get() as usize * 100 + ITEMS_PER_BLOB.get() as usize / 2 - 1
                );
            }
            journal.close().await.expect("Failed to close journal");
        });
    }

    #[test_traced]
    fn test_fixed_journal_init_with_corrupted_historical_blobs() {
        let executor = deterministic::Runner::default();
        const ITEMS_PER_BLOB: NonZeroU64 = NZU64!(7);
        executor.start(|context| async move {
            let cfg = test_cfg(ITEMS_PER_BLOB);
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to initialize journal");

            for i in 0u64..(ITEMS_PER_BLOB.get() * 100 + ITEMS_PER_BLOB.get() / 2) {
                let pos = journal
                    .append(test_digest(i))
                    .await
                    .expect("failed to append data");
                assert_eq!(pos, i);
            }
            journal.close().await.expect("Failed to close journal");

            let buffer = context.encode();
            assert!(buffer.contains("tracked 101"));

            // Truncating a historical (non-tail) blob must fail init.
            let (blob, size) = context
                .open(&cfg.partition, &40u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            blob.resize(size - 1).await.expect("Failed to corrupt blob");
            blob.sync().await.expect("Failed to sync blob");
            let result = Journal::<_, Digest>::init(context.clone(), cfg.clone()).await;
            assert!(matches!(
                result.err().unwrap(),
                Error::InvalidBlobSize(_, _)
            ));

            // Removing a historical blob entirely must also fail init.
            context
                .remove(&cfg.partition, Some(&40u64.to_be_bytes()))
                .await
                .expect("Failed to remove blob");
            let result = Journal::<_, Digest>::init(context.clone(), cfg.clone()).await;
            assert!(matches!(result.err().unwrap(), Error::MissingBlob(n) if n == 40));
        });
    }

    #[test_traced]
    fn test_fixed_journal_test_trim_blob() {
        let executor = deterministic::Runner::default();
        const ITEMS_PER_BLOB: NonZeroU64 = NZU64!(7);
        executor.start(|context| async move {
            let cfg = test_cfg(ITEMS_PER_BLOB);
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to initialize journal");

            // Fill the first blob and put three items in the tail.
            let item_count = ITEMS_PER_BLOB.get() + 3;
            for i in 0u64..item_count {
                journal
                    .append(test_digest(i))
                    .await
                    .expect("failed to append data");
            }
            assert_eq!(journal.size(), item_count);
            journal.close().await.expect("Failed to close journal");

            // Truncate the tail blob mid-item and corrupt the checksum of
            // its second item.
            let (blob, size) = context
                .open(&cfg.partition, &1u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            blob.resize(size - 1).await.expect("Failed to corrupt blob");

            // Offset of the second item's checksum within the blob.
            let checksum_offset = Digest::SIZE + u32::SIZE + Digest::SIZE;

            let bad_checksum = 123456789u32;
            blob.write_at(bad_checksum.to_be_bytes().to_vec(), checksum_offset as u64)
                .await
                .expect("Failed to write incorrect checksum");
            blob.sync().await.expect("Failed to sync blob");

            let journal = Journal::<_, Digest>::init(context.clone(), cfg.clone())
                .await
                .unwrap();

            // Init should trim both the partial third item and the corrupted
            // second item.
            assert_eq!(journal.size(), item_count - 2);

            // Truncate the tail mid-item once more; init should trim back to
            // the last intact item.
            let (blob, size) = context
                .open(&cfg.partition, &1u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            blob.resize(size - 1).await.expect("Failed to corrupt blob");
            blob.sync().await.expect("Failed to sync blob");

            let journal = Journal::<_, Digest>::init(context.clone(), cfg.clone())
                .await
                .unwrap();

            assert_eq!(journal.size(), item_count - 3);

            journal.destroy().await.expect("Failed to destroy journal");
        });
    }

    #[test_traced]
    fn test_fixed_journal_partial_replay() {
        const ITEMS_PER_BLOB: NonZeroU64 = NZU64!(7);
        // A start position partway into blob 7.
        const START_POS: u64 = 53;

        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(ITEMS_PER_BLOB);
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to initialize journal");

            for i in 0u64..(ITEMS_PER_BLOB.get() * 100 + ITEMS_PER_BLOB.get() / 2) {
                let pos = journal
                    .append(test_digest(i))
                    .await
                    .expect("failed to append data");
                assert_eq!(pos, i);
            }

            let buffer = context.encode();
            assert!(buffer.contains("tracked 101"));

            // Replay from START_POS and confirm only positions at or after
            // it are yielded.
            {
                let stream = journal
                    .replay(NZUsize!(1024), START_POS)
                    .await
                    .expect("failed to replay journal");
                let mut items = Vec::new();
                pin_mut!(stream);
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((pos, item)) => {
                            assert!(pos >= START_POS, "pos={pos}");
                            assert_eq!(
                                test_digest(pos),
                                item,
                                "Item at position {pos} did not match expected digest"
                            );
                            items.push(pos);
                        }
                        Err(err) => panic!("Failed to read item: {err}"),
                    }
                }

                assert_eq!(
                    items.len(),
                    ITEMS_PER_BLOB.get() as usize * 100 + ITEMS_PER_BLOB.get() as usize / 2
                        - START_POS as usize
                );
                items.sort();
                for (i, pos) in items.iter().enumerate() {
                    assert_eq!(i as u64, *pos - START_POS);
                }
            }

            journal.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_fixed_journal_recover_from_partial_write() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = test_cfg(NZU64!(3));
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to initialize journal");
            for i in 0..5 {
                journal
                    .append(test_digest(i))
                    .await
                    .expect("failed to append data");
            }
            assert_eq!(journal.size(), 5);
            let buffer = context.encode();
            assert!(buffer.contains("tracked 2"));
            journal.close().await.expect("Failed to close journal");

            // Simulate a partial write by truncating the tail blob mid-item.
            let (blob, size) = context
                .open(&cfg.partition, &1u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            blob.resize(size - 1).await.expect("Failed to corrupt blob");
            blob.sync().await.expect("Failed to sync blob");

            // Init should trim the partial item.
            let journal = Journal::<_, Digest>::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");
            assert_eq!(journal.size(), 4);
            let buffer = context.encode();
            assert!(buffer.contains("tracked 2"));
            journal.close().await.expect("Failed to close journal");

            // Remove the tail blob entirely; init should recover with the
            // previous (full) blob as the tail, rolling over to a fresh one.
            context
                .remove(&cfg.partition, Some(&1u64.to_be_bytes()))
                .await
                .expect("Failed to remove blob");
            let journal = Journal::<_, Digest>::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");
            assert_eq!(journal.size(), 3);
            let buffer = context.encode();
            assert!(buffer.contains("tracked 2"));
            assert_eq!(journal.size(), 3);

            journal.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_fixed_journal_recover_to_empty_from_partial_write() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(NZU64!(10));
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to initialize journal");
            journal
                .append(test_digest(0))
                .await
                .expect("failed to append data");
            assert_eq!(journal.size(), 1);
            journal.close().await.expect("Failed to close journal");

            // Truncate the only item so recovery must trim back to empty.
            let (blob, size) = context
                .open(&cfg.partition, &0u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            blob.resize(size - 1).await.expect("Failed to corrupt blob");
            blob.sync().await.expect("Failed to sync blob");

            let mut journal = Journal::<_, Digest>::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");

            // The journal should be empty but still usable.
            assert_eq!(journal.size(), 0);
            assert_eq!(journal.oldest_retained_pos(), None);
            journal
                .append(test_digest(0))
                .await
                .expect("failed to append data");
            assert_eq!(journal.size(), 1);

            journal.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_fixed_journal_recover_from_unwritten_data() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(NZU64!(10));
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to initialize journal");

            journal
                .append(test_digest(0))
                .await
                .expect("failed to append data");
            assert_eq!(journal.size(), 1);
            journal.close().await.expect("Failed to close journal");

            // Extend the blob with zeroed bytes that do not form intact
            // items, simulating space allocated but never written.
            let (blob, size) = context
                .open(&cfg.partition, &0u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            blob.write_at(vec![0u8; Digest::SIZE * 3 - 1], size)
                .await
                .expect("Failed to extend blob");
            blob.sync().await.expect("Failed to sync blob");

            // Recovery should trim the garbage, leaving the original item.
            let mut journal = Journal::<_, Digest>::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");

            assert_eq!(journal.size(), 1);
            assert_eq!(journal.oldest_retained_pos(), Some(0));

            journal
                .append(test_digest(1))
                .await
                .expect("failed to append data");
            assert_eq!(journal.size(), 2);

            // Both the original and the new item should be readable.
            let item = journal.read(0).await.unwrap();
            assert_eq!(item, test_digest(0));

            let item = journal.read(1).await.unwrap();
            assert_eq!(item, test_digest(1));

            journal.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_fixed_journal_rewinding() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(NZU64!(2));
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to initialize journal");
            assert!(matches!(journal.rewind(0).await, Ok(())));
            assert!(matches!(
                journal.rewind(1).await,
                Err(Error::InvalidRewind(1))
            ));

            journal
                .append(test_digest(0))
                .await
                .expect("failed to append data 0");
            assert_eq!(journal.size(), 1);
            // Rewinding to the current size is a no-op.
            assert!(matches!(journal.rewind(1).await, Ok(())));
            assert!(matches!(journal.rewind(0).await, Ok(())));
            assert_eq!(journal.size(), 0);

            for i in 0..7 {
                let pos = journal
                    .append(test_digest(i))
                    .await
                    .expect("failed to append data");
                assert_eq!(pos, i);
            }
            let buffer = context.encode();
            assert!(buffer.contains("tracked 4"));
            assert_eq!(journal.size(), 7);

            // Rewinding should remove now-empty blobs.
            assert!(matches!(journal.rewind(4).await, Ok(())));
            assert_eq!(journal.size(), 4);
            let buffer = context.encode();
            assert!(buffer.contains("tracked 3"));

            assert!(matches!(journal.rewind(0).await, Ok(())));
            let buffer = context.encode();
            assert!(buffer.contains("tracked 1"));
            assert_eq!(journal.size(), 0);

            // Interleave appends and rewinds across blob boundaries.
            for _ in 0..10 {
                for i in 0..100 {
                    journal
                        .append(test_digest(i))
                        .await
                        .expect("failed to append data");
                }
                journal.rewind(journal.size() - 49).await.unwrap();
            }
            const ITEMS_REMAINING: u64 = 10 * (100 - 49);
            assert_eq!(journal.size(), ITEMS_REMAINING);

            journal.close().await.expect("Failed to close journal");

            // Repeat with an items-per-blob that does not evenly divide the
            // append/rewind counts.
            let mut cfg = test_cfg(NZU64!(3));
            cfg.partition = "test_partition_2".into();
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to initialize journal");
            for _ in 0..10 {
                for i in 0..100 {
                    journal
                        .append(test_digest(i))
                        .await
                        .expect("failed to append data");
                }
                journal.rewind(journal.size() - 49).await.unwrap();
            }
            assert_eq!(journal.size(), ITEMS_REMAINING);

            journal.close().await.expect("Failed to close journal");

            let mut journal: Journal<_, Digest> = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to re-initialize journal");
            assert_eq!(journal.size(), 10 * (100 - 49));

            // Rewinding below the pruning boundary must fail.
            journal.prune(300).await.expect("pruning failed");
            assert_eq!(journal.size(), ITEMS_REMAINING);
            assert!(matches!(
                journal.rewind(299).await,
                Err(Error::InvalidRewind(299))
            ));
            // Rewinding exactly to the pruning boundary empties the journal.
            assert!(matches!(journal.rewind(300).await, Ok(())));
            assert_eq!(journal.size(), 300);
            assert_eq!(journal.oldest_retained_pos(), None);

            journal.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_journal_conformance() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = test_cfg(NZU64!(60));

            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to initialize journal");

            for i in 0..100 {
                journal
                    .append(test_digest(i))
                    .await
                    .expect("Failed to append data");
            }

            journal.close().await.expect("Failed to close journal");

            // The on-disk encoding must remain stable: hash each blob's raw
            // bytes and compare against known-good digests.
            let (blob, size) = context
                .open(&cfg.partition, &0u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            assert!(size > 0);
            let buf = blob
                .read_at(vec![0u8; size as usize], 0)
                .await
                .expect("Failed to read blob");
            let digest = Sha256::hash(buf.as_ref());
            assert_eq!(
                hex(&digest),
                "ed2ea67208cde2ee8c16cca5aa4f369f55b1402258c6b7760e5baf134e38944a",
            );
            blob.sync().await.expect("Failed to sync blob");
            let (blob, size) = context
                .open(&cfg.partition, &1u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            assert!(size > 0);
            let buf = blob
                .read_at(vec![0u8; size as usize], 0)
                .await
                .expect("Failed to read blob");
            let digest = Sha256::hash(buf.as_ref());
            assert_eq!(
                hex(&digest),
                "cc7efd4fc999aff36b9fd4213ba8da5810dc1849f92ae2ddf7c6dc40545f9aff",
            );
            blob.sync().await.expect("Failed to sync blob");

            let journal = Journal::<Context, Digest>::init(context.clone(), cfg.clone())
                .await
                .expect("failed to initialize journal");
            journal.destroy().await.unwrap();
        });
    }
}