use crate::journal::Error;
use bytes::BufMut;
use commonware_codec::{CodecFixed, DecodeExt, FixedSize};
use commonware_runtime::{
    buffer::{Append, PoolRef, Read},
    Blob, Error as RError, Metrics, Storage,
};
use commonware_utils::hex;
use futures::{
    future::try_join_all,
    stream::{self, Stream},
    StreamExt,
};
use prometheus_client::metrics::{counter::Counter, gauge::Gauge};
use std::{
    collections::BTreeMap,
    marker::PhantomData,
    num::{NonZeroU64, NonZeroUsize},
};
use tracing::{debug, trace, warn};

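/// Configuration for a [`Journal`] of fixed-size items.
///
/// A construction sketch (illustrative only: the partition name and sizing
/// values below are arbitrary assumptions, not recommendations):
///
/// ```ignore
/// use commonware_runtime::buffer::PoolRef;
/// use commonware_utils::{NZUsize, NZU64};
///
/// let cfg = Config {
///     partition: "my_journal".into(),
///     items_per_blob: NZU64!(4096),
///     buffer_pool: PoolRef::new(NZUsize!(4096), NZUsize!(16)),
///     write_buffer: NZUsize!(2048),
/// };
/// ```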
#[derive(Clone)]
pub struct Config {
    /// The storage partition to use for the journal's blobs.
    pub partition: String,

    /// The maximum number of items to store in each blob.
    pub items_per_blob: NonZeroU64,

    /// The buffer pool to use for caching recently read pages.
    pub buffer_pool: PoolRef,

    /// The size of the write buffer to use for the tail blob.
    pub write_buffer: NonZeroUsize,
}

/// A journal that stores fixed-size items in a sequence of size-capped blobs.
pub struct Journal<E: Storage + Metrics, A: CodecFixed<Cfg = ()>> {
    pub(crate) context: E,
    pub(crate) cfg: Config,

    /// All full (and hence immutable) blobs, keyed by index.
    pub(crate) blobs: BTreeMap<u64, Append<E::Blob>>,

    /// The tail blob: the only blob that still accepts appends.
    pub(crate) tail: Append<E::Blob>,

    /// The index of the tail blob.
    pub(crate) tail_index: u64,

    // Metrics.
    pub(crate) tracked: Gauge,
    pub(crate) synced: Counter,
    pub(crate) pruned: Counter,

    pub(crate) _array: PhantomData<A>,
}

impl<E: Storage + Metrics, A: CodecFixed<Cfg = ()>> Journal<E, A> {
    /// The size of a stored chunk: a fixed-size encoded item followed by its
    /// 4-byte CRC32 checksum.
    pub(crate) const CHUNK_SIZE: usize = u32::SIZE + A::SIZE;
    pub(crate) const CHUNK_SIZE_U64: u64 = Self::CHUNK_SIZE as u64;

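    /// Initialize the journal from `cfg`, reloading any blobs previously
    /// stored in the configured partition and trimming incomplete or
    /// corrupted items from the tail.
    ///
    /// A startup sketch (assumes a runtime `context` and a `cfg` are in
    /// scope, as in the tests below):
    ///
    /// ```ignore
    /// let mut journal = Journal::<_, Digest>::init(context.clone(), cfg.clone()).await?;
    /// let pos = journal.append(some_digest).await?;
    /// journal.close().await?;
    /// ```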
    pub async fn init(context: E, cfg: Config) -> Result<Self, Error> {
        // Scan the partition for persisted blobs.
        let mut blobs = BTreeMap::new();
        let stored_blobs = match context.scan(&cfg.partition).await {
            Ok(blobs) => blobs,
            Err(RError::PartitionMissing(_)) => Vec::new(),
            Err(err) => return Err(Error::Runtime(err)),
        };
        for name in stored_blobs {
            let (blob, size) = context
                .open(&cfg.partition, &name)
                .await
                .map_err(Error::Runtime)?;
            let index = match name.try_into() {
                Ok(index) => u64::from_be_bytes(index),
                Err(nm) => return Err(Error::InvalidBlobName(hex(&nm))),
            };
            debug!(blob = index, size, "loaded blob");
            blobs.insert(index, (blob, size));
        }

        // Ensure blob indices are contiguous and every non-tail blob is full.
        let full_size = cfg.items_per_blob.get() * Self::CHUNK_SIZE_U64;
        if !blobs.is_empty() {
            let mut it = blobs.keys().rev();
            let mut prev_index = *it.next().unwrap();
            for index in it {
                let (_, size) = blobs.get(index).unwrap();
                if *index != prev_index - 1 {
                    return Err(Error::MissingBlob(prev_index - 1));
                }
                prev_index = *index;
                if *size != full_size {
                    return Err(Error::InvalidBlobSize(*index, *size));
                }
            }
        } else {
            debug!("no blobs found");
            let (blob, size) = context.open(&cfg.partition, &0u64.to_be_bytes()).await?;
            assert_eq!(size, 0);
            blobs.insert(0, (blob, size));
        }

        // Initialize metrics.
        let tracked = Gauge::default();
        let synced = Counter::default();
        let pruned = Counter::default();
        context.register("tracked", "Number of blobs", tracked.clone());
        context.register("synced", "Number of syncs", synced.clone());
        context.register("pruned", "Number of blobs pruned", pruned.clone());
        tracked.set(blobs.len() as i64);

        // The highest-indexed blob is the tail; trim any trailing bytes that
        // do not form a complete, checksum-valid item.
        let (mut tail_index, (mut tail, mut tail_size)) = blobs.pop_last().unwrap();
        tail_size = Self::trim_tail(&tail, tail_size, tail_index).await?;
        if tail_size > full_size {
            return Err(Error::InvalidBlobSize(tail_index, tail_size));
        }

        // If the tail is already full, archive it and start a new empty one.
        if tail_size == full_size {
            warn!(
                blob = tail_index,
                "tail blob is full, creating a new empty one"
            );
            blobs.insert(tail_index, (tail, tail_size));
            tail_index += 1;
            (tail, tail_size) = context
                .open(&cfg.partition, &tail_index.to_be_bytes())
                .await?;
            assert_eq!(tail_size, 0);
            tracked.inc();
        }

        // Wrap all blobs in append buffers.
        let blobs = try_join_all(blobs.into_iter().map(|(index, (blob, size))| {
            let pool = cfg.buffer_pool.clone();
            async move {
                let blob = Append::new(blob, size, cfg.write_buffer, pool).await?;
                Ok::<_, Error>((index, (blob, size)))
            }
        }))
        .await?;
        let tail = Append::new(tail, tail_size, cfg.write_buffer, cfg.buffer_pool.clone()).await?;

        Ok(Self {
            context,
            cfg,
            blobs: blobs
                .into_iter()
                .map(|(index, (blob, _))| (index, blob))
                .collect(),
            tail,
            tail_index,
            tracked,
            synced,
            pruned,
            _array: PhantomData,
        })
    }

    /// Trim any bytes from the end of `tail` that do not constitute a
    /// complete, checksum-valid item, returning the (possibly reduced) size.
    async fn trim_tail(
        tail: &<E as Storage>::Blob,
        mut tail_size: u64,
        tail_index: u64,
    ) -> Result<u64, Error> {
        let mut truncated = false;
        if !tail_size.is_multiple_of(Self::CHUNK_SIZE_U64) {
            warn!(
                blob = tail_index,
                invalid_size = tail_size,
                "last blob size is not a multiple of item size, truncating"
            );
            tail_size -= tail_size % Self::CHUNK_SIZE_U64;
            tail.resize(tail_size).await?;
            truncated = true;
        }

        // Truncate corrupted items from the end until a valid one is found.
        while tail_size > 0 {
            let offset = tail_size - Self::CHUNK_SIZE_U64;
            let read = tail.read_at(vec![0u8; Self::CHUNK_SIZE], offset).await?;
            match Self::verify_integrity(read.as_ref()) {
                Ok(_) => break,
                Err(Error::ChecksumMismatch(_, _)) => {
                    warn!(blob = tail_index, offset, "checksum mismatch: truncating");
                    tail_size -= Self::CHUNK_SIZE_U64;
                    tail.resize(tail_size).await?;
                    truncated = true;
                }
                Err(err) => return Err(err),
            }
        }

        if truncated {
            tail.sync().await?;
        }

        Ok(tail_size)
    }

    /// Sync the tail blob to storage (all other blobs are immutable and
    /// already synced).
    pub async fn sync(&mut self) -> Result<(), Error> {
        self.synced.inc();
        debug!(blob = self.tail_index, "syncing blob");
        self.tail.sync().await.map_err(Error::Runtime)
    }

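    /// Return the total number of items ever appended to the journal,
    /// including pruned positions (a position, once assigned, is never
    /// reused).
    ///
    /// A worked example under assumed state: with `items_per_blob = 2`,
    /// `tail_index = 5`, and an empty tail, the size is `0 + 2 * 5 = 10`,
    /// even if every full blob has since been pruned.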
    pub async fn size(&self) -> u64 {
        let size = self.tail.size().await;
        assert_eq!(size % Self::CHUNK_SIZE_U64, 0);
        let items_in_blob = size / Self::CHUNK_SIZE_U64;
        items_in_blob + self.cfg.items_per_blob.get() * self.tail_index
    }

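    /// Append an item to the journal, returning its assigned position.
    ///
    /// Appends are buffered; call [`sync`](Self::sync) to make them durable.
    /// A round-trip sketch (assumes an initialized `journal` over `Digest`
    /// items, as in the tests below):
    ///
    /// ```ignore
    /// let pos = journal.append(digest).await?;
    /// journal.sync().await?;
    /// assert_eq!(journal.read(pos).await?, digest);
    /// ```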
    pub async fn append(&mut self, item: A) -> Result<u64, Error> {
        // The tail blob must always have room for another item.
        let mut size = self.tail.size().await;
        assert!(size < self.cfg.items_per_blob.get() * Self::CHUNK_SIZE_U64);
        assert_eq!(size % Self::CHUNK_SIZE_U64, 0);
        let mut buf: Vec<u8> = Vec::with_capacity(Self::CHUNK_SIZE);
        let item = item.encode();
        let checksum = crc32fast::hash(&item);
        buf.extend_from_slice(&item);
        buf.put_u32(checksum);

        // Compute the item's position before extending the tail.
        let item_pos =
            (size / Self::CHUNK_SIZE_U64) + self.cfg.items_per_blob.get() * self.tail_index;
        self.tail.append(buf).await?;
        trace!(blob = self.tail_index, pos = item_pos, "appended item");
        size += Self::CHUNK_SIZE_U64;

        // If the tail blob is now full, sync it and open a new (empty) tail.
        if size == self.cfg.items_per_blob.get() * Self::CHUNK_SIZE_U64 {
            self.tail.sync().await?;

            let next_blob_index = self.tail_index + 1;
            debug!(blob = next_blob_index, "creating next blob");
            let (next_blob, size) = self
                .context
                .open(&self.cfg.partition, &next_blob_index.to_be_bytes())
                .await?;
            assert_eq!(size, 0);
            let next_blob = Append::new(
                next_blob,
                size,
                self.cfg.write_buffer,
                self.cfg.buffer_pool.clone(),
            )
            .await?;
            self.tracked.inc();

            // The old tail becomes a full, immutable blob.
            let old_tail = std::mem::replace(&mut self.tail, next_blob);
            assert!(self.blobs.insert(self.tail_index, old_tail).is_none());
            self.tail_index = next_blob_index;
        }

        Ok(item_pos)
    }

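    /// Rewind the journal to `size` items, discarding every item at a
    /// position `>= size`. Fails with [`Error::InvalidRewind`] if `size`
    /// exceeds the current size or falls within pruned items.
    ///
    /// A sketch (assumes a journal holding at least 7 unpruned items):
    ///
    /// ```ignore
    /// journal.rewind(4).await?; // keep items at positions 0..=3
    /// assert_eq!(journal.size().await, 4);
    /// ```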
    pub async fn rewind(&mut self, size: u64) -> Result<(), Error> {
        match size.cmp(&self.size().await) {
            std::cmp::Ordering::Greater => return Err(Error::InvalidRewind(size)),
            std::cmp::Ordering::Equal => return Ok(()),
            std::cmp::Ordering::Less => {}
        }
        let rewind_to_blob_index = size / self.cfg.items_per_blob;
        if rewind_to_blob_index < self.oldest_blob_index() {
            return Err(Error::InvalidRewind(size));
        }
        let rewind_to_offset = (size % self.cfg.items_per_blob) * Self::CHUNK_SIZE_U64;

        // Remove blobs newer than the one containing the rewind point,
        // making the newest retained blob the tail.
        while rewind_to_blob_index < self.tail_index {
            let (blob_index, mut new_tail) = self.blobs.pop_last().unwrap();
            assert_eq!(blob_index, self.tail_index - 1);
            std::mem::swap(&mut self.tail, &mut new_tail);
            self.remove_blob(self.tail_index, new_tail).await?;
            self.tail_index -= 1;
        }

        self.tail.resize(rewind_to_offset).await?;

        Ok(())
    }

    /// Return the position of the oldest retained item, or `None` if no
    /// items are retained.
    pub async fn oldest_retained_pos(&self) -> Result<Option<u64>, Error> {
        let oldest_blob_index = self.oldest_blob_index();
        if oldest_blob_index == self.tail_index && self.tail.size().await == 0 {
            return Ok(None);
        }

        Ok(Some(oldest_blob_index * self.cfg.items_per_blob.get()))
    }

    /// Read and return the item at position `pos`, failing if it is out of
    /// range, pruned, or fails its checksum.
    pub async fn read(&self, pos: u64) -> Result<A, Error> {
        let blob_index = pos / self.cfg.items_per_blob.get();
        if blob_index > self.tail_index {
            return Err(Error::ItemOutOfRange(pos));
        }

        let offset = (pos % self.cfg.items_per_blob.get()) * Self::CHUNK_SIZE_U64;

        let blob = if blob_index == self.tail_index {
            if offset >= self.tail.size().await {
                return Err(Error::ItemOutOfRange(pos));
            }
            &self.tail
        } else {
            self.blobs.get(&blob_index).ok_or(Error::ItemPruned(pos))?
        };

        let read = blob.read_at(vec![0u8; Self::CHUNK_SIZE], offset).await?;
        Self::verify_integrity(read.as_ref())
    }

    /// Verify the trailing checksum of a chunk and decode the item within.
    fn verify_integrity(buf: &[u8]) -> Result<A, Error> {
        let stored_checksum = u32::from_be_bytes(buf[A::SIZE..].try_into().unwrap());
        let checksum = crc32fast::hash(&buf[..A::SIZE]);
        if checksum != stored_checksum {
            return Err(Error::ChecksumMismatch(stored_checksum, checksum));
        }
        A::decode(&buf[..A::SIZE]).map_err(Error::Codec)
    }

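    /// Return a stream of all items at positions in `[start_pos, size)`,
    /// each paired with its position. `buffer` bounds the size of the read
    /// buffer used for each blob.
    ///
    /// A consumption sketch (mirrors the tests below):
    ///
    /// ```ignore
    /// let stream = journal.replay(NZUsize!(1024), 0).await?;
    /// pin_mut!(stream);
    /// while let Some(result) = stream.next().await {
    ///     let (pos, item) = result?;
    ///     // process (pos, item)
    /// }
    /// ```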
    pub async fn replay(
        &self,
        buffer: NonZeroUsize,
        start_pos: u64,
    ) -> Result<impl Stream<Item = Result<(u64, A), Error>> + '_, Error> {
        assert!(start_pos <= self.size().await);

        // Collect the blobs to replay, starting from the one containing
        // `start_pos`.
        let items_per_blob = self.cfg.items_per_blob.get();
        let start_blob = start_pos / items_per_blob;
        assert!(start_blob <= self.tail_index);
        let blobs = self.blobs.range(start_blob..).collect::<Vec<_>>();
        let full_size = items_per_blob * Self::CHUNK_SIZE_U64;
        let mut blob_plus = blobs
            .into_iter()
            .map(|(blob_index, blob)| (*blob_index, blob.clone_blob(), full_size))
            .collect::<Vec<_>>();

        // Sync the tail so any buffered items are visible to the reader.
        self.tail.sync().await?;
        let tail_size = self.tail.size().await;
        blob_plus.push((self.tail_index, self.tail.clone_blob(), tail_size));
        let start_offset = (start_pos % items_per_blob) * Self::CHUNK_SIZE_U64;

        let stream = stream::iter(blob_plus).flat_map(move |(blob_index, blob, size)| {
            // Create a buffered reader over the blob, seeking to the start
            // offset if this is the first blob replayed.
            let mut reader = Read::new(blob, size, buffer);
            let buf = vec![0u8; Self::CHUNK_SIZE];
            let initial_offset = if blob_index == start_blob {
                reader.seek_to(start_offset).expect("invalid start_pos");
                start_offset
            } else {
                0
            };

            stream::unfold(
                (buf, reader, initial_offset),
                move |(mut buf, mut reader, offset)| async move {
                    if offset >= reader.blob_size() {
                        return None;
                    }

                    let item_pos = items_per_blob * blob_index + offset / Self::CHUNK_SIZE_U64;
                    match reader.read_exact(&mut buf, Self::CHUNK_SIZE).await {
                        Ok(()) => {
                            let next_offset = offset + Self::CHUNK_SIZE_U64;
                            let result = Self::verify_integrity(&buf).map(|item| (item_pos, item));
                            if result.is_err() {
                                warn!("corrupted item at {item_pos}");
                            }
                            Some((result, (buf, reader, next_offset)))
                        }
                        Err(err) => {
                            warn!(
                                item_pos,
                                err = err.to_string(),
                                "error reading item during replay"
                            );
                            // Advance the offset to `size` so the stream ends
                            // after yielding this error.
                            Some((Err(Error::Runtime(err)), (buf, reader, size)))
                        }
                    }
                },
            )
        });

        Ok(stream)
    }

    /// Return the index of the oldest (non-pruned) blob.
    fn oldest_blob_index(&self) -> u64 {
        if self.blobs.is_empty() {
            self.tail_index
        } else {
            *self.blobs.first_key_value().unwrap().0
        }
    }

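    /// Prune all blobs that contain only items with positions strictly less
    /// than `min_item_pos`, returning whether any blob was removed.
    ///
    /// Pruning is blob-granular, so items below `min_item_pos` can survive.
    /// A worked example: with `items_per_blob = 2`, positions 0 and 1 live
    /// in blob 0 and positions 2 and 3 in blob 1, so:
    ///
    /// ```ignore
    /// journal.prune(3).await?; // removes blob 0 only
    /// assert_eq!(journal.oldest_retained_pos().await?, Some(2));
    /// ```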
    pub async fn prune(&mut self, min_item_pos: u64) -> Result<bool, Error> {
        let oldest_blob_index = self.oldest_blob_index();
        let new_oldest_blob =
            std::cmp::min(min_item_pos / self.cfg.items_per_blob, self.tail_index);

        let mut pruned = false;
        for index in oldest_blob_index..new_oldest_blob {
            pruned = true;
            let blob = self.blobs.remove(&index).unwrap();
            self.remove_blob(index, blob).await?;
            self.pruned.inc();
        }

        Ok(pruned)
    }

    /// Close the given blob and remove it from storage.
    async fn remove_blob(&mut self, index: u64, blob: Append<E::Blob>) -> Result<(), Error> {
        drop(blob);
        self.context
            .remove(&self.cfg.partition, Some(&index.to_be_bytes()))
            .await?;
        debug!(blob = index, "removed blob");
        self.tracked.dec();

        Ok(())
    }

    /// Sync all blobs and close the journal.
    pub async fn close(self) -> Result<(), Error> {
        for (i, blob) in self.blobs.into_iter() {
            blob.sync().await?;
            debug!(blob = i, "synced blob");
        }
        self.tail.sync().await?;
        debug!(blob = self.tail_index, "synced tail");

        Ok(())
    }

    /// Remove every blob in the journal, then the partition itself.
    pub async fn destroy(self) -> Result<(), Error> {
        for (i, blob) in self.blobs.into_iter() {
            drop(blob);
            debug!(blob = i, "destroyed blob");
            self.context
                .remove(&self.cfg.partition, Some(&i.to_be_bytes()))
                .await?;
        }

        drop(self.tail);
        debug!(blob = self.tail_index, "destroyed blob");
        self.context
            .remove(&self.cfg.partition, Some(&self.tail_index.to_be_bytes()))
            .await?;

        // Remove the now-empty partition, tolerating its absence.
        match self.context.remove(&self.cfg.partition, None).await {
            Ok(()) => {}
            Err(RError::PartitionMissing(_)) => {}
            Err(err) => return Err(Error::Runtime(err)),
        }

        Ok(())
    }
}

impl<E: Storage + Metrics, A: CodecFixed<Cfg = ()>> super::Contiguous for Journal<E, A> {
    type Item = A;

    async fn append(&mut self, item: Self::Item) -> Result<u64, Error> {
        Journal::append(self, item).await
    }

    async fn size(&self) -> u64 {
        Journal::size(self).await
    }

    async fn oldest_retained_pos(&self) -> Result<Option<u64>, Error> {
        Journal::oldest_retained_pos(self).await
    }

    async fn prune(&mut self, min_position: u64) -> Result<bool, Error> {
        Journal::prune(self, min_position).await
    }

    async fn replay(
        &self,
        start_pos: u64,
        buffer: NonZeroUsize,
    ) -> Result<impl Stream<Item = Result<(u64, Self::Item), Error>> + '_, Error> {
        Journal::replay(self, buffer, start_pos).await
    }

    async fn read(&self, position: u64) -> Result<Self::Item, Error> {
        Journal::read(self, position).await
    }

    async fn sync(&mut self) -> Result<(), Error> {
        Journal::sync(self).await
    }

    async fn close(self) -> Result<(), Error> {
        Journal::close(self).await
    }

    async fn destroy(self) -> Result<(), Error> {
        Journal::destroy(self).await
    }

    async fn rewind(&mut self, size: u64) -> Result<(), Error> {
        Journal::rewind(self, size).await
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use commonware_cryptography::{sha256::Digest, Hasher as _, Sha256};
    use commonware_macros::test_traced;
    use commonware_runtime::{
        deterministic::{self, Context},
        Blob, Runner, Storage,
    };
    use commonware_utils::{NZUsize, NZU64};
    use futures::{pin_mut, StreamExt};

    const PAGE_SIZE: NonZeroUsize = NZUsize!(44);
    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(3);

    fn test_digest(value: u64) -> Digest {
        Sha256::hash(&value.to_be_bytes())
    }

    fn test_cfg(items_per_blob: NonZeroU64) -> Config {
        Config {
            partition: "test_partition".into(),
            items_per_blob,
            buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
            write_buffer: NZUsize!(2048),
        }
    }

    #[test_traced]
    fn test_fixed_journal_append_and_prune() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            // Initialize the journal with 2 items per blob.
            let cfg = test_cfg(NZU64!(2));
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to initialize journal");

            let buffer = context.encode();
            assert!(buffer.contains("tracked 1"));

            // Append an item, then close and re-open the journal.
            let mut pos = journal
                .append(test_digest(0))
                .await
                .expect("failed to append data 0");
            assert_eq!(pos, 0);

            journal.close().await.expect("Failed to close journal");

            let cfg = test_cfg(NZU64!(2));
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to re-initialize journal");

            // Append two more items, filling blob 0 and starting blob 1.
            pos = journal
                .append(test_digest(1))
                .await
                .expect("failed to append data 1");
            assert_eq!(pos, 1);
            pos = journal
                .append(test_digest(2))
                .await
                .expect("failed to append data 2");
            assert_eq!(pos, 2);
            let buffer = context.encode();
            assert!(buffer.contains("tracked 2"));

            // Read all three items back; one past the end must fail.
            let item0 = journal.read(0).await.expect("failed to read data 0");
            assert_eq!(item0, test_digest(0));
            let item1 = journal.read(1).await.expect("failed to read data 1");
            assert_eq!(item1, test_digest(1));
            let item2 = journal.read(2).await.expect("failed to read data 2");
            assert_eq!(item2, test_digest(2));
            let err = journal.read(3).await.expect_err("expected read to fail");
            assert!(matches!(err, Error::ItemOutOfRange(3)));

            journal.sync().await.expect("failed to sync journal");
            let buffer = context.encode();
            assert!(buffer.contains("synced_total 1"));

            // Pruning to a position within blob 0 should not remove it.
            journal.prune(1).await.expect("failed to prune journal 1");
            let buffer = context.encode();
            assert!(buffer.contains("tracked 2"));

            // Pruning to position 2 removes blob 0 entirely.
            journal.prune(2).await.expect("failed to prune journal 2");
            assert_eq!(journal.oldest_retained_pos().await.unwrap(), Some(2));
            let buffer = context.encode();
            assert!(buffer.contains("tracked 1"));
            assert!(buffer.contains("pruned_total 1"));

            // Reads of pruned items should fail.
            let result0 = journal.read(0).await;
            assert!(matches!(result0, Err(Error::ItemPruned(0))));
            let result1 = journal.read(1).await;
            assert!(matches!(result1, Err(Error::ItemPruned(1))));

            // The first unpruned item should still be readable.
            let result2 = journal.read(2).await.unwrap();
            assert_eq!(result2, test_digest(2));

            // Append items 3..=9, filling blobs up through blob 4.
            for i in 3..10 {
                let pos = journal
                    .append(test_digest(i))
                    .await
                    .expect("failed to append data");
                assert_eq!(pos, i);
            }

            // Pruning at or before the oldest retained position is a no-op.
            journal.prune(0).await.expect("no-op pruning failed");
            assert_eq!(journal.oldest_blob_index(), 1);
            assert_eq!(journal.tail_index, 5);
            assert_eq!(journal.oldest_retained_pos().await.unwrap(), Some(2));

            journal
                .prune(3 * cfg.items_per_blob.get())
                .await
                .expect("failed to prune journal 2");
            assert_eq!(journal.oldest_retained_pos().await.unwrap(), Some(6));
            let buffer = context.encode();
            assert_eq!(journal.oldest_blob_index(), 3);
            assert_eq!(journal.tail_index, 5);
            assert!(buffer.contains("tracked 3"));
            assert!(buffer.contains("pruned_total 3"));

            // Pruning beyond the journal's size retains only the (empty) tail.
            journal
                .prune(10000)
                .await
                .expect("failed to max-prune journal");
            let buffer = context.encode();
            let size = journal.size().await;
            assert_eq!(size, 10);
            assert_eq!(journal.oldest_blob_index(), 5);
            assert_eq!(journal.tail_index, 5);
            assert!(buffer.contains("tracked 1"));
            assert!(buffer.contains("pruned_total 5"));
            assert_eq!(journal.oldest_retained_pos().await.unwrap(), None);

            // Replaying the fully pruned journal should yield no items.
            {
                let stream = journal
                    .replay(NZUsize!(1024), 0)
                    .await
                    .expect("failed to replay journal");
                pin_mut!(stream);
                let mut items = Vec::new();
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((pos, item)) => {
                            assert_eq!(test_digest(pos), item);
                            items.push(pos);
                        }
                        Err(err) => panic!("Failed to read item: {err}"),
                    }
                }
                assert_eq!(items, Vec::<u64>::new());
            }

            journal.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_fixed_journal_append_a_lot_of_data() {
        let executor = deterministic::Runner::default();
        const ITEMS_PER_BLOB: NonZeroU64 = NZU64!(10000);
        executor.start(|context| async move {
            let cfg = test_cfg(ITEMS_PER_BLOB);
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to initialize journal");
            for i in 0u64..ITEMS_PER_BLOB.get() * 2 - 1 {
                journal
                    .append(test_digest(i))
                    .await
                    .expect("failed to append data");
            }
            journal.close().await.expect("failed to close journal");
            let journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to re-initialize journal");
            for i in 0u64..10000 {
                let item: Digest = journal.read(i).await.expect("failed to read data");
                assert_eq!(item, test_digest(i));
            }
            journal.destroy().await.expect("failed to destroy journal");
        });
    }

    #[test_traced]
    fn test_fixed_journal_replay() {
        const ITEMS_PER_BLOB: NonZeroU64 = NZU64!(7);
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            // Initialize the journal and fill 100 blobs plus half of the tail.
            let cfg = test_cfg(ITEMS_PER_BLOB);
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to initialize journal");

            for i in 0u64..(ITEMS_PER_BLOB.get() * 100 + ITEMS_PER_BLOB.get() / 2) {
                let pos = journal
                    .append(test_digest(i))
                    .await
                    .expect("failed to append data");
                assert_eq!(pos, i);
            }

            let buffer = context.encode();
            assert!(buffer.contains("tracked 101"));

            for i in 0u64..(ITEMS_PER_BLOB.get() * 100 + ITEMS_PER_BLOB.get() / 2) {
                let item: Digest = journal.read(i).await.expect("failed to read data");
                assert_eq!(item, test_digest(i), "i={i}");
            }

            // Replay all items and confirm each position appears exactly once.
            {
                let stream = journal
                    .replay(NZUsize!(1024), 0)
                    .await
                    .expect("failed to replay journal");
                let mut items = Vec::new();
                pin_mut!(stream);
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((pos, item)) => {
                            assert_eq!(test_digest(pos), item, "pos={pos}, item={item:?}");
                            items.push(pos);
                        }
                        Err(err) => panic!("Failed to read item: {err}"),
                    }
                }

                assert_eq!(
                    items.len(),
                    ITEMS_PER_BLOB.get() as usize * 100 + ITEMS_PER_BLOB.get() as usize / 2
                );
                items.sort();
                for (i, pos) in items.iter().enumerate() {
                    assert_eq!(i as u64, *pos);
                }
            }
            journal.close().await.expect("Failed to close journal");

            // Corrupt the checksum of the middle item in blob 40.
            let checksum_offset = Digest::SIZE as u64
                + (ITEMS_PER_BLOB.get() / 2) * (Digest::SIZE + u32::SIZE) as u64;
            let (blob, _) = context
                .open(&cfg.partition, &40u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            let bad_checksum = 123456789u32;
            blob.write_at(bad_checksum.to_be_bytes().to_vec(), checksum_offset)
                .await
                .expect("Failed to write incorrect checksum");
            let corrupted_item_pos = 40 * ITEMS_PER_BLOB.get() + ITEMS_PER_BLOB.get() / 2;
            blob.sync().await.expect("Failed to sync blob");

            // Re-initialization should succeed: the corruption is in a full
            // blob, not the tail.
            let journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");

            // A direct read of the corrupted item should report the mismatch.
            let err = journal.read(corrupted_item_pos).await.unwrap_err();
            assert!(matches!(err, Error::ChecksumMismatch(x, _) if x == bad_checksum));

            // Replay should surface exactly one checksum error and skip
            // exactly one item.
            {
                let stream = journal
                    .replay(NZUsize!(1024), 0)
                    .await
                    .expect("failed to replay journal");
                let mut items = Vec::new();
                pin_mut!(stream);
                let mut error_count = 0;
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((pos, item)) => {
                            assert_eq!(test_digest(pos), item);
                            items.push(pos);
                        }
                        Err(err) => {
                            error_count += 1;
                            assert!(matches!(err, Error::ChecksumMismatch(_, _)));
                        }
                    }
                }
                assert_eq!(error_count, 1);
                assert_eq!(
                    items.len(),
                    ITEMS_PER_BLOB.get() as usize * 100 + ITEMS_PER_BLOB.get() as usize / 2 - 1
                );
            }
            journal.close().await.expect("Failed to close journal");
        });
    }

    #[test_traced]
    fn test_fixed_journal_init_with_corrupted_historical_blobs() {
        let executor = deterministic::Runner::default();
        const ITEMS_PER_BLOB: NonZeroU64 = NZU64!(7);
        executor.start(|context| async move {
            let cfg = test_cfg(ITEMS_PER_BLOB);
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to initialize journal");

            for i in 0u64..(ITEMS_PER_BLOB.get() * 100 + ITEMS_PER_BLOB.get() / 2) {
                let pos = journal
                    .append(test_digest(i))
                    .await
                    .expect("failed to append data");
                assert_eq!(pos, i);
            }
            journal.close().await.expect("Failed to close journal");

            let buffer = context.encode();
            assert!(buffer.contains("tracked 101"));

            // Truncate a historical (non-tail) blob; init must reject it.
            let (blob, size) = context
                .open(&cfg.partition, &40u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            blob.resize(size - 1).await.expect("Failed to corrupt blob");
            blob.sync().await.expect("Failed to sync blob");
            let result = Journal::<_, Digest>::init(context.clone(), cfg.clone()).await;
            assert!(matches!(
                result.err().unwrap(),
                Error::InvalidBlobSize(_, _)
            ));

            // Remove the blob entirely; init must report the gap.
            context
                .remove(&cfg.partition, Some(&40u64.to_be_bytes()))
                .await
                .expect("Failed to remove blob");
            let result = Journal::<_, Digest>::init(context.clone(), cfg.clone()).await;
            assert!(matches!(result.err().unwrap(), Error::MissingBlob(n) if n == 40));
        });
    }

    #[test_traced]
    fn test_fixed_journal_test_trim_blob() {
        let executor = deterministic::Runner::default();
        const ITEMS_PER_BLOB: NonZeroU64 = NZU64!(7);
        executor.start(|context| async move {
            let cfg = test_cfg(ITEMS_PER_BLOB);
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to initialize journal");

            // Fill blob 0 and put 3 items in the tail (blob 1).
            let item_count = ITEMS_PER_BLOB.get() + 3;
            for i in 0u64..item_count {
                journal
                    .append(test_digest(i))
                    .await
                    .expect("failed to append data");
            }
            assert_eq!(journal.size().await, item_count);
            journal.close().await.expect("Failed to close journal");

            // Truncate the tail blob by one byte, corrupting its last item.
            let (blob, size) = context
                .open(&cfg.partition, &1u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            blob.resize(size - 1).await.expect("Failed to corrupt blob");

            // Also corrupt the checksum of the tail blob's second item.
            let checksum_offset = Digest::SIZE + u32::SIZE + Digest::SIZE;

            let bad_checksum = 123456789u32;
            blob.write_at(bad_checksum.to_be_bytes().to_vec(), checksum_offset as u64)
                .await
                .expect("Failed to write incorrect checksum");
            blob.sync().await.expect("Failed to sync blob");

            let journal = Journal::<_, Digest>::init(context.clone(), cfg.clone())
                .await
                .unwrap();

            // Both the truncated and checksum-corrupted items should be trimmed.
            assert_eq!(journal.size().await, item_count - 2);

            // Corrupt the (now) last item of the tail blob with another truncation.
            let (blob, size) = context
                .open(&cfg.partition, &1u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            blob.resize(size - 1).await.expect("Failed to corrupt blob");
            blob.sync().await.expect("Failed to sync blob");

            let journal = Journal::<_, Digest>::init(context.clone(), cfg.clone())
                .await
                .unwrap();

            assert_eq!(journal.size().await, item_count - 3);

            journal.destroy().await.expect("Failed to destroy journal");
        });
    }

    #[test_traced]
    fn test_fixed_journal_partial_replay() {
        const ITEMS_PER_BLOB: NonZeroU64 = NZU64!(7);
        // Start replay from a position that falls strictly inside a blob.
        const START_POS: u64 = 53;

        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(ITEMS_PER_BLOB);
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to initialize journal");

            for i in 0u64..(ITEMS_PER_BLOB.get() * 100 + ITEMS_PER_BLOB.get() / 2) {
                let pos = journal
                    .append(test_digest(i))
                    .await
                    .expect("failed to append data");
                assert_eq!(pos, i);
            }

            let buffer = context.encode();
            assert!(buffer.contains("tracked 101"));

            // Replay from START_POS and confirm exactly the tail of the
            // journal is yielded, in order.
            {
                let stream = journal
                    .replay(NZUsize!(1024), START_POS)
                    .await
                    .expect("failed to replay journal");
                let mut items = Vec::new();
                pin_mut!(stream);
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((pos, item)) => {
                            assert!(pos >= START_POS, "pos={pos}");
                            assert_eq!(
                                test_digest(pos),
                                item,
                                "Item at position {pos} did not match expected digest"
                            );
                            items.push(pos);
                        }
                        Err(err) => panic!("Failed to read item: {err}"),
                    }
                }

                assert_eq!(
                    items.len(),
                    ITEMS_PER_BLOB.get() as usize * 100 + ITEMS_PER_BLOB.get() as usize / 2
                        - START_POS as usize
                );
                items.sort();
                for (i, pos) in items.iter().enumerate() {
                    assert_eq!(i as u64, *pos - START_POS);
                }
            }

            journal.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_fixed_journal_recover_from_partial_write() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = test_cfg(NZU64!(3));
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to initialize journal");
            for i in 0..5 {
                journal
                    .append(test_digest(i))
                    .await
                    .expect("failed to append data");
            }
            assert_eq!(journal.size().await, 5);
            let buffer = context.encode();
            assert!(buffer.contains("tracked 2"));
            journal.close().await.expect("Failed to close journal");

            // Simulate a partial write by truncating the tail blob by one byte.
            let (blob, size) = context
                .open(&cfg.partition, &1u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            blob.resize(size - 1).await.expect("Failed to corrupt blob");
            blob.sync().await.expect("Failed to sync blob");

            // The journal should recover by discarding the partial item.
            let journal = Journal::<_, Digest>::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");
            assert_eq!(journal.size().await, 4);
            let buffer = context.encode();
            assert!(buffer.contains("tracked 2"));
            journal.close().await.expect("Failed to close journal");

            // Remove the tail blob entirely; the journal should reinitialize
            // with a new empty tail after the last full blob.
            context
                .remove(&cfg.partition, Some(&1u64.to_be_bytes()))
                .await
                .expect("Failed to remove blob");
            let journal = Journal::<_, Digest>::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");
            assert_eq!(journal.size().await, 3);
            let buffer = context.encode();
            assert!(buffer.contains("tracked 2"));
            assert_eq!(journal.size().await, 3);

            journal.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_fixed_journal_recover_to_empty_from_partial_write() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(NZU64!(10));
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to initialize journal");
            journal
                .append(test_digest(0))
                .await
                .expect("failed to append data");
            assert_eq!(journal.size().await, 1);
            journal.close().await.expect("Failed to close journal");

            // Truncate the only item, simulating a partial write.
            let (blob, size) = context
                .open(&cfg.partition, &0u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            blob.resize(size - 1).await.expect("Failed to corrupt blob");
            blob.sync().await.expect("Failed to sync blob");

            let mut journal = Journal::<_, Digest>::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");

            // Recovery should leave an empty (but usable) journal.
            assert_eq!(journal.size().await, 0);
            assert_eq!(journal.oldest_retained_pos().await.unwrap(), None);
            journal
                .append(test_digest(0))
                .await
                .expect("failed to append data");
            assert_eq!(journal.size().await, 1);

            journal.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_fixed_journal_recover_from_unwritten_data() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(NZU64!(10));
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to initialize journal");

            journal
                .append(test_digest(0))
                .await
                .expect("failed to append data");
            assert_eq!(journal.size().await, 1);
            journal.close().await.expect("Failed to close journal");

            // Extend the blob with zero bytes that were never committed as
            // items; recovery should trim them all.
            let (blob, size) = context
                .open(&cfg.partition, &0u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            blob.write_at(vec![0u8; Digest::SIZE * 3 - 1], size)
                .await
                .expect("Failed to extend blob");
            blob.sync().await.expect("Failed to sync blob");

            let mut journal = Journal::<_, Digest>::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");

            // Only the one valid item should survive.
            assert_eq!(journal.size().await, 1);
            assert_eq!(journal.oldest_retained_pos().await.unwrap(), Some(0));

            journal
                .append(test_digest(1))
                .await
                .expect("failed to append data");
            assert_eq!(journal.size().await, 2);

            let item = journal.read(0).await.unwrap();
            assert_eq!(item, test_digest(0));

            let item = journal.read(1).await.unwrap();
            assert_eq!(item, test_digest(1));

            journal.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_fixed_journal_rewinding() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg(NZU64!(2));
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to initialize journal");
            assert!(matches!(journal.rewind(0).await, Ok(())));
            assert!(matches!(
                journal.rewind(1).await,
                Err(Error::InvalidRewind(1))
            ));

            journal
                .append(test_digest(0))
                .await
                .expect("failed to append data 0");
            assert_eq!(journal.size().await, 1);
            // Rewinding to the current size is a no-op.
            assert!(matches!(journal.rewind(1).await, Ok(())));
            assert!(matches!(journal.rewind(0).await, Ok(())));
            assert_eq!(journal.size().await, 0);

            for i in 0..7 {
                let pos = journal
                    .append(test_digest(i))
                    .await
                    .expect("failed to append data");
                assert_eq!(pos, i);
            }
            let buffer = context.encode();
            assert!(buffer.contains("tracked 4"));
            assert_eq!(journal.size().await, 7);

            // Rewind across a blob boundary.
            assert!(matches!(journal.rewind(4).await, Ok(())));
            assert_eq!(journal.size().await, 4);
            let buffer = context.encode();
            assert!(buffer.contains("tracked 3"));

            // Rewind to empty.
            assert!(matches!(journal.rewind(0).await, Ok(())));
            let buffer = context.encode();
            assert!(buffer.contains("tracked 1"));
            assert_eq!(journal.size().await, 0);

            // Repeatedly append 100 items, then rewind 49 of them.
            for _ in 0..10 {
                for i in 0..100 {
                    journal
                        .append(test_digest(i))
                        .await
                        .expect("failed to append data");
                }
                journal.rewind(journal.size().await - 49).await.unwrap();
            }
            const ITEMS_REMAINING: u64 = 10 * (100 - 49);
            assert_eq!(journal.size().await, ITEMS_REMAINING);

            journal.close().await.expect("Failed to close journal");

            // Repeat the append/rewind loop with a different items-per-blob
            // in a fresh partition.
            let mut cfg = test_cfg(NZU64!(3));
            cfg.partition = "test_partition_2".into();
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to initialize journal");
            for _ in 0..10 {
                for i in 0..100 {
                    journal
                        .append(test_digest(i))
                        .await
                        .expect("failed to append data");
                }
                journal.rewind(journal.size().await - 49).await.unwrap();
            }
            assert_eq!(journal.size().await, ITEMS_REMAINING);

            journal.close().await.expect("Failed to close journal");

            let mut journal: Journal<_, Digest> = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to re-initialize journal");
            assert_eq!(journal.size().await, 10 * (100 - 49));

            // Rewinding into pruned items must fail.
            journal.prune(300).await.expect("pruning failed");
            assert_eq!(journal.size().await, ITEMS_REMAINING);
            assert!(matches!(
                journal.rewind(299).await,
                Err(Error::InvalidRewind(299))
            ));
            // Rewinding to the oldest retained position empties the journal.
            assert!(matches!(journal.rewind(300).await, Ok(())));
            assert_eq!(journal.size().await, 300);
            assert_eq!(journal.oldest_retained_pos().await.unwrap(), None);

            journal.destroy().await.unwrap();
        });
    }

    // Ensure the on-disk format of the journal does not change unexpectedly.
    #[test_traced]
    fn test_journal_conformance() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cfg = test_cfg(NZU64!(60));

            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to initialize journal");

            for i in 0..100 {
                journal
                    .append(test_digest(i))
                    .await
                    .expect("Failed to append data");
            }

            journal.close().await.expect("Failed to close journal");

            // Hash the raw contents of both blobs and compare against known
            // digests.
            let (blob, size) = context
                .open(&cfg.partition, &0u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            assert!(size > 0);
            let buf = blob
                .read_at(vec![0u8; size as usize], 0)
                .await
                .expect("Failed to read blob");
            let digest = Sha256::hash(buf.as_ref());
            assert_eq!(
                hex(&digest),
                "ed2ea67208cde2ee8c16cca5aa4f369f55b1402258c6b7760e5baf134e38944a",
            );
            blob.sync().await.expect("Failed to sync blob");
            let (blob, size) = context
                .open(&cfg.partition, &1u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            assert!(size > 0);
            let buf = blob
                .read_at(vec![0u8; size as usize], 0)
                .await
                .expect("Failed to read blob");
            let digest = Sha256::hash(buf.as_ref());
            assert_eq!(
                hex(&digest),
                "cc7efd4fc999aff36b9fd4213ba8da5810dc1849f92ae2ddf7c6dc40545f9aff",
            );
            blob.sync().await.expect("Failed to sync blob");

            let journal = Journal::<Context, Digest>::init(context.clone(), cfg.clone())
                .await
                .expect("failed to initialize journal");
            journal.destroy().await.unwrap();
        });
    }
}