use super::Error;
use bytes::BufMut;
use commonware_codec::{CodecFixed, DecodeExt, FixedSize};
use commonware_runtime::{
    buffer::{Append, PoolRef, Read},
    Blob, Error as RError, Metrics, Storage,
};
use commonware_utils::hex;
use futures::{
    future::try_join_all,
    stream::{self, Stream},
    StreamExt,
};
use prometheus_client::metrics::{counter::Counter, gauge::Gauge};
use std::{
    collections::BTreeMap,
    marker::PhantomData,
    num::{NonZeroU64, NonZeroUsize},
};
use tracing::{debug, trace, warn};

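/// Configuration for a fixed-item-length `Journal`.
///
/// A minimal construction sketch (the partition name and sizes here are
/// hypothetical; see the tests at the bottom of this file for working values):
///
/// ```ignore
/// let cfg = Config {
///     partition: "my_journal".into(),
///     items_per_blob: NZU64!(4096),
///     buffer_pool: PoolRef::new(NZUsize!(4096), NZUsize!(16)),
///     write_buffer: NZUsize!(2048),
/// };
/// ```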
#[derive(Clone)]
pub struct Config {
    /// The storage partition in which the journal's blobs are stored.
    pub partition: String,

    /// The maximum number of items stored in each blob.
    pub items_per_blob: NonZeroU64,

    /// The buffer pool used to cache pages read from storage.
    pub buffer_pool: PoolRef,

    /// The size of the write buffer used for appends.
    pub write_buffer: NonZeroUsize,
}

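/// A journal of fixed-length items, stored as a sequence of blobs in a single
/// partition: every blob except the last holds exactly `items_per_blob` items,
/// and new items are always appended to the mutable tail blob.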
pub struct Journal<E: Storage + Metrics, A: CodecFixed<Cfg = ()>> {
    pub(crate) context: E,
    pub(crate) cfg: Config,

    /// All full (read-only) blobs, keyed by their index. Excludes the tail.
    pub(crate) blobs: BTreeMap<u64, Append<E::Blob>>,

    /// The tail blob: the only blob that still accepts appends.
    pub(crate) tail: Append<E::Blob>,

    /// The index of the tail blob.
    pub(crate) tail_index: u64,

    pub(crate) tracked: Gauge,
    pub(crate) synced: Counter,
    pub(crate) pruned: Counter,

    pub(crate) _array: PhantomData<A>,
}

impl<E: Storage + Metrics, A: CodecFixed<Cfg = ()>> Journal<E, A> {
    /// The size of a single stored chunk: one encoded item plus its CRC32 checksum.
    pub(crate) const CHUNK_SIZE: usize = u32::SIZE + A::SIZE;
    pub(crate) const CHUNK_SIZE_U64: u64 = Self::CHUNK_SIZE as u64;

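    /// Initialize a new journal from the blobs in `cfg.partition`, recovering
    /// from any partially written data in the tail blob.
    ///
    /// A minimal usage sketch (the runtime context and item type are
    /// placeholders; see the tests below for a runnable variant):
    ///
    /// ```ignore
    /// let mut journal = Journal::<_, Digest>::init(context, cfg).await?;
    /// let pos = journal.append(item).await?;
    /// journal.sync().await?;
    /// ```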
    pub async fn init(context: E, cfg: Config) -> Result<Self, Error> {
        // Open all blobs in the partition.
        let mut blobs = BTreeMap::new();
        let stored_blobs = match context.scan(&cfg.partition).await {
            Ok(blobs) => blobs,
            Err(RError::PartitionMissing(_)) => Vec::new(),
            Err(err) => return Err(Error::Runtime(err)),
        };
        for name in stored_blobs {
            let (blob, size) = context
                .open(&cfg.partition, &name)
                .await
                .map_err(Error::Runtime)?;
            let index = match name.try_into() {
                Ok(index) => u64::from_be_bytes(index),
                Err(nm) => return Err(Error::InvalidBlobName(hex(&nm))),
            };
            debug!(blob = index, size, "loaded blob");
            blobs.insert(index, (blob, size));
        }

        // Check that the historical blobs are contiguous and full.
        let full_size = cfg.items_per_blob.get() * Self::CHUNK_SIZE_U64;
        if !blobs.is_empty() {
            let mut it = blobs.keys().rev();
            let mut prev_index = *it.next().unwrap();
            for index in it {
                let (_, size) = blobs.get(index).unwrap();
                if *index != prev_index - 1 {
                    return Err(Error::MissingBlob(prev_index - 1));
                }
                prev_index = *index;
                if *size != full_size {
                    return Err(Error::InvalidBlobSize(*index, *size));
                }
            }
        } else {
            debug!("no blobs found");
            let (blob, size) = context.open(&cfg.partition, &0u64.to_be_bytes()).await?;
            assert_eq!(size, 0);
            blobs.insert(0, (blob, size));
        }

        // Initialize metrics.
        let tracked = Gauge::default();
        let synced = Counter::default();
        let pruned = Counter::default();
        context.register("tracked", "Number of blobs", tracked.clone());
        context.register("synced", "Number of syncs", synced.clone());
        context.register("pruned", "Number of blobs pruned", pruned.clone());
        tracked.set(blobs.len() as i64);

        // Trim any partially written data from the tail blob.
        let (mut tail_index, (mut tail, mut tail_size)) = blobs.pop_last().unwrap();
        tail_size = Self::trim_tail(&tail, tail_size, tail_index).await?;
        if tail_size > full_size {
            return Err(Error::InvalidBlobSize(tail_index, tail_size));
        }

        // If the tail blob is full, start a new one.
        if tail_size == full_size {
            warn!(
                blob = tail_index,
                "tail blob is full, creating a new empty one"
            );
            blobs.insert(tail_index, (tail, tail_size));
            tail_index += 1;
            (tail, tail_size) = context
                .open(&cfg.partition, &tail_index.to_be_bytes())
                .await?;
            assert_eq!(tail_size, 0);
            tracked.inc();
        }

        // Wrap every blob in an `Append` buffer.
        let blobs = try_join_all(blobs.into_iter().map(|(index, (blob, size))| {
            let pool = cfg.buffer_pool.clone();
            async move {
                let blob = Append::new(blob, size, cfg.write_buffer, pool).await?;
                Ok::<_, Error>((index, (blob, size)))
            }
        }))
        .await?;
        let tail = Append::new(tail, tail_size, cfg.write_buffer, cfg.buffer_pool.clone()).await?;

        Ok(Self {
            context,
            cfg,
            blobs: blobs
                .into_iter()
                .map(|(index, (blob, _))| (index, blob))
                .collect(),
            tail,
            tail_index,
            tracked,
            synced,
            pruned,
            _array: PhantomData,
        })
    }

    /// Truncate the tail blob to the last uncorrupted chunk, returning the
    /// (possibly reduced) size of the blob.
    async fn trim_tail(
        tail: &<E as Storage>::Blob,
        mut tail_size: u64,
        tail_index: u64,
    ) -> Result<u64, Error> {
        let mut truncated = false;
        if !tail_size.is_multiple_of(Self::CHUNK_SIZE_U64) {
            warn!(
                blob = tail_index,
                invalid_size = tail_size,
                "last blob size is not a multiple of item size, truncating"
            );
            tail_size -= tail_size % Self::CHUNK_SIZE_U64;
            tail.resize(tail_size).await?;
            truncated = true;
        }

        // Drop trailing chunks until one passes its checksum.
        while tail_size > 0 {
            let offset = tail_size - Self::CHUNK_SIZE_U64;
            let read = tail.read_at(vec![0u8; Self::CHUNK_SIZE], offset).await?;
            match Self::verify_integrity(read.as_ref()) {
                Ok(_) => break,
                Err(Error::ChecksumMismatch(_, _)) => {
                    warn!(blob = tail_index, offset, "checksum mismatch: truncating");
                    tail_size -= Self::CHUNK_SIZE_U64;
                    tail.resize(tail_size).await?;
                    truncated = true;
                }
                Err(err) => return Err(err),
            }
        }

        if truncated {
            tail.sync().await?;
        }

        Ok(tail_size)
    }

    /// Sync the tail blob (the only blob with unwritten data) to storage.
    pub async fn sync(&mut self) -> Result<(), Error> {
        self.synced.inc();
        debug!(blob = self.tail_index, "syncing blob");
        self.tail.sync().await.map_err(Error::Runtime)
    }

    /// Return the total number of items ever appended to the journal,
    /// including pruned items.
    pub async fn size(&self) -> Result<u64, Error> {
        let size = self.tail.size().await;
        assert_eq!(size % Self::CHUNK_SIZE_U64, 0);
        let items_in_blob = size / Self::CHUNK_SIZE_U64;
        Ok(items_in_blob + self.cfg.items_per_blob.get() * self.tail_index)
    }

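    /// Append an item to the journal, returning its position. The item is not
    /// guaranteed durable until the next `sync` (a full tail blob is synced
    /// automatically before a new one is created).
    ///
    /// A usage sketch (item type hypothetical):
    ///
    /// ```ignore
    /// let pos = journal.append(digest).await?;
    /// assert_eq!(journal.read(pos).await?, digest);
    /// ```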
    pub async fn append(&mut self, item: A) -> Result<u64, Error> {
        // The tail blob must always have room for another item.
        let mut size = self.tail.size().await;
        assert!(size < self.cfg.items_per_blob.get() * Self::CHUNK_SIZE_U64);
        assert_eq!(size % Self::CHUNK_SIZE_U64, 0);

        // Encode the item followed by its checksum.
        let mut buf: Vec<u8> = Vec::with_capacity(Self::CHUNK_SIZE);
        let item = item.encode();
        let checksum = crc32fast::hash(&item);
        buf.extend_from_slice(&item);
        buf.put_u32(checksum);

        let item_pos =
            (size / Self::CHUNK_SIZE_U64) + self.cfg.items_per_blob.get() * self.tail_index;
        self.tail.append(buf).await?;
        trace!(blob = self.tail_index, pos = item_pos, "appended item");
        size += Self::CHUNK_SIZE_U64;

        // If the tail blob is now full, sync it and open the next one.
        if size == self.cfg.items_per_blob.get() * Self::CHUNK_SIZE_U64 {
            self.tail.sync().await?;

            let next_blob_index = self.tail_index + 1;
            debug!(blob = next_blob_index, "creating next blob");
            let (next_blob, size) = self
                .context
                .open(&self.cfg.partition, &next_blob_index.to_be_bytes())
                .await?;
            assert_eq!(size, 0);
            let next_blob = Append::new(
                next_blob,
                size,
                self.cfg.write_buffer,
                self.cfg.buffer_pool.clone(),
            )
            .await?;
            self.tracked.inc();

            let old_tail = std::mem::replace(&mut self.tail, next_blob);
            assert!(self.blobs.insert(self.tail_index, old_tail).is_none());
            self.tail_index = next_blob_index;
        }

        Ok(item_pos)
    }

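    /// Rewind the journal so that it contains only the first `size` items,
    /// removing any blobs that fall entirely past the new end. Rewinding to a
    /// point before the oldest retained item fails with `InvalidRewind`.
    ///
    /// A usage sketch:
    ///
    /// ```ignore
    /// let size = journal.size().await?;
    /// journal.rewind(size.saturating_sub(1)).await?; // drop the last item
    /// ```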
    pub async fn rewind(&mut self, size: u64) -> Result<(), Error> {
        match size.cmp(&self.size().await?) {
            std::cmp::Ordering::Greater => return Err(Error::InvalidRewind(size)),
            std::cmp::Ordering::Equal => return Ok(()),
            std::cmp::Ordering::Less => {}
        }
        let rewind_to_blob_index = size / self.cfg.items_per_blob;
        if rewind_to_blob_index < self.oldest_blob_index() {
            return Err(Error::InvalidRewind(size));
        }
        let rewind_to_offset = (size % self.cfg.items_per_blob) * Self::CHUNK_SIZE_U64;

        // Remove full blobs beyond the rewind point, making the newest
        // remaining blob the new tail.
        while rewind_to_blob_index < self.tail_index {
            let (blob_index, mut new_tail) = self.blobs.pop_last().unwrap();
            assert_eq!(blob_index, self.tail_index - 1);
            std::mem::swap(&mut self.tail, &mut new_tail);
            self.remove_blob(self.tail_index, new_tail).await?;
            self.tail_index -= 1;
        }

        // Truncate the tail to the desired end offset.
        self.tail.resize(rewind_to_offset).await?;

        Ok(())
    }

    /// Return the position of the oldest item retained in the journal, or
    /// `None` if the journal is empty.
    pub async fn oldest_retained_pos(&self) -> Result<Option<u64>, Error> {
        let oldest_blob_index = self.oldest_blob_index();
        if oldest_blob_index == self.tail_index && self.tail.size().await == 0 {
            return Ok(None);
        }

        Ok(Some(oldest_blob_index * self.cfg.items_per_blob.get()))
    }

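    /// Read the item at position `pos`, verifying its checksum. Fails with
    /// `ItemOutOfRange` if `pos` is past the end of the journal and with
    /// `ItemPruned` if the blob containing `pos` has been pruned.
    ///
    /// A usage sketch:
    ///
    /// ```ignore
    /// match journal.read(pos).await {
    ///     Ok(item) => process(item),
    ///     Err(Error::ItemPruned(_)) => { /* already pruned */ }
    ///     Err(err) => return Err(err),
    /// }
    /// ```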
    pub async fn read(&self, pos: u64) -> Result<A, Error> {
        let blob_index = pos / self.cfg.items_per_blob.get();
        if blob_index > self.tail_index {
            return Err(Error::ItemOutOfRange(pos));
        }

        let offset = (pos % self.cfg.items_per_blob.get()) * Self::CHUNK_SIZE_U64;

        let blob = if blob_index == self.tail_index {
            if offset >= self.tail.size().await {
                return Err(Error::ItemOutOfRange(pos));
            }
            &self.tail
        } else {
            self.blobs.get(&blob_index).ok_or(Error::ItemPruned(pos))?
        };

        let read = blob.read_at(vec![0u8; Self::CHUNK_SIZE], offset).await?;
        Self::verify_integrity(read.as_ref())
    }

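    /// Verify the integrity of a chunk and decode its item. Each chunk is laid
    /// out as the `A::SIZE` encoded item bytes followed by a big-endian CRC32
    /// checksum of those bytes.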
    fn verify_integrity(buf: &[u8]) -> Result<A, Error> {
        let stored_checksum = u32::from_be_bytes(buf[A::SIZE..].try_into().unwrap());
        let checksum = crc32fast::hash(&buf[..A::SIZE]);
        if checksum != stored_checksum {
            return Err(Error::ChecksumMismatch(stored_checksum, checksum));
        }
        A::decode(&buf[..A::SIZE]).map_err(Error::Codec)
    }

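    /// Return a stream that replays all items starting at `start_pos`, yielding
    /// each item with its position. A checksum mismatch is yielded as an error
    /// and replay continues with the next item; a runtime read error ends
    /// replay of the affected blob.
    ///
    /// A usage sketch (mirrors the tests below):
    ///
    /// ```ignore
    /// let stream = journal.replay(NZUsize!(1024), 0).await?;
    /// pin_mut!(stream);
    /// while let Some(result) = stream.next().await {
    ///     let (pos, item) = result?;
    /// }
    /// ```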
    pub async fn replay(
        &self,
        buffer: NonZeroUsize,
        start_pos: u64,
    ) -> Result<impl Stream<Item = Result<(u64, A), Error>> + '_, Error> {
        assert!(start_pos <= self.size().await?);

        // Collect all blobs at or after the starting position.
        let items_per_blob = self.cfg.items_per_blob.get();
        let start_blob = start_pos / items_per_blob;
        assert!(start_blob <= self.tail_index);
        let blobs = self.blobs.range(start_blob..).collect::<Vec<_>>();
        let full_size = items_per_blob * Self::CHUNK_SIZE_U64;
        let mut blob_plus = blobs
            .into_iter()
            .map(|(blob_index, blob)| (*blob_index, blob.clone_blob(), full_size))
            .collect::<Vec<_>>();

        // Sync the tail so the underlying blob reflects all appended data.
        self.tail.sync().await?;
        let tail_size = self.tail.size().await;
        blob_plus.push((self.tail_index, self.tail.clone_blob(), tail_size));
        let start_offset = (start_pos % items_per_blob) * Self::CHUNK_SIZE_U64;

        let stream = stream::iter(blob_plus).flat_map(move |(blob_index, blob, size)| {
            let mut reader = Read::new(blob, size, buffer);
            let buf = vec![0u8; Self::CHUNK_SIZE];
            let initial_offset = if blob_index == start_blob {
                reader.seek_to(start_offset).expect("invalid start_pos");
                start_offset
            } else {
                0
            };

            stream::unfold(
                (buf, reader, initial_offset),
                move |(mut buf, mut reader, offset)| async move {
                    if offset >= reader.blob_size() {
                        return None;
                    }

                    let item_pos = items_per_blob * blob_index + offset / Self::CHUNK_SIZE_U64;
                    match reader.read_exact(&mut buf, Self::CHUNK_SIZE).await {
                        Ok(()) => {
                            let next_offset = offset + Self::CHUNK_SIZE_U64;
                            let result = Self::verify_integrity(&buf).map(|item| (item_pos, item));
                            if result.is_err() {
                                warn!("corrupted item at {item_pos}");
                            }
                            Some((result, (buf, reader, next_offset)))
                        }
                        Err(err) => {
                            warn!(
                                item_pos,
                                err = err.to_string(),
                                "error reading item during replay"
                            );
                            // Stop replaying this blob by jumping to its end.
                            Some((Err(Error::Runtime(err)), (buf, reader, size)))
                        }
                    }
                },
            )
        });

        Ok(stream)
    }

    /// Return the index of the oldest blob in the journal (the tail blob if
    /// no historical blobs remain).
    fn oldest_blob_index(&self) -> u64 {
        if self.blobs.is_empty() {
            self.tail_index
        } else {
            *self.blobs.first_key_value().unwrap().0
        }
    }

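    /// Prune all blobs that contain only items before `min_item_pos`,
    /// returning whether any blob was removed. The tail blob is never pruned,
    /// so some items before `min_item_pos` may remain readable.
    ///
    /// A usage sketch:
    ///
    /// ```ignore
    /// if journal.prune(min_pos).await? {
    ///     debug!("pruned items before {min_pos}");
    /// }
    /// ```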
    pub async fn prune(&mut self, min_item_pos: u64) -> Result<bool, Error> {
        let oldest_blob_index = self.oldest_blob_index();
        let new_oldest_blob =
            std::cmp::min(min_item_pos / self.cfg.items_per_blob, self.tail_index);

        let mut pruned = false;
        for index in oldest_blob_index..new_oldest_blob {
            pruned = true;
            let blob = self.blobs.remove(&index).unwrap();
            self.remove_blob(index, blob).await?;
            self.pruned.inc();
        }

        Ok(pruned)
    }

    /// Remove the given blob from storage.
    async fn remove_blob(&mut self, index: u64, blob: Append<E::Blob>) -> Result<(), Error> {
        drop(blob);
        self.context
            .remove(&self.cfg.partition, Some(&index.to_be_bytes()))
            .await?;
        debug!(blob = index, "removed blob");
        self.tracked.dec();

        Ok(())
    }

    /// Sync all blobs and consume the journal.
    pub async fn close(self) -> Result<(), Error> {
        for (i, blob) in self.blobs.into_iter() {
            blob.sync().await?;
            debug!(blob = i, "synced blob");
        }
        self.tail.sync().await?;
        debug!(blob = self.tail_index, "synced tail");

        Ok(())
    }

    /// Remove every blob in the journal's partition and consume the journal.
    pub async fn destroy(self) -> Result<(), Error> {
        for (i, blob) in self.blobs.into_iter() {
            drop(blob);
            debug!(blob = i, "destroyed blob");
            self.context
                .remove(&self.cfg.partition, Some(&i.to_be_bytes()))
                .await?;
        }

        drop(self.tail);
        debug!(blob = self.tail_index, "destroyed blob");
        self.context
            .remove(&self.cfg.partition, Some(&self.tail_index.to_be_bytes()))
            .await?;

        // Remove the (now empty) partition itself, tolerating its absence.
        match self.context.remove(&self.cfg.partition, None).await {
            Ok(()) => {}
            Err(RError::PartitionMissing(_)) => {}
            Err(err) => return Err(Error::Runtime(err)),
        }

        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use commonware_cryptography::{sha256::Digest, Hasher as _, Sha256};
    use commonware_macros::test_traced;
    use commonware_runtime::{
        deterministic::{self, Context},
        Blob, Runner, Storage,
    };
    use commonware_utils::{NZUsize, NZU64};
    use futures::{pin_mut, StreamExt};

    const PAGE_SIZE: NonZeroUsize = NZUsize!(44);
    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(3);

    fn test_digest(value: u64) -> Digest {
        Sha256::hash(&value.to_be_bytes())
    }

    fn test_cfg(items_per_blob: NonZeroU64) -> Config {
        Config {
            partition: "test_partition".into(),
            items_per_blob,
            buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
            write_buffer: NZUsize!(2048),
        }
    }

    #[test_traced]
    fn test_fixed_journal_append_and_prune() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            // Initialize the journal with two items per blob.
            let cfg = test_cfg(NZU64!(2));
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to initialize journal");

            let buffer = context.encode();
            assert!(buffer.contains("tracked 1"));

            // Append a single item, then close and reopen the journal.
            let mut pos = journal
                .append(test_digest(0))
                .await
                .expect("failed to append data 0");
            assert_eq!(pos, 0);

            journal.close().await.expect("Failed to close journal");

            let cfg = test_cfg(NZU64!(2));
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to re-initialize journal");

            // Append two more items, filling the first blob.
            pos = journal
                .append(test_digest(1))
                .await
                .expect("failed to append data 1");
            assert_eq!(pos, 1);
            pos = journal
                .append(test_digest(2))
                .await
                .expect("failed to append data 2");
            assert_eq!(pos, 2);
            let buffer = context.encode();
            assert!(buffer.contains("tracked 2"));

            // Read back all three items; reading past the end must fail.
            let item0 = journal.read(0).await.expect("failed to read data 0");
            assert_eq!(item0, test_digest(0));
            let item1 = journal.read(1).await.expect("failed to read data 1");
            assert_eq!(item1, test_digest(1));
            let item2 = journal.read(2).await.expect("failed to read data 2");
            assert_eq!(item2, test_digest(2));
            let err = journal.read(3).await.expect_err("expected read to fail");
            assert!(matches!(err, Error::ItemOutOfRange(3)));

            journal.sync().await.expect("failed to sync journal");
            let buffer = context.encode();
            assert!(buffer.contains("synced_total 1"));

            // Pruning to position 1 is a no-op: blob 0 still holds item 1.
            journal.prune(1).await.expect("failed to prune journal 1");
            let buffer = context.encode();
            assert!(buffer.contains("tracked 2"));

            // Pruning to position 2 removes blob 0.
            journal.prune(2).await.expect("failed to prune journal 2");
            assert_eq!(journal.oldest_retained_pos().await.unwrap(), Some(2));
            let buffer = context.encode();
            assert!(buffer.contains("tracked 1"));
            assert!(buffer.contains("pruned_total 1"));

            // Reading pruned items must fail.
            let result0 = journal.read(0).await;
            assert!(matches!(result0, Err(Error::ItemPruned(0))));
            let result1 = journal.read(1).await;
            assert!(matches!(result1, Err(Error::ItemPruned(1))));

            // Item 2 remains readable.
            let result2 = journal.read(2).await.unwrap();
            assert_eq!(result2, test_digest(2));

            for i in 3..10 {
                let pos = journal
                    .append(test_digest(i))
                    .await
                    .expect("failed to append data");
                assert_eq!(pos, i);
            }

            // Pruning to an already-pruned position is a no-op.
            journal.prune(0).await.expect("no-op pruning failed");
            assert_eq!(journal.oldest_blob_index(), 1);
            assert_eq!(journal.tail_index, 5);
            assert_eq!(journal.oldest_retained_pos().await.unwrap(), Some(2));

            journal
                .prune(3 * cfg.items_per_blob.get())
                .await
                .expect("failed to prune journal 2");
            assert_eq!(journal.oldest_retained_pos().await.unwrap(), Some(6));
            let buffer = context.encode();
            assert_eq!(journal.oldest_blob_index(), 3);
            assert_eq!(journal.tail_index, 5);
            assert!(buffer.contains("tracked 3"));
            assert!(buffer.contains("pruned_total 3"));

            // Pruning far beyond the journal's size removes everything except
            // the (now empty) tail blob.
            journal
                .prune(10000)
                .await
                .expect("failed to max-prune journal");
            let buffer = context.encode();
            let size = journal.size().await.unwrap();
            assert_eq!(size, 10);
            assert_eq!(journal.oldest_blob_index(), 5);
            assert_eq!(journal.tail_index, 5);
            assert!(buffer.contains("tracked 1"));
            assert!(buffer.contains("pruned_total 5"));
            assert_eq!(journal.oldest_retained_pos().await.unwrap(), None);

            // Replaying a fully pruned journal yields no items.
            {
                let stream = journal
                    .replay(NZUsize!(1024), 0)
                    .await
                    .expect("failed to replay journal");
                pin_mut!(stream);
                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((pos, item)) => {
                            assert_eq!(test_digest(pos), item);
                            items.push(pos);
                        }
                        Err(err) => panic!("Failed to read item: {err}"),
                    }
                }
                assert_eq!(items, Vec::<u64>::new());
            }

            journal.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_fixed_journal_append_a_lot_of_data() {
        let executor = deterministic::Runner::default();
        const ITEMS_PER_BLOB: NonZeroU64 = NZU64!(10000);
        executor.start(|context| async move {
            let cfg = test_cfg(ITEMS_PER_BLOB);
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to initialize journal");
            for i in 0u64..ITEMS_PER_BLOB.get() * 2 - 1 {
                journal
                    .append(test_digest(i))
                    .await
                    .expect("failed to append data");
            }
            journal.close().await.expect("failed to close journal");
            let journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to re-initialize journal");
            for i in 0u64..10000 {
                let item: Digest = journal.read(i).await.expect("failed to read data");
                assert_eq!(item, test_digest(i));
            }
            journal.destroy().await.expect("failed to destroy journal");
        });
    }

    #[test_traced]
    fn test_fixed_journal_replay() {
        const ITEMS_PER_BLOB: NonZeroU64 = NZU64!(7);
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            // Fill 100 blobs and half of the tail blob.
            let cfg = test_cfg(ITEMS_PER_BLOB);
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to initialize journal");

            for i in 0u64..(ITEMS_PER_BLOB.get() * 100 + ITEMS_PER_BLOB.get() / 2) {
                let pos = journal
                    .append(test_digest(i))
                    .await
                    .expect("failed to append data");
                assert_eq!(pos, i);
            }

            let buffer = context.encode();
            assert!(buffer.contains("tracked 101"));

            for i in 0u64..(ITEMS_PER_BLOB.get() * 100 + ITEMS_PER_BLOB.get() / 2) {
                let item: Digest = journal.read(i).await.expect("failed to read data");
                assert_eq!(item, test_digest(i), "i={i}");
            }

            // Replay all items and confirm each position appears exactly once.
            {
                let stream = journal
                    .replay(NZUsize!(1024), 0)
                    .await
                    .expect("failed to replay journal");
                let mut items = Vec::new();
                pin_mut!(stream);
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((pos, item)) => {
                            assert_eq!(test_digest(pos), item, "pos={pos}, item={item:?}");
                            items.push(pos);
                        }
                        Err(err) => panic!("Failed to read item: {err}"),
                    }
                }

                assert_eq!(
                    items.len(),
                    ITEMS_PER_BLOB.get() as usize * 100 + ITEMS_PER_BLOB.get() as usize / 2
                );
                items.sort();
                for (i, pos) in items.iter().enumerate() {
                    assert_eq!(i as u64, *pos);
                }
            }
            journal.close().await.expect("Failed to close journal");

            // Corrupt the checksum of the middle item in blob 40.
            let checksum_offset = Digest::SIZE as u64
                + (ITEMS_PER_BLOB.get() / 2) * (Digest::SIZE + u32::SIZE) as u64;
            let (blob, _) = context
                .open(&cfg.partition, &40u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            let bad_checksum = 123456789u32;
            blob.write_at(bad_checksum.to_be_bytes().to_vec(), checksum_offset)
                .await
                .expect("Failed to write incorrect checksum");
            let corrupted_item_pos = 40 * ITEMS_PER_BLOB.get() + ITEMS_PER_BLOB.get() / 2;
            blob.sync().await.expect("Failed to sync blob");

            let journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");

            // Reading the corrupted item directly reports the mismatch.
            let err = journal.read(corrupted_item_pos).await.unwrap_err();
            assert!(matches!(err, Error::ChecksumMismatch(x, _) if x == bad_checksum));

            // Replay yields exactly one checksum error and all other items.
            {
                let stream = journal
                    .replay(NZUsize!(1024), 0)
                    .await
                    .expect("failed to replay journal");
                let mut items = Vec::new();
                pin_mut!(stream);
                let mut error_count = 0;
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((pos, item)) => {
                            assert_eq!(test_digest(pos), item);
                            items.push(pos);
                        }
                        Err(err) => {
                            error_count += 1;
                            assert!(matches!(err, Error::ChecksumMismatch(_, _)));
                        }
                    }
                }
                assert_eq!(error_count, 1);
                assert_eq!(
                    items.len(),
                    ITEMS_PER_BLOB.get() as usize * 100 + ITEMS_PER_BLOB.get() as usize / 2 - 1
                );
            }
            journal.close().await.expect("Failed to close journal");
        });
    }

    #[test_traced]
    fn test_fixed_journal_init_with_corrupted_historical_blobs() {
        let executor = deterministic::Runner::default();
        const ITEMS_PER_BLOB: NonZeroU64 = NZU64!(7);
        executor.start(|context| async move {
            let cfg = test_cfg(ITEMS_PER_BLOB);
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to initialize journal");

            for i in 0u64..(ITEMS_PER_BLOB.get() * 100 + ITEMS_PER_BLOB.get() / 2) {
                let pos = journal
                    .append(test_digest(i))
                    .await
                    .expect("failed to append data");
                assert_eq!(pos, i);
            }
            journal.close().await.expect("Failed to close journal");

            let buffer = context.encode();
            assert!(buffer.contains("tracked 101"));

            // Truncating a historical (non-tail) blob must fail init.
            let (blob, size) = context
                .open(&cfg.partition, &40u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            blob.resize(size - 1).await.expect("Failed to corrupt blob");
            blob.sync().await.expect("Failed to sync blob");
            let result = Journal::<_, Digest>::init(context.clone(), cfg.clone()).await;
            assert!(matches!(
                result.err().unwrap(),
                Error::InvalidBlobSize(_, _)
            ));

            // Removing a historical blob entirely must also fail init.
            context
                .remove(&cfg.partition, Some(&40u64.to_be_bytes()))
                .await
                .expect("Failed to remove blob");
            let result = Journal::<_, Digest>::init(context.clone(), cfg.clone()).await;
            assert!(matches!(result.err().unwrap(), Error::MissingBlob(n) if n == 40));
        });
    }

    #[test_traced]
    fn test_fixed_journal_test_trim_blob() {
        let executor = deterministic::Runner::default();
        const ITEMS_PER_BLOB: NonZeroU64 = NZU64!(7);
        executor.start(|context| async move {
            let cfg = test_cfg(ITEMS_PER_BLOB);
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to initialize journal");

            // Fill the first blob and part of the second (tail) blob.
            let item_count = ITEMS_PER_BLOB.get() + 3;
            for i in 0u64..item_count {
                journal
                    .append(test_digest(i))
                    .await
                    .expect("failed to append data");
            }
            assert_eq!(journal.size().await.unwrap(), item_count);
            journal.close().await.expect("Failed to close journal");

            // Truncate the tail blob by one byte (a partial final chunk) and
            // corrupt the checksum of its second item.
            let (blob, size) = context
                .open(&cfg.partition, &1u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            blob.resize(size - 1).await.expect("Failed to corrupt blob");

            // Offset of the second item's checksum within the blob.
            let checksum_offset = Digest::SIZE + u32::SIZE + Digest::SIZE;

            let bad_checksum = 123456789u32;
            blob.write_at(bad_checksum.to_be_bytes().to_vec(), checksum_offset as u64)
                .await
                .expect("Failed to write incorrect checksum");
            blob.sync().await.expect("Failed to sync blob");

            let journal = Journal::<_, Digest>::init(context.clone(), cfg.clone())
                .await
                .unwrap();

            // Both the partial chunk and the corrupted chunk are trimmed.
            assert_eq!(journal.size().await.unwrap(), item_count - 2);

            // Truncate the tail by another byte; init trims one more item.
            let (blob, size) = context
                .open(&cfg.partition, &1u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            blob.resize(size - 1).await.expect("Failed to corrupt blob");
            blob.sync().await.expect("Failed to sync blob");

            let journal = Journal::<_, Digest>::init(context.clone(), cfg.clone())
                .await
                .unwrap();

            assert_eq!(journal.size().await.unwrap(), item_count - 3);

            journal.destroy().await.expect("Failed to destroy journal");
        });
    }

    #[test_traced]
    fn test_fixed_journal_partial_replay() {
        const ITEMS_PER_BLOB: NonZeroU64 = NZU64!(7);
        // Start partway through the journal, at a position that is not a blob
        // boundary.
        const START_POS: u64 = 53;

        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(ITEMS_PER_BLOB);
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to initialize journal");

            for i in 0u64..(ITEMS_PER_BLOB.get() * 100 + ITEMS_PER_BLOB.get() / 2) {
                let pos = journal
                    .append(test_digest(i))
                    .await
                    .expect("failed to append data");
                assert_eq!(pos, i);
            }

            let buffer = context.encode();
            assert!(buffer.contains("tracked 101"));

            // Replay from START_POS and confirm only positions >= START_POS
            // are yielded, each exactly once.
            {
                let stream = journal
                    .replay(NZUsize!(1024), START_POS)
                    .await
                    .expect("failed to replay journal");
                let mut items = Vec::new();
                pin_mut!(stream);
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((pos, item)) => {
                            assert!(pos >= START_POS, "pos={pos}");
                            assert_eq!(
                                test_digest(pos),
                                item,
                                "Item at position {pos} did not match expected digest"
                            );
                            items.push(pos);
                        }
                        Err(err) => panic!("Failed to read item: {err}"),
                    }
                }

                assert_eq!(
                    items.len(),
                    ITEMS_PER_BLOB.get() as usize * 100 + ITEMS_PER_BLOB.get() as usize / 2
                        - START_POS as usize
                );
                items.sort();
                for (i, pos) in items.iter().enumerate() {
                    assert_eq!(i as u64, *pos - START_POS);
                }
            }

            journal.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_fixed_journal_recover_from_partial_write() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = test_cfg(NZU64!(3));
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to initialize journal");
            for i in 0..5 {
                journal
                    .append(test_digest(i))
                    .await
                    .expect("failed to append data");
            }
            assert_eq!(journal.size().await.unwrap(), 5);
            let buffer = context.encode();
            assert!(buffer.contains("tracked 2"));
            journal.close().await.expect("Failed to close journal");

            // Simulate a partial write by truncating the tail blob by one byte.
            let (blob, size) = context
                .open(&cfg.partition, &1u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            blob.resize(size - 1).await.expect("Failed to corrupt blob");
            blob.sync().await.expect("Failed to sync blob");

            // Init should trim the partial chunk, dropping one item.
            let journal = Journal::<_, Digest>::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");
            assert_eq!(journal.size().await.unwrap(), 4);
            let buffer = context.encode();
            assert!(buffer.contains("tracked 2"));
            journal.close().await.expect("Failed to close journal");

            // Removing the tail blob entirely should recover to the end of the
            // previous (full) blob.
            context
                .remove(&cfg.partition, Some(&1u64.to_be_bytes()))
                .await
                .expect("Failed to remove blob");
            let journal = Journal::<_, Digest>::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");
            assert_eq!(journal.size().await.unwrap(), 3);
            let buffer = context.encode();
            assert!(buffer.contains("tracked 2"));
            assert_eq!(journal.size().await.unwrap(), 3);

            journal.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_fixed_journal_recover_to_empty_from_partial_write() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(NZU64!(10));
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to initialize journal");
            journal
                .append(test_digest(0))
                .await
                .expect("failed to append data");
            assert_eq!(journal.size().await.unwrap(), 1);
            journal.close().await.expect("Failed to close journal");

            // Truncate the only chunk so the journal must recover to empty.
            let (blob, size) = context
                .open(&cfg.partition, &0u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            blob.resize(size - 1).await.expect("Failed to corrupt blob");
            blob.sync().await.expect("Failed to sync blob");

            let mut journal = Journal::<_, Digest>::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");

            // The journal is empty again but still accepts appends.
            assert_eq!(journal.size().await.unwrap(), 0);
            assert_eq!(journal.oldest_retained_pos().await.unwrap(), None);
            journal
                .append(test_digest(0))
                .await
                .expect("failed to append data");
            assert_eq!(journal.size().await.unwrap(), 1);

            journal.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_fixed_journal_recover_from_unwritten_data() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(NZU64!(10));
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to initialize journal");

            journal
                .append(test_digest(0))
                .await
                .expect("failed to append data");
            assert_eq!(journal.size().await.unwrap(), 1);
            journal.close().await.expect("Failed to close journal");

            // Extend the blob with zero bytes that never formed a complete,
            // valid chunk.
            let (blob, size) = context
                .open(&cfg.partition, &0u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            blob.write_at(vec![0u8; Digest::SIZE * 3 - 1], size)
                .await
                .expect("Failed to extend blob");
            blob.sync().await.expect("Failed to sync blob");

            let mut journal = Journal::<_, Digest>::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");

            // Recovery trims the zero-filled region back to the last valid item.
            assert_eq!(journal.size().await.unwrap(), 1);
            assert_eq!(journal.oldest_retained_pos().await.unwrap(), Some(0));

            journal
                .append(test_digest(1))
                .await
                .expect("failed to append data");
            assert_eq!(journal.size().await.unwrap(), 2);

            let item = journal.read(0).await.unwrap();
            assert_eq!(item, test_digest(0));

            let item = journal.read(1).await.unwrap();
            assert_eq!(item, test_digest(1));

            journal.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_fixed_journal_rewinding() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(NZU64!(2));
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to initialize journal");
            assert!(matches!(journal.rewind(0).await, Ok(())));
            assert!(matches!(
                journal.rewind(1).await,
                Err(Error::InvalidRewind(1))
            ));

            journal
                .append(test_digest(0))
                .await
                .expect("failed to append data 0");
            assert_eq!(journal.size().await.unwrap(), 1);
            assert!(matches!(journal.rewind(1).await, Ok(()))); // no-op rewind
            assert!(matches!(journal.rewind(0).await, Ok(())));
            assert_eq!(journal.size().await.unwrap(), 0);

            // Fill four blobs (two items each), then rewind across blob
            // boundaries.
            for i in 0..7 {
                let pos = journal
                    .append(test_digest(i))
                    .await
                    .expect("failed to append data");
                assert_eq!(pos, i);
            }
            let buffer = context.encode();
            assert!(buffer.contains("tracked 4"));
            assert_eq!(journal.size().await.unwrap(), 7);

            assert!(matches!(journal.rewind(4).await, Ok(())));
            assert_eq!(journal.size().await.unwrap(), 4);
            let buffer = context.encode();
            assert!(buffer.contains("tracked 3"));

            assert!(matches!(journal.rewind(0).await, Ok(())));
            let buffer = context.encode();
            assert!(buffer.contains("tracked 1"));
            assert_eq!(journal.size().await.unwrap(), 0);

            // Repeatedly append and rewind to exercise blob churn.
            for _ in 0..10 {
                for i in 0..100 {
                    journal
                        .append(test_digest(i))
                        .await
                        .expect("failed to append data");
                }
                journal
                    .rewind(journal.size().await.unwrap() - 49)
                    .await
                    .unwrap();
            }
            const ITEMS_REMAINING: u64 = 10 * (100 - 49);
            assert_eq!(journal.size().await.unwrap(), ITEMS_REMAINING);

            journal.close().await.expect("Failed to close journal");

            // Repeat with a different items-per-blob to vary boundary behavior.
            let mut cfg = test_cfg(NZU64!(3));
            cfg.partition = "test_partition_2".into();
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to initialize journal");
            for _ in 0..10 {
                for i in 0..100 {
                    journal
                        .append(test_digest(i))
                        .await
                        .expect("failed to append data");
                }
                journal
                    .rewind(journal.size().await.unwrap() - 49)
                    .await
                    .unwrap();
            }
            assert_eq!(journal.size().await.unwrap(), ITEMS_REMAINING);

            journal.close().await.expect("Failed to close journal");

            let mut journal: Journal<_, Digest> = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to re-initialize journal");
            assert_eq!(journal.size().await.unwrap(), 10 * (100 - 49));

            // Rewinding into pruned territory must fail; rewinding exactly to
            // the pruning boundary succeeds.
            journal.prune(300).await.expect("pruning failed");
            assert_eq!(journal.size().await.unwrap(), ITEMS_REMAINING);
            assert!(matches!(
                journal.rewind(299).await,
                Err(Error::InvalidRewind(299))
            ));
            assert!(matches!(journal.rewind(300).await, Ok(())));
            assert_eq!(journal.size().await.unwrap(), 300);
            assert_eq!(journal.oldest_retained_pos().await.unwrap(), None);

            journal.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_journal_conformance() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = test_cfg(NZU64!(60));

            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to initialize journal");

            for i in 0..100 {
                journal
                    .append(test_digest(i))
                    .await
                    .expect("Failed to append data");
            }

            journal.close().await.expect("Failed to close journal");

            // Confirm the on-disk representation of both blobs matches the
            // expected digests (catches accidental format changes).
            let (blob, size) = context
                .open(&cfg.partition, &0u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            assert!(size > 0);
            let buf = blob
                .read_at(vec![0u8; size as usize], 0)
                .await
                .expect("Failed to read blob");
            let digest = Sha256::hash(buf.as_ref());
            assert_eq!(
                hex(&digest),
                "ed2ea67208cde2ee8c16cca5aa4f369f55b1402258c6b7760e5baf134e38944a",
            );
            blob.sync().await.expect("Failed to sync blob");
            let (blob, size) = context
                .open(&cfg.partition, &1u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            assert!(size > 0);
            let buf = blob
                .read_at(vec![0u8; size as usize], 0)
                .await
                .expect("Failed to read blob");
            let digest = Sha256::hash(buf.as_ref());
            assert_eq!(
                hex(&digest),
                "cc7efd4fc999aff36b9fd4213ba8da5810dc1849f92ae2ddf7c6dc40545f9aff",
            );
            blob.sync().await.expect("Failed to sync blob");

            let journal = Journal::<Context, Digest>::init(context.clone(), cfg.clone())
                .await
                .expect("failed to initialize journal");
            journal.destroy().await.unwrap();
        });
    }
}