commonware_storage/journal/contiguous/fixed.rs

1//! An append-only log for storing fixed length items on disk.
2//!
3//! In addition to replay, stored items can be fetched directly by their `position` in the journal,
4//! where position is defined as the item's order of insertion starting from 0, unaffected by
5//! pruning.
6//!
7//! _See [super::variable] for a journal that supports variable length items._
8//!
9//! # Format
10//!
11//! Data stored in a `fixed::Journal` is persisted in one or more `Blob`s within a caller-provided
12//! `partition`. Each `Blob` holds at most `items_per_blob` items, with each item followed by its
13//! CRC32 checksum:
14//!
15//! ```text
16//! +--------+-----------+--------+-----------+--------+----------+-------------+
17//! | item_0 | C(Item_0) | item_1 | C(Item_1) |   ...  | item_n-1 | C(Item_n-1) |
18//! +--------+-----------+--------+-----------+--------+----------+-------------+
19//!
20//! n = config.items_per_blob, C = CRC32
21//! ```
22//!
23//! The most recent blob may not necessarily be full, in which case it will contain fewer than the
24//! maximum number of items.
25//!
26//! A fetched or replayed item's checksum is always computed and checked against the stored value
27//! before it is returned. If the checksums do not match, an error is returned instead.
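//!
//! As a sketch of how a position maps to storage (assuming a 32-byte item type, so each chunk
//! occupies 32 + 4 = 36 bytes including its CRC32), item `pos` lives in blob `pos / items_per_blob`
//! at byte offset `(pos % items_per_blob) * 36` within that blob:
//!
//! ```text
//! items_per_blob = 2, pos = 5
//! blob index  = 5 / 2        = 2
//! byte offset = (5 % 2) * 36 = 36
//! ```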
28//!
29//! # Open Blobs
30//!
31//! All `Blob`s in a given `partition` are kept open for the lifetime of the `Journal`. You can limit
32//! the number of open blobs by configuring a larger `items_per_blob` or by pruning old items.
33//!
34//! # Consistency
35//!
36//! Data written to `Journal` may not be immediately persisted to `Storage`. It is up to the caller
37//! to determine when to force pending data to be written to `Storage` using the `sync` method. When
38//! calling `close`, all pending data is automatically synced and any open blobs are closed.
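//!
//! A minimal usage sketch (assuming an already-constructed runtime `context`, a `Config` named
//! `cfg`, and an `item` of a type implementing `CodecFixed` with `Cfg = ()`):
//!
//! ```rust,ignore
//! let mut journal = Journal::init(context, cfg).await?;
//! let pos = journal.append(item).await?; // buffered; not yet guaranteed durable
//! journal.sync().await?;                 // force pending data to `Storage`
//! journal.close().await?;                // syncs pending data and closes all blobs
//! ```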
39//!
40//! # Pruning
41//!
42//! The `prune` method allows the `Journal` to prune blobs consisting entirely of items prior to a
43//! given point in history.
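//!
//! Because only whole blobs are removed, some items older than the requested position may remain
//! readable. A sketch, assuming `items_per_blob = 2` and a journal holding at least four items:
//!
//! ```rust,ignore
//! journal.prune(3).await?; // removes the blob holding items 0 and 1
//! assert_eq!(journal.oldest_retained_pos().await?, Some(2));
//! ```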
44//!
45//! # State Sync
46//!
47//! `Journal::init_sync` allows for initializing a journal for use in state sync.
48//! When opened in this mode, we attempt to populate the journal within the given range
49//! with persisted data.
50//! If the journal is empty, we create a fresh journal at the specified position.
51//! If the journal is not empty, we prune the journal to the specified lower bound and rewind to
52//! the specified upper bound.
53//!
54//! # Replay
55//!
56//! The `replay` method returns an ordered stream over all unpruned items from a given start position.
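//!
//! A replay sketch (assuming `futures::pin_mut`, `StreamExt`, and the `NZUsize!` helper from
//! `commonware_utils`):
//!
//! ```rust,ignore
//! let stream = journal.replay(NZUsize!(1024), start_pos).await?;
//! pin_mut!(stream);
//! while let Some(result) = stream.next().await {
//!     let (pos, item) = result?; // items are yielded in order of position
//! }
//! ```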
57
58use crate::journal::Error;
59use bytes::BufMut;
60use commonware_codec::{CodecFixed, DecodeExt, FixedSize};
61use commonware_runtime::{
62    buffer::{Append, PoolRef, Read},
63    Blob, Error as RError, Metrics, Storage,
64};
65use commonware_utils::hex;
66use futures::{
67    future::try_join_all,
68    stream::{self, Stream},
69    StreamExt,
70};
71use prometheus_client::metrics::{counter::Counter, gauge::Gauge};
72use std::{
73    collections::BTreeMap,
74    marker::PhantomData,
75    num::{NonZeroU64, NonZeroUsize},
76};
77use tracing::{debug, trace, warn};
78
79/// Configuration for `Journal` storage.
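///
/// A construction sketch (assuming the `NZU64!`/`NZUsize!` helpers from `commonware_utils` and a
/// shared [PoolRef] buffer pool):
///
/// ```rust,ignore
/// let cfg = Config {
///     partition: "my_journal".into(),
///     items_per_blob: NZU64!(4096),
///     buffer_pool: PoolRef::new(NZUsize!(4096), NZUsize!(1024)),
///     write_buffer: NZUsize!(2048),
/// };
/// ```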
80#[derive(Clone)]
81pub struct Config {
82    /// The `commonware-runtime::Storage` partition to use for storing journal blobs.
83    pub partition: String,
84
85    /// The maximum number of journal items to store in each blob.
86    ///
87    /// Any unpruned historical blobs will contain exactly this number of items.
88    /// Only the newest blob may contain fewer items.
89    pub items_per_blob: NonZeroU64,
90
91    /// The buffer pool to use for caching data.
92    pub buffer_pool: PoolRef,
93
94    /// The size of the write buffer to use for each blob.
95    pub write_buffer: NonZeroUsize,
96}
97
98/// Implementation of `Journal` storage.
99pub struct Journal<E: Storage + Metrics, A: CodecFixed<Cfg = ()>> {
100    pub(crate) context: E,
101    pub(crate) cfg: Config,
102
103    /// Stores the historical blobs. A BTreeMap allows iterating over them from oldest to newest.
104    ///
105    /// # Invariants
106    ///
107    /// - Indices are consecutive and without gaps.
108    /// - Contains only full blobs.
109    /// - Never contains the most recent blob.
110    pub(crate) blobs: BTreeMap<u64, Append<E::Blob>>,
111
112    /// The most recent blob.
113    ///
114    /// # Invariant
115    ///
116    /// Always has room for at least one more item (and may be empty).
117    pub(crate) tail: Append<E::Blob>,
118
119    /// The index of the most recent blob.
120    pub(crate) tail_index: u64,
121
122    pub(crate) tracked: Gauge,
123    pub(crate) synced: Counter,
124    pub(crate) pruned: Counter,
125
126    pub(crate) _array: PhantomData<A>,
127}
128
129impl<E: Storage + Metrics, A: CodecFixed<Cfg = ()>> Journal<E, A> {
130    pub(crate) const CHUNK_SIZE: usize = u32::SIZE + A::SIZE;
131    pub(crate) const CHUNK_SIZE_U64: u64 = Self::CHUNK_SIZE as u64;
132
133    /// Initialize a new `Journal` instance.
134    ///
135    /// All backing blobs are opened but not read during initialization. The `replay` method can be
136    /// used to iterate over all items in the `Journal`.
137    ///
138    /// # Repair
139    ///
140    /// Like [sqlite](https://github.com/sqlite/sqlite/blob/8658a8df59f00ec8fcfea336a2a6a4b5ef79d2ee/src/wal.c#L1504-L1505)
141    /// and [rocksdb](https://github.com/facebook/rocksdb/blob/0c533e61bc6d89fdf1295e8e0bcee4edb3aef401/include/rocksdb/options.h#L441-L445),
142    /// the first invalid data read will be considered the new end of the journal (and the underlying [Blob] will be truncated to the last
143    /// valid item).
144    pub async fn init(context: E, cfg: Config) -> Result<Self, Error> {
145        // Iterate over blobs in partition
146        let mut blobs = BTreeMap::new();
147        let stored_blobs = match context.scan(&cfg.partition).await {
148            Ok(blobs) => blobs,
149            Err(RError::PartitionMissing(_)) => Vec::new(),
150            Err(err) => return Err(Error::Runtime(err)),
151        };
152        for name in stored_blobs {
153            let (blob, size) = context
154                .open(&cfg.partition, &name)
155                .await
156                .map_err(Error::Runtime)?;
157            let index = match name.try_into() {
158                Ok(index) => u64::from_be_bytes(index),
159                Err(nm) => return Err(Error::InvalidBlobName(hex(&nm))),
160            };
161            debug!(blob = index, size, "loaded blob");
162            blobs.insert(index, (blob, size));
163        }
164
165        // Check that there are no gaps in the historical blobs and that they are all full.
166        let full_size = cfg.items_per_blob.get() * Self::CHUNK_SIZE_U64;
167        if !blobs.is_empty() {
168            let mut it = blobs.keys().rev();
169            let mut prev_index = *it.next().unwrap();
170            for index in it {
171                let (_, size) = blobs.get(index).unwrap();
172                if *index != prev_index - 1 {
173                    return Err(Error::MissingBlob(prev_index - 1));
174                }
175                prev_index = *index;
176                if *size != full_size {
177                    // Non-final blobs that have invalid sizes are not recoverable.
178                    return Err(Error::InvalidBlobSize(*index, *size));
179                }
180            }
181        } else {
182            debug!("no blobs found");
183            let (blob, size) = context.open(&cfg.partition, &0u64.to_be_bytes()).await?;
184            assert_eq!(size, 0);
185            blobs.insert(0, (blob, size));
186        }
187
188        // Initialize metrics.
189        let tracked = Gauge::default();
190        let synced = Counter::default();
191        let pruned = Counter::default();
192        context.register("tracked", "Number of blobs", tracked.clone());
193        context.register("synced", "Number of syncs", synced.clone());
194        context.register("pruned", "Number of blobs pruned", pruned.clone());
195        tracked.set(blobs.len() as i64);
196
197        // Initialize the tail blob.
198        let (mut tail_index, (mut tail, mut tail_size)) = blobs.pop_last().unwrap();
199
200        // Trim invalid items from the tail blob.
201        tail_size = Self::trim_tail(&tail, tail_size, tail_index).await?;
202        if tail_size > full_size {
203            return Err(Error::InvalidBlobSize(tail_index, tail_size));
204        }
205
206        // If the tail blob is full we need to start a new one to maintain its invariant that there
207        // is always room for another item.
208        if tail_size == full_size {
209            warn!(
210                blob = tail_index,
211                "tail blob is full, creating a new empty one"
212            );
213            blobs.insert(tail_index, (tail, tail_size));
214            tail_index += 1;
215            (tail, tail_size) = context
216                .open(&cfg.partition, &tail_index.to_be_bytes())
217                .await?;
218            assert_eq!(tail_size, 0);
219            tracked.inc();
220        }
221
222        // Wrap all blobs with Append wrappers.
223        // TODO(https://github.com/commonwarexyz/monorepo/issues/1219): Consider creating an
224        // Immutable wrapper which doesn't allocate a write buffer for these.
225        let blobs = try_join_all(blobs.into_iter().map(|(index, (blob, size))| {
226            let pool = cfg.buffer_pool.clone();
227            async move {
228                let blob = Append::new(blob, size, cfg.write_buffer, pool).await?;
229                Ok::<_, Error>((index, (blob, size)))
230            }
231        }))
232        .await?;
233        let tail = Append::new(tail, tail_size, cfg.write_buffer, cfg.buffer_pool.clone()).await?;
234
235        Ok(Self {
236            context,
237            cfg,
238            blobs: blobs
239                .into_iter()
240                .map(|(index, (blob, _))| (index, blob))
241                .collect(),
242            tail,
243            tail_index,
244            tracked,
245            synced,
246            pruned,
247            _array: PhantomData,
248        })
249    }
250
251    /// Trim any invalid data found at the end of the tail blob and return the new size. The new
252    /// size will be less than or equal to the originally provided size, and a multiple of the item
253    /// size.
254    async fn trim_tail(
255        tail: &<E as Storage>::Blob,
256        mut tail_size: u64,
257        tail_index: u64,
258    ) -> Result<u64, Error> {
259        let mut truncated = false;
260        if !tail_size.is_multiple_of(Self::CHUNK_SIZE_U64) {
261            warn!(
262                blob = tail_index,
263                invalid_size = tail_size,
264                "last blob size is not a multiple of item size, truncating"
265            );
266            tail_size -= tail_size % Self::CHUNK_SIZE_U64;
267            tail.resize(tail_size).await?;
268            truncated = true;
269        }
270
271        // Truncate any records with failing checksums. This can happen if the file system allocated
272        // extra space for a blob but there was a crash before any data was written to that space.
273        while tail_size > 0 {
274            let offset = tail_size - Self::CHUNK_SIZE_U64;
275            let read = tail.read_at(vec![0u8; Self::CHUNK_SIZE], offset).await?;
276            match Self::verify_integrity(read.as_ref()) {
277                Ok(_) => break, // Valid item found, we can stop truncating.
278                Err(Error::ChecksumMismatch(_, _)) => {
279                    warn!(blob = tail_index, offset, "checksum mismatch: truncating",);
280                    tail_size -= Self::CHUNK_SIZE_U64;
281                    tail.resize(tail_size).await?;
282                    truncated = true;
283                }
284                Err(err) => return Err(err),
285            }
286        }
287
288        // If we truncated the blob, make sure to sync it.
289        if truncated {
290            tail.sync().await?;
291        }
292
293        Ok(tail_size)
294    }
295
296    /// Sync any pending updates to disk.
297    pub async fn sync(&mut self) -> Result<(), Error> {
298        self.synced.inc();
299        debug!(blob = self.tail_index, "syncing blob");
300        self.tail.sync().await.map_err(Error::Runtime)
301    }
302
303    /// Return the total number of items in the journal, irrespective of pruning. The next value
304    /// appended to the journal will be at this position.
305    pub async fn size(&self) -> u64 {
306        let size = self.tail.size().await;
307        assert_eq!(size % Self::CHUNK_SIZE_U64, 0);
308        let items_in_blob = size / Self::CHUNK_SIZE_U64;
309        items_in_blob + self.cfg.items_per_blob.get() * self.tail_index
310    }
311
312    /// Append a new item to the journal. Return the item's position in the journal, or an error if
313    /// the operation fails.
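    ///
    /// A usage sketch:
    ///
    /// ```rust,ignore
    /// let pos = journal.append(item).await?;
    /// assert_eq!(pos + 1, journal.size().await);
    /// ```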
314    pub async fn append(&mut self, item: A) -> Result<u64, Error> {
315        // There should always be room to append an item in the newest blob
316        let mut size = self.tail.size().await;
317        assert!(size < self.cfg.items_per_blob.get() * Self::CHUNK_SIZE_U64);
318        assert_eq!(size % Self::CHUNK_SIZE_U64, 0);
319        let mut buf: Vec<u8> = Vec::with_capacity(Self::CHUNK_SIZE);
320        let item = item.encode();
321        let checksum = crc32fast::hash(&item);
322        buf.extend_from_slice(&item);
323        buf.put_u32(checksum);
324
325        // Write the item to the blob
326        let item_pos =
327            (size / Self::CHUNK_SIZE_U64) + self.cfg.items_per_blob.get() * self.tail_index;
328        self.tail.append(buf).await?;
329        trace!(blob = self.tail_index, pos = item_pos, "appended item");
330        size += Self::CHUNK_SIZE_U64;
331
332        // If the tail blob is now full we need to create a new empty one to fulfill the invariant
333        // that the tail blob always has room for a new element.
334        if size == self.cfg.items_per_blob.get() * Self::CHUNK_SIZE_U64 {
335            // Sync the tail blob before creating a new one so if we crash we don't end up with a
336            // non-full historical blob.
337            self.tail.sync().await?;
338
339            // Create a new empty blob.
340            let next_blob_index = self.tail_index + 1;
341            debug!(blob = next_blob_index, "creating next blob");
342            let (next_blob, size) = self
343                .context
344                .open(&self.cfg.partition, &next_blob_index.to_be_bytes())
345                .await?;
346            assert_eq!(size, 0);
347            let next_blob = Append::new(
348                next_blob,
349                size,
350                self.cfg.write_buffer,
351                self.cfg.buffer_pool.clone(),
352            )
353            .await?;
354            self.tracked.inc();
355
356            // Move the old tail blob to the historical blobs map and set the new blob as the tail.
357            let old_tail = std::mem::replace(&mut self.tail, next_blob);
358            assert!(self.blobs.insert(self.tail_index, old_tail).is_none());
359            self.tail_index = next_blob_index;
360        }
361
362        Ok(item_pos)
363    }
364
365    /// Rewind the journal to the given `size`. Returns [Error::InvalidRewind] if `size` exceeds the
366    /// current size or precedes the oldest retained position. The journal is not synced after rewinding.
367    ///
368    /// # Warnings
369    ///
370    /// * This operation is not guaranteed to survive restarts until sync is called.
371    /// * This operation is not atomic, but it will always leave the journal in a consistent state
372    ///   in the event of failure since blobs are always removed from newest to oldest.
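    ///
    /// A sketch, assuming the journal holds more than ten items and none of them have been pruned:
    ///
    /// ```rust,ignore
    /// journal.rewind(10).await?;            // discard items at positions >= 10
    /// assert_eq!(journal.size().await, 10);
    /// journal.sync().await?;                // make the rewind durable
    /// ```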
373    pub async fn rewind(&mut self, size: u64) -> Result<(), Error> {
374        match size.cmp(&self.size().await) {
375            std::cmp::Ordering::Greater => return Err(Error::InvalidRewind(size)),
376            std::cmp::Ordering::Equal => return Ok(()),
377            std::cmp::Ordering::Less => {}
378        }
379        let rewind_to_blob_index = size / self.cfg.items_per_blob;
380        if rewind_to_blob_index < self.oldest_blob_index() {
381            return Err(Error::InvalidRewind(size));
382        }
383        let rewind_to_offset = (size % self.cfg.items_per_blob) * Self::CHUNK_SIZE_U64;
384
385        // Remove blobs until we reach the rewind point.  Blobs must be removed in reverse order to
386        // preserve consistency in the event of failures.
387        while rewind_to_blob_index < self.tail_index {
388            let (blob_index, mut new_tail) = self.blobs.pop_last().unwrap();
389            assert_eq!(blob_index, self.tail_index - 1);
390            std::mem::swap(&mut self.tail, &mut new_tail);
391            self.remove_blob(self.tail_index, new_tail).await?;
392            self.tail_index -= 1;
393        }
394
395        // Truncate the tail blob to the correct offset.
396        self.tail.resize(rewind_to_offset).await?;
397
398        Ok(())
399    }
400
401    /// Return the position of the oldest item in the journal that remains readable.
402    ///
403    /// Note that this value could be older than the `min_item_pos` last passed to prune.
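    ///
    /// Together with [Self::size], this bounds the readable range. A sketch, assuming a non-empty
    /// journal:
    ///
    /// ```rust,ignore
    /// let oldest = journal.oldest_retained_pos().await?.unwrap();
    /// for pos in oldest..journal.size().await {
    ///     let _item = journal.read(pos).await?;
    /// }
    /// ```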
404    pub async fn oldest_retained_pos(&self) -> Result<Option<u64>, Error> {
405        let oldest_blob_index = self.oldest_blob_index();
406        if oldest_blob_index == self.tail_index && self.tail.size().await == 0 {
407            return Ok(None);
408        }
409
410        // The oldest retained item is the first item in the oldest blob.
411        Ok(Some(oldest_blob_index * self.cfg.items_per_blob.get()))
412    }
413
414    /// Read the item at position `pos` in the journal.
415    ///
416    /// # Errors
417    ///
418    ///  - [Error::ItemPruned] if the item at position `pos` is pruned.
419    ///  - [Error::ItemOutOfRange] if the item at position `pos` does not exist.
420    pub async fn read(&self, pos: u64) -> Result<A, Error> {
421        let blob_index = pos / self.cfg.items_per_blob.get();
422        if blob_index > self.tail_index {
423            return Err(Error::ItemOutOfRange(pos));
424        }
425
426        let offset = (pos % self.cfg.items_per_blob.get()) * Self::CHUNK_SIZE_U64;
427
428        let blob = if blob_index == self.tail_index {
429            if offset >= self.tail.size().await {
430                return Err(Error::ItemOutOfRange(pos));
431            }
432            &self.tail
433        } else {
434            self.blobs.get(&blob_index).ok_or(Error::ItemPruned(pos))?
435        };
436
437        let read = blob.read_at(vec![0u8; Self::CHUNK_SIZE], offset).await?;
438        Self::verify_integrity(read.as_ref())
439    }
440
441    /// Verify the integrity of the Array + checksum in `buf`, returning:
442    /// - The array if it is valid,
443    /// - Error::ChecksumMismatch if the checksum is invalid, or
444    /// - Error::Codec if the array could not be decoded after passing the checksum check.
445    ///
446    ///  Error::Codec likely indicates a logic error rather than a corruption issue.
447    fn verify_integrity(buf: &[u8]) -> Result<A, Error> {
448        let stored_checksum = u32::from_be_bytes(buf[A::SIZE..].try_into().unwrap());
449        let checksum = crc32fast::hash(&buf[..A::SIZE]);
450        if checksum != stored_checksum {
451            return Err(Error::ChecksumMismatch(stored_checksum, checksum));
452        }
453        A::decode(&buf[..A::SIZE]).map_err(Error::Codec)
454    }
455
456    /// Returns an ordered stream of all items in the journal with position >= `start_pos`.
457    ///
458    /// # Panics
459    ///
460    /// Panics if `start_pos` exceeds the journal's size.
461    ///
462    /// # Integrity
463    ///
464    /// If any corrupted data is found, the stream will return an error.
465    pub async fn replay(
466        &self,
467        buffer: NonZeroUsize,
468        start_pos: u64,
469    ) -> Result<impl Stream<Item = Result<(u64, A), Error>> + '_, Error> {
470        assert!(start_pos <= self.size().await);
471
472        // Collect all blobs to replay paired with their index.
473        let items_per_blob = self.cfg.items_per_blob.get();
474        let start_blob = start_pos / items_per_blob;
475        assert!(start_blob <= self.tail_index);
476        let blobs = self.blobs.range(start_blob..).collect::<Vec<_>>();
477        let full_size = items_per_blob * Self::CHUNK_SIZE_U64;
478        let mut blob_plus = blobs
479            .into_iter()
480            .map(|(blob_index, blob)| (*blob_index, blob.clone_blob(), full_size))
481            .collect::<Vec<_>>();
482
483        // Include the tail blob.
484        self.tail.sync().await?; // make sure no data is buffered
485        let tail_size = self.tail.size().await;
486        blob_plus.push((self.tail_index, self.tail.clone_blob(), tail_size));
487        let start_offset = (start_pos % items_per_blob) * Self::CHUNK_SIZE_U64;
488
489        // Replay all blobs in order and stream items as they are read (to avoid occupying too much
490        // memory with buffered data).
491        let stream = stream::iter(blob_plus).flat_map(move |(blob_index, blob, size)| {
492            // Create a new reader and buffer for each blob. Preallocating the buffer here to avoid
493            // a per-iteration allocation improves performance by ~20%.
494            let mut reader = Read::new(blob, size, buffer);
495            let buf = vec![0u8; Self::CHUNK_SIZE];
496            let initial_offset = if blob_index == start_blob {
497                // If this is the very first blob then we need to seek to the starting position.
498                reader.seek_to(start_offset).expect("invalid start_pos");
499                start_offset
500            } else {
501                0
502            };
503
504            stream::unfold(
505                (buf, reader, initial_offset),
506                move |(mut buf, mut reader, offset)| async move {
507                    if offset >= reader.blob_size() {
508                        return None;
509                    }
510
511                    // Even though we are reusing the buffer, `read_exact` will overwrite any
512                    // previous data, so there's no need to explicitly clear it.
513                    let item_pos = items_per_blob * blob_index + offset / Self::CHUNK_SIZE_U64;
514                    match reader.read_exact(&mut buf, Self::CHUNK_SIZE).await {
515                        Ok(()) => {
516                            let next_offset = offset + Self::CHUNK_SIZE_U64;
517                            let result = Self::verify_integrity(&buf).map(|item| (item_pos, item));
518                            if result.is_err() {
519                                warn!("corrupted item at {item_pos}");
520                            }
521                            Some((result, (buf, reader, next_offset)))
522                        }
523                        Err(err) => {
524                            warn!(
525                                item_pos,
526                                err = err.to_string(),
527                                "error reading item during replay"
528                            );
529                            Some((Err(Error::Runtime(err)), (buf, reader, size)))
530                        }
531                    }
532                },
533            )
534        });
535
536        Ok(stream)
537    }
538
539    /// Return the index of the blob containing the oldest retained items.
540    fn oldest_blob_index(&self) -> u64 {
541        if self.blobs.is_empty() {
542            self.tail_index
543        } else {
544            *self.blobs.first_key_value().unwrap().0
545        }
546    }
547
548    /// Allow the journal to prune items older than `min_item_pos`. The journal may not prune all
549    /// such items in order to preserve blob boundaries, but the number of such items will always be
550    /// less than the configured number of items per blob. Returns true if any items were pruned.
551    ///
552    /// Note that this operation may NOT be atomic; however, it is guaranteed not to leave gaps in the
553    /// event of failure as items are always pruned in order from oldest to newest.
554    pub async fn prune(&mut self, min_item_pos: u64) -> Result<bool, Error> {
555        let oldest_blob_index = self.oldest_blob_index();
556        let new_oldest_blob =
557            std::cmp::min(min_item_pos / self.cfg.items_per_blob, self.tail_index);
558
559        let mut pruned = false;
560        for index in oldest_blob_index..new_oldest_blob {
561            pruned = true;
562            let blob = self.blobs.remove(&index).unwrap();
563            self.remove_blob(index, blob).await?;
564            self.pruned.inc();
565        }
566
567        Ok(pruned)
568    }
569
570    /// Safely removes any previously tracked blob from underlying storage.
571    async fn remove_blob(&mut self, index: u64, blob: Append<E::Blob>) -> Result<(), Error> {
572        drop(blob);
573        self.context
574            .remove(&self.cfg.partition, Some(&index.to_be_bytes()))
575            .await?;
576        debug!(blob = index, "removed blob");
577        self.tracked.dec();
578
579        Ok(())
580    }
581
582    /// Syncs and closes all open blobs.
583    pub async fn close(self) -> Result<(), Error> {
584        for (i, blob) in self.blobs.into_iter() {
585            blob.sync().await?;
586            debug!(blob = i, "synced blob");
587        }
588        self.tail.sync().await?;
589        debug!(blob = self.tail_index, "synced tail");
590
591        Ok(())
592    }
593
594    /// Remove any underlying blobs created by the journal.
595    pub async fn destroy(self) -> Result<(), Error> {
596        for (i, blob) in self.blobs.into_iter() {
597            drop(blob);
598            debug!(blob = i, "destroyed blob");
599            self.context
600                .remove(&self.cfg.partition, Some(&i.to_be_bytes()))
601                .await?;
602        }
603
604        drop(self.tail);
605        debug!(blob = self.tail_index, "destroyed blob");
606        self.context
607            .remove(&self.cfg.partition, Some(&self.tail_index.to_be_bytes()))
608            .await?;
609
610        match self.context.remove(&self.cfg.partition, None).await {
611            Ok(()) => {}
612            Err(RError::PartitionMissing(_)) => {
613                // Partition already removed or never existed.
614            }
615            Err(err) => return Err(Error::Runtime(err)),
616        }
617
618        Ok(())
619    }
620}
621
622// Implement Contiguous trait for fixed-length journals
623impl<E: Storage + Metrics, A: CodecFixed<Cfg = ()>> super::Contiguous for Journal<E, A> {
624    type Item = A;
625
626    async fn append(&mut self, item: Self::Item) -> Result<u64, Error> {
627        Journal::append(self, item).await
628    }
629
630    async fn size(&self) -> u64 {
631        Journal::size(self).await
632    }
633
634    async fn oldest_retained_pos(&self) -> Result<Option<u64>, Error> {
635        Journal::oldest_retained_pos(self).await
636    }
637
638    async fn prune(&mut self, min_position: u64) -> Result<bool, Error> {
639        Journal::prune(self, min_position).await
640    }
641
642    async fn replay(
643        &self,
644        start_pos: u64,
645        buffer: NonZeroUsize,
646    ) -> Result<impl Stream<Item = Result<(u64, Self::Item), Error>> + '_, Error> {
647        Journal::replay(self, buffer, start_pos).await
648    }
649
650    async fn read(&self, position: u64) -> Result<Self::Item, Error> {
651        Journal::read(self, position).await
652    }
653
654    async fn sync(&mut self) -> Result<(), Error> {
655        Journal::sync(self).await
656    }
657
658    async fn close(self) -> Result<(), Error> {
659        Journal::close(self).await
660    }
661
662    async fn destroy(self) -> Result<(), Error> {
663        Journal::destroy(self).await
664    }
665
666    async fn rewind(&mut self, size: u64) -> Result<(), Error> {
667        Journal::rewind(self, size).await
668    }
669}
670
671#[cfg(test)]
672mod tests {
673    use super::*;
674    use commonware_cryptography::{sha256::Digest, Hasher as _, Sha256};
675    use commonware_macros::test_traced;
676    use commonware_runtime::{
677        deterministic::{self, Context},
678        Blob, Runner, Storage,
679    };
680    use commonware_utils::{NZUsize, NZU64};
681    use futures::{pin_mut, StreamExt};
682
683    const PAGE_SIZE: NonZeroUsize = NZUsize!(44);
684    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(3);
685
686    /// Generate a SHA-256 digest for the given value.
687    fn test_digest(value: u64) -> Digest {
688        Sha256::hash(&value.to_be_bytes())
689    }
690
691    fn test_cfg(items_per_blob: NonZeroU64) -> Config {
692        Config {
693            partition: "test_partition".into(),
694            items_per_blob,
695            buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
696            write_buffer: NZUsize!(2048),
697        }
698    }
699
700    #[test_traced]
701    fn test_fixed_journal_append_and_prune() {
702        // Initialize the deterministic context
703        let executor = deterministic::Runner::default();
704
705        // Start the test within the executor
706        executor.start(|context| async move {
707            // Initialize the journal, allowing a max of 2 items per blob.
708            let cfg = test_cfg(NZU64!(2));
709            let mut journal = Journal::init(context.clone(), cfg.clone())
710                .await
711                .expect("failed to initialize journal");
712
713            let buffer = context.encode();
714            assert!(buffer.contains("tracked 1"));
715
716            // Append an item to the journal
717            let mut pos = journal
718                .append(test_digest(0))
719                .await
720                .expect("failed to append data 0");
721            assert_eq!(pos, 0);
722
723            // Close the journal
724            journal.close().await.expect("Failed to close journal");
725
726            // Re-initialize the journal to simulate a restart
727            let cfg = test_cfg(NZU64!(2));
728            let mut journal = Journal::init(context.clone(), cfg.clone())
729                .await
730                .expect("failed to re-initialize journal");
731
732            // Append two more items to the journal to trigger a new blob creation
733            pos = journal
734                .append(test_digest(1))
735                .await
736                .expect("failed to append data 1");
737            assert_eq!(pos, 1);
738            pos = journal
739                .append(test_digest(2))
740                .await
741                .expect("failed to append data 2");
742            assert_eq!(pos, 2);
743            let buffer = context.encode();
744            assert!(buffer.contains("tracked 2"));
745
746            // Read the items back
747            let item0 = journal.read(0).await.expect("failed to read data 0");
748            assert_eq!(item0, test_digest(0));
749            let item1 = journal.read(1).await.expect("failed to read data 1");
750            assert_eq!(item1, test_digest(1));
751            let item2 = journal.read(2).await.expect("failed to read data 2");
752            assert_eq!(item2, test_digest(2));
753            let err = journal.read(3).await.expect_err("expected read to fail");
754            assert!(matches!(err, Error::ItemOutOfRange(3)));
755
756            // Sync the journal
757            journal.sync().await.expect("failed to sync journal");
758            let buffer = context.encode();
759            assert!(buffer.contains("synced_total 1"));
760
761            // Pruning to 1 should be a no-op because there's no blob with only older items.
762            journal.prune(1).await.expect("failed to prune journal 1");
763            let buffer = context.encode();
764            assert!(buffer.contains("tracked 2"));
765
766            // Pruning to 2 should allow the first blob to be pruned.
767            journal.prune(2).await.expect("failed to prune journal 2");
768            assert_eq!(journal.oldest_retained_pos().await.unwrap(), Some(2));
769            let buffer = context.encode();
770            assert!(buffer.contains("tracked 1"));
771            assert!(buffer.contains("pruned_total 1"));
772
773            // Reading from the first blob should fail since it's now pruned
774            let result0 = journal.read(0).await;
775            assert!(matches!(result0, Err(Error::ItemPruned(0))));
776            let result1 = journal.read(1).await;
777            assert!(matches!(result1, Err(Error::ItemPruned(1))));
778
779            // Third item should still be readable
780            let result2 = journal.read(2).await.unwrap();
781            assert_eq!(result2, test_digest(2));
782
783            // Should be able to continue to append items
784            for i in 3..10 {
785                let pos = journal
786                    .append(test_digest(i))
787                    .await
788                    .expect("failed to append data");
789                assert_eq!(pos, i);
790            }
791
792            // Check no-op pruning
793            journal.prune(0).await.expect("no-op pruning failed");
794            assert_eq!(journal.oldest_blob_index(), 1);
795            assert_eq!(journal.tail_index, 5);
796            assert_eq!(journal.oldest_retained_pos().await.unwrap(), Some(2));
797
798            // Prune first 3 blobs (6 items)
799            journal
800                .prune(3 * cfg.items_per_blob.get())
801                .await
802                .expect("failed to prune journal 2");
803            assert_eq!(journal.oldest_retained_pos().await.unwrap(), Some(6));
804            let buffer = context.encode();
805            assert_eq!(journal.oldest_blob_index(), 3);
806            assert_eq!(journal.tail_index, 5);
807            assert!(buffer.contains("tracked 3"));
808            assert!(buffer.contains("pruned_total 3"));
809
810            // Try pruning (more than) everything in the journal.
811            journal
812                .prune(10000)
813                .await
814                .expect("failed to max-prune journal");
815            let buffer = context.encode();
816            let size = journal.size().await;
817            assert_eq!(size, 10);
818            assert_eq!(journal.oldest_blob_index(), 5);
819            assert_eq!(journal.tail_index, 5);
820            assert!(buffer.contains("tracked 1"));
821            assert!(buffer.contains("pruned_total 5"));
822            // Since the size of the journal is currently a multiple of items_per_blob, the newest blob
823            // will be empty, and there will be no retained items.
824            assert_eq!(journal.oldest_retained_pos().await.unwrap(), None);
825
826            {
827                let stream = journal
828                    .replay(NZUsize!(1024), 0)
829                    .await
830                    .expect("failed to replay journal");
831                pin_mut!(stream);
832                let mut items = Vec::new();
833                while let Some(result) = stream.next().await {
834                    match result {
835                        Ok((pos, item)) => {
836                            assert_eq!(test_digest(pos), item);
837                            items.push(pos);
838                        }
839                        Err(err) => panic!("Failed to read item: {err}"),
840                    }
841                }
842                assert_eq!(items, Vec::<u64>::new());
843            }
844
845            journal.destroy().await.unwrap();
846        });
847    }
848
849    /// Append a lot of data to make sure we exercise buffer pool paging boundaries.
850    #[test_traced]
851    fn test_fixed_journal_append_a_lot_of_data() {
852        // Initialize the deterministic context
853        let executor = deterministic::Runner::default();
854        const ITEMS_PER_BLOB: NonZeroU64 = NZU64!(10000);
855        executor.start(|context| async move {
856            let cfg = test_cfg(ITEMS_PER_BLOB);
857            let mut journal = Journal::init(context.clone(), cfg.clone())
858                .await
859                .expect("failed to initialize journal");
860            // Append just under two blobs' worth of items.
861            for i in 0u64..ITEMS_PER_BLOB.get() * 2 - 1 {
862                journal
863                    .append(test_digest(i))
864                    .await
865                    .expect("failed to append data");
866            }
867            // Close, reopen, then read back.
868            journal.close().await.expect("failed to close journal");
869            let journal = Journal::init(context.clone(), cfg.clone())
870                .await
871                .expect("failed to re-initialize journal");
872            for i in 0u64..10000 {
873                let item: Digest = journal.read(i).await.expect("failed to read data");
874                assert_eq!(item, test_digest(i));
875            }
876            journal.destroy().await.expect("failed to destroy journal");
877        });
878    }
879
880    #[test_traced]
881    fn test_fixed_journal_replay() {
882        const ITEMS_PER_BLOB: NonZeroU64 = NZU64!(7);
883        // Initialize the deterministic context
884        let executor = deterministic::Runner::default();
885
886        // Start the test within the executor
887        executor.start(|context| async move {
888            // Initialize the journal, allowing a max of 7 items per blob.
889            let cfg = test_cfg(ITEMS_PER_BLOB);
890            let mut journal = Journal::init(context.clone(), cfg.clone())
891                .await
892                .expect("failed to initialize journal");
893
894            // Append many items, filling 100 blobs and part of the 101st
895            for i in 0u64..(ITEMS_PER_BLOB.get() * 100 + ITEMS_PER_BLOB.get() / 2) {
896                let pos = journal
897                    .append(test_digest(i))
898                    .await
899                    .expect("failed to append data");
900                assert_eq!(pos, i);
901            }
902
903            let buffer = context.encode();
904            assert!(buffer.contains("tracked 101"));
905
906            // Read them back the usual way.
907            for i in 0u64..(ITEMS_PER_BLOB.get() * 100 + ITEMS_PER_BLOB.get() / 2) {
908                let item: Digest = journal.read(i).await.expect("failed to read data");
909                assert_eq!(item, test_digest(i), "i={i}");
910            }
911
912            // Replay should return all items
913            {
914                let stream = journal
915                    .replay(NZUsize!(1024), 0)
916                    .await
917                    .expect("failed to replay journal");
918                let mut items = Vec::new();
919                pin_mut!(stream);
920                while let Some(result) = stream.next().await {
921                    match result {
922                        Ok((pos, item)) => {
923                            assert_eq!(test_digest(pos), item, "pos={pos}, item={item:?}");
924                            items.push(pos);
925                        }
926                        Err(err) => panic!("Failed to read item: {err}"),
927                    }
928                }
929
930                // Make sure all items were replayed
931                assert_eq!(
932                    items.len(),
933                    ITEMS_PER_BLOB.get() as usize * 100 + ITEMS_PER_BLOB.get() as usize / 2
934                );
935                items.sort();
936                for (i, pos) in items.iter().enumerate() {
937                    assert_eq!(i as u64, *pos);
938                }
939            }
940            journal.close().await.expect("Failed to close journal");
941
942            // Corrupt one of the checksums and make sure it's detected.
943            let checksum_offset = Digest::SIZE as u64
944                + (ITEMS_PER_BLOB.get() / 2) * (Digest::SIZE + u32::SIZE) as u64;
945            let (blob, _) = context
946                .open(&cfg.partition, &40u64.to_be_bytes())
947                .await
948                .expect("Failed to open blob");
949            // Write incorrect checksum
950            let bad_checksum = 123456789u32;
951            blob.write_at(bad_checksum.to_be_bytes().to_vec(), checksum_offset)
952                .await
953                .expect("Failed to write incorrect checksum");
954            let corrupted_item_pos = 40 * ITEMS_PER_BLOB.get() + ITEMS_PER_BLOB.get() / 2;
955            blob.sync().await.expect("Failed to sync blob");
956
957            // Re-initialize the journal to simulate a restart
958            let journal = Journal::init(context.clone(), cfg.clone())
959                .await
960                .expect("Failed to re-initialize journal");
961
962            // Make sure reading the corrupted item fails with appropriate error.
963            let err = journal.read(corrupted_item_pos).await.unwrap_err();
964            assert!(matches!(err, Error::ChecksumMismatch(x, _) if x == bad_checksum));
965
966            // Replay all items, making sure the checksum mismatch error is handled correctly.
967            {
968                let stream = journal
969                    .replay(NZUsize!(1024), 0)
970                    .await
971                    .expect("failed to replay journal");
972                let mut items = Vec::new();
973                pin_mut!(stream);
974                let mut error_count = 0;
975                while let Some(result) = stream.next().await {
976                    match result {
977                        Ok((pos, item)) => {
978                            assert_eq!(test_digest(pos), item);
979                            items.push(pos);
980                        }
981                        Err(err) => {
982                            error_count += 1;
983                            assert!(matches!(err, Error::ChecksumMismatch(_, _)));
984                        }
985                    }
986                }
987                assert_eq!(error_count, 1);
988                // Result will be missing only the one corrupted value.
989                assert_eq!(
990                    items.len(),
991                    ITEMS_PER_BLOB.get() as usize * 100 + ITEMS_PER_BLOB.get() as usize / 2 - 1
992                );
993            }
994            journal.close().await.expect("Failed to close journal");
995        });
996    }
997
998    #[test_traced]
999    fn test_fixed_journal_init_with_corrupted_historical_blobs() {
1000        // Initialize the deterministic context
1001        let executor = deterministic::Runner::default();
1002        // Start the test within the executor
1003        const ITEMS_PER_BLOB: NonZeroU64 = NZU64!(7);
1004        executor.start(|context| async move {
1005            // Initialize the journal, allowing a max of 7 items per blob.
1006            let cfg = test_cfg(ITEMS_PER_BLOB);
1007            let mut journal = Journal::init(context.clone(), cfg.clone())
1008                .await
1009                .expect("failed to initialize journal");
1010
1011            // Append many items, filling 100 blobs and part of the 101st
1012            for i in 0u64..(ITEMS_PER_BLOB.get() * 100 + ITEMS_PER_BLOB.get() / 2) {
1013                let pos = journal
1014                    .append(test_digest(i))
1015                    .await
1016                    .expect("failed to append data");
1017                assert_eq!(pos, i);
1018            }
1019            journal.close().await.expect("Failed to close journal");
1020
1021            let buffer = context.encode();
1022            assert!(buffer.contains("tracked 101"));
1023
1024            // Manually truncate a non-tail blob to make sure it's detected during initialization.
1025            let (blob, size) = context
1026                .open(&cfg.partition, &40u64.to_be_bytes())
1027                .await
1028                .expect("Failed to open blob");
1029            blob.resize(size - 1).await.expect("Failed to corrupt blob");
1030            blob.sync().await.expect("Failed to sync blob");
1031            let result = Journal::<_, Digest>::init(context.clone(), cfg.clone()).await;
1032            assert!(matches!(
1033                result.err().unwrap(),
1034                Error::InvalidBlobSize(_, _)
1035            ));
1036
1037            // Delete a blob and make sure the gap is detected during initialization.
1038            context
1039                .remove(&cfg.partition, Some(&40u64.to_be_bytes()))
1040                .await
1041                .expect("Failed to remove blob");
1042            let result = Journal::<_, Digest>::init(context.clone(), cfg.clone()).await;
1043            assert!(matches!(result.err().unwrap(), Error::MissingBlob(n) if n == 40));
1044        });
1045    }
1046
1047    #[test_traced]
1048    fn test_fixed_journal_test_trim_blob() {
1049        // Initialize the deterministic context
1050        let executor = deterministic::Runner::default();
1051        // Start the test within the executor
1052        const ITEMS_PER_BLOB: NonZeroU64 = NZU64!(7);
1053        executor.start(|context| async move {
1054            // Initialize the journal, allowing a max of 7 items per blob.
1055            let cfg = test_cfg(ITEMS_PER_BLOB);
1056            let mut journal = Journal::init(context.clone(), cfg.clone())
1057                .await
1058                .expect("failed to initialize journal");
1059
1060            // Fill one blob and put 3 items in the second.
1061            let item_count = ITEMS_PER_BLOB.get() + 3;
1062            for i in 0u64..item_count {
1063                journal
1064                    .append(test_digest(i))
1065                    .await
1066                    .expect("failed to append data");
1067            }
1068            assert_eq!(journal.size().await, item_count);
1069            journal.close().await.expect("Failed to close journal");
1070
1071            // Truncate the tail blob by one byte, which should result in the 3rd item being
1072            // trimmed.
1073            let (blob, size) = context
1074                .open(&cfg.partition, &1u64.to_be_bytes())
1075                .await
1076                .expect("Failed to open blob");
1077            blob.resize(size - 1).await.expect("Failed to corrupt blob");
1078
1079            // Write incorrect checksum into the second item in the blob, which should result in the
1080            // second item being trimmed.
1081            let checksum_offset = Digest::SIZE + u32::SIZE + Digest::SIZE;
1082
1083            let bad_checksum = 123456789u32;
1084            blob.write_at(bad_checksum.to_be_bytes().to_vec(), checksum_offset as u64)
1085                .await
1086                .expect("Failed to write incorrect checksum");
1087            blob.sync().await.expect("Failed to sync blob");
1088
1089            let journal = Journal::<_, Digest>::init(context.clone(), cfg.clone())
1090                .await
1091                .unwrap();
1092
1093            // Confirm 2 items were trimmed.
1094            assert_eq!(journal.size().await, item_count - 2);
1095
1096            // Corrupt the last item, ensuring last blob is trimmed to empty state.
1097            let (blob, size) = context
1098                .open(&cfg.partition, &1u64.to_be_bytes())
1099                .await
1100                .expect("Failed to open blob");
1101            blob.resize(size - 1).await.expect("Failed to corrupt blob");
1102            blob.sync().await.expect("Failed to sync blob");
1103
1104            let journal = Journal::<_, Digest>::init(context.clone(), cfg.clone())
1105                .await
1106                .unwrap();
1107
1108            // Confirm last item in blob was trimmed.
1109            assert_eq!(journal.size().await, item_count - 3);
1110
1111            // Cleanup.
1112            journal.destroy().await.expect("Failed to destroy journal");
1113        });
1114    }
1115
1116    #[test_traced]
1117    fn test_fixed_journal_partial_replay() {
1118        const ITEMS_PER_BLOB: NonZeroU64 = NZU64!(7);
1119        // 53 % 7 = 4, which will trigger a non-trivial seek in the starting blob to reach the
1120        // starting position.
1121        const START_POS: u64 = 53;
1122
1123        // Initialize the deterministic context
1124        let executor = deterministic::Runner::default();
1125        // Start the test within the executor
1126        executor.start(|context| async move {
1127            // Initialize the journal, allowing a max of 7 items per blob.
1128            let cfg = test_cfg(ITEMS_PER_BLOB);
1129            let mut journal = Journal::init(context.clone(), cfg.clone())
1130                .await
1131                .expect("failed to initialize journal");
1132
1133            // Append many items, filling 100 blobs and part of the 101st
1134            for i in 0u64..(ITEMS_PER_BLOB.get() * 100 + ITEMS_PER_BLOB.get() / 2) {
1135                let pos = journal
1136                    .append(test_digest(i))
1137                    .await
1138                    .expect("failed to append data");
1139                assert_eq!(pos, i);
1140            }
1141
1142            let buffer = context.encode();
1143            assert!(buffer.contains("tracked 101"));
1144
1145            // Replay should return all items except the first `START_POS`.
1146            {
1147                let stream = journal
1148                    .replay(NZUsize!(1024), START_POS)
1149                    .await
1150                    .expect("failed to replay journal");
1151                let mut items = Vec::new();
1152                pin_mut!(stream);
1153                while let Some(result) = stream.next().await {
1154                    match result {
1155                        Ok((pos, item)) => {
1156                            assert!(pos >= START_POS, "pos={pos}");
1157                            assert_eq!(
1158                                test_digest(pos),
1159                                item,
1160                                "Item at position {pos} did not match expected digest"
1161                            );
1162                            items.push(pos);
1163                        }
1164                        Err(err) => panic!("Failed to read item: {err}"),
1165                    }
1166                }
1167
1168                // Make sure all items were replayed
1169                assert_eq!(
1170                    items.len(),
1171                    ITEMS_PER_BLOB.get() as usize * 100 + ITEMS_PER_BLOB.get() as usize / 2
1172                        - START_POS as usize
1173                );
1174                items.sort();
1175                for (i, pos) in items.iter().enumerate() {
1176                    assert_eq!(i as u64, *pos - START_POS);
1177                }
1178            }
1179
1180            journal.destroy().await.unwrap();
1181        });
1182    }
1183
1184    #[test_traced]
1185    fn test_fixed_journal_recover_from_partial_write() {
1186        // Initialize the deterministic context
1187        let executor = deterministic::Runner::default();
1188
1189        // Start the test within the executor
1190        executor.start(|context| async move {
1191            // Initialize the journal, allowing a max of 3 items per blob.
1192            let cfg = test_cfg(NZU64!(3));
1193            let mut journal = Journal::init(context.clone(), cfg.clone())
1194                .await
1195                .expect("failed to initialize journal");
1196            for i in 0..5 {
1197                journal
1198                    .append(test_digest(i))
1199                    .await
1200                    .expect("failed to append data");
1201            }
1202            assert_eq!(journal.size().await, 5);
1203            let buffer = context.encode();
1204            assert!(buffer.contains("tracked 2"));
1205            journal.close().await.expect("Failed to close journal");
1206
1207            // Manually truncate most recent blob to simulate a partial write.
1208            let (blob, size) = context
1209                .open(&cfg.partition, &1u64.to_be_bytes())
1210                .await
1211                .expect("Failed to open blob");
1212            // truncate the most recent blob by 1 byte which corrupts the most recent item
1213            blob.resize(size - 1).await.expect("Failed to corrupt blob");
1214            blob.sync().await.expect("Failed to sync blob");
1215
1216            // Re-initialize the journal to simulate a restart
1217            let journal = Journal::<_, Digest>::init(context.clone(), cfg.clone())
1218                .await
1219                .expect("Failed to re-initialize journal");
1220            // the last corrupted item should get discarded
1221            assert_eq!(journal.size().await, 4);
1222            let buffer = context.encode();
1223            assert!(buffer.contains("tracked 2"));
1224            journal.close().await.expect("Failed to close journal");
1225
1226            // Delete the tail blob to simulate a sync() that wrote the last blob at the point it
1227            // was entirely full, but a crash happened before the next empty blob could be created.
1228            context
1229                .remove(&cfg.partition, Some(&1u64.to_be_bytes()))
1230                .await
1231                .expect("Failed to remove blob");
1232            let journal = Journal::<_, Digest>::init(context.clone(), cfg.clone())
1233                .await
1234                .expect("Failed to re-initialize journal");
1235            assert_eq!(journal.size().await, 3);
1236            let buffer = context.encode();
1237            // Even though it was deleted, tail blob should be re-created and left empty by the
1238            // recovery code. This means we have 2 blobs total, with 3 items in the first, and none
1239            // in the tail.
1240            assert!(buffer.contains("tracked 2"));
1241            assert_eq!(journal.size().await, 3);
1242
1243            journal.destroy().await.unwrap();
1244        });
1245    }
1246
    #[test_traced]
    fn test_fixed_journal_recover_to_empty_from_partial_write() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Initialize the journal, allowing a max of 10 items per blob.
            let cfg = test_cfg(NZU64!(10));
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to initialize journal");
            // Add only a single item.
            journal
                .append(test_digest(0))
                .await
                .expect("failed to append data");
            assert_eq!(journal.size().await, 1);
            journal.close().await.expect("Failed to close journal");

            // Manually truncate the most recent blob to simulate a partial write.
            let (blob, size) = context
                .open(&cfg.partition, &0u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            // Truncate the most recent blob by 1 byte, which corrupts the one appended item.
            blob.resize(size - 1).await.expect("Failed to corrupt blob");
            blob.sync().await.expect("Failed to sync blob");

            // Re-initialize the journal to simulate a restart.
            let mut journal = Journal::<_, Digest>::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");

            // Since only a single item was appended, which we then corrupted, recovery should
            // leave us in the state of an empty journal.
            assert_eq!(journal.size().await, 0);
            assert_eq!(journal.oldest_retained_pos().await.unwrap(), None);
            // Make sure the journal still works for appending.
            journal
                .append(test_digest(0))
                .await
                .expect("failed to append data");
            assert_eq!(journal.size().await, 1);

            journal.destroy().await.unwrap();
        });
    }

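    /// Recovery should trim trailing bytes that were allocated on disk but never written with
    /// valid data.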
    #[test_traced]
    fn test_fixed_journal_recover_from_unwritten_data() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Initialize the journal, allowing a max of 10 items per blob.
            let cfg = test_cfg(NZU64!(10));
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to initialize journal");

            // Add only a single item.
            journal
                .append(test_digest(0))
                .await
                .expect("failed to append data");
            assert_eq!(journal.size().await, 1);
            journal.close().await.expect("Failed to close journal");

            // Manually extend the blob by an amount spanning multiple chunks to simulate a
            // failure where the file was extended, but the new bytes were never actually written.
            let (blob, size) = context
                .open(&cfg.partition, &0u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            blob.write_at(vec![0u8; Digest::SIZE * 3 - 1], size)
                .await
                .expect("Failed to extend blob");
            blob.sync().await.expect("Failed to sync blob");

            // Re-initialize the journal to simulate a restart.
            let mut journal = Journal::<_, Digest>::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");

            // Ensure we've recovered to the state of a single item.
            assert_eq!(journal.size().await, 1);
            assert_eq!(journal.oldest_retained_pos().await.unwrap(), Some(0));

            // Make sure the journal still works for appending.
            journal
                .append(test_digest(1))
                .await
                .expect("failed to append data");
            assert_eq!(journal.size().await, 2);

            // Get the value of the first item.
            let item = journal.read(0).await.unwrap();
            assert_eq!(item, test_digest(0));

            // Get the value of the new item.
            let item = journal.read(1).await.unwrap();
            assert_eq!(item, test_digest(1));

            journal.destroy().await.unwrap();
        });
    }

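    /// Exercises rewinding on empty, partially full, and pruned journals, including repeated
    /// append/rewind cycles across blob boundaries.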
    #[test_traced]
    fn test_fixed_journal_rewinding() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Initialize the journal, allowing a max of 2 items per blob.
            let cfg = test_cfg(NZU64!(2));
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to initialize journal");
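            // Rewinding an empty journal to position 0 is a no-op; rewinding past its size must
            // fail.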
            assert!(matches!(journal.rewind(0).await, Ok(())));
            assert!(matches!(
                journal.rewind(1).await,
                Err(Error::InvalidRewind(1))
            ));

            // Append an item to the journal.
            journal
                .append(test_digest(0))
                .await
                .expect("failed to append data 0");
            assert_eq!(journal.size().await, 1);
            assert!(matches!(journal.rewind(1).await, Ok(()))); // should be no-op
            assert!(matches!(journal.rewind(0).await, Ok(())));
            assert_eq!(journal.size().await, 0);

            // Append 7 items.
            for i in 0..7 {
                let pos = journal
                    .append(test_digest(i))
                    .await
                    .expect("failed to append data");
                assert_eq!(pos, i);
            }
            let buffer = context.encode();
            assert!(buffer.contains("tracked 4"));
            assert_eq!(journal.size().await, 7);

            // Rewind back to a size of 4 items, which should prune 2 blobs.
            assert!(matches!(journal.rewind(4).await, Ok(())));
            assert_eq!(journal.size().await, 4);
            let buffer = context.encode();
            assert!(buffer.contains("tracked 3"));

            // Rewind back to empty and ensure all blobs are rewound over.
            assert!(matches!(journal.rewind(0).await, Ok(())));
            let buffer = context.encode();
            assert!(buffer.contains("tracked 1"));
            assert_eq!(journal.size().await, 0);

            // Stress test: add 100 items, rewind by 49, repeat x10.
            for _ in 0..10 {
                for i in 0..100 {
                    journal
                        .append(test_digest(i))
                        .await
                        .expect("failed to append data");
                }
                journal.rewind(journal.size().await - 49).await.unwrap();
            }
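            // Each iteration nets 100 - 49 = 51 items, so 10 iterations leave 510 behind.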
            const ITEMS_REMAINING: u64 = 10 * (100 - 49);
            assert_eq!(journal.size().await, ITEMS_REMAINING);

            journal.close().await.expect("Failed to close journal");

            // Repeat with a different blob size (3 items per blob).
            let mut cfg = test_cfg(NZU64!(3));
            cfg.partition = "test_partition_2".into();
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to initialize journal");
            for _ in 0..10 {
                for i in 0..100 {
                    journal
                        .append(test_digest(i))
                        .await
                        .expect("failed to append data");
                }
                journal.rewind(journal.size().await - 49).await.unwrap();
            }
            assert_eq!(journal.size().await, ITEMS_REMAINING);

            journal.close().await.expect("Failed to close journal");

            // Make sure the re-opened journal is as expected.
            let mut journal: Journal<_, Digest> = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to re-initialize journal");
            assert_eq!(journal.size().await, ITEMS_REMAINING);

            // Make sure rewinding works after pruning.
            journal.prune(300).await.expect("pruning failed");
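            // Pruning should not change the journal's size, since positions are unaffected by
            // pruning.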
            assert_eq!(journal.size().await, ITEMS_REMAINING);
            // Rewinding prior to our prune point should fail.
            assert!(matches!(
                journal.rewind(299).await,
                Err(Error::InvalidRewind(299))
            ));
            // Rewinding to the prune point should work, leaving no unpruned items in the
            // journal.
            assert!(matches!(journal.rewind(300).await, Ok(())));
            assert_eq!(journal.size().await, 300);
            assert_eq!(journal.oldest_retained_pos().await.unwrap(), None);

            journal.destroy().await.unwrap();
        });
    }

    /// Protect against accidental changes to the journal disk format.
    #[test_traced]
    fn test_journal_conformance() {
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();

        // Start the test within the executor
        executor.start(|context| async move {
            // Create a journal configuration
            let cfg = test_cfg(NZU64!(60));

            // Initialize the journal
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to initialize journal");

            // Append 100 items to the journal
            for i in 0..100 {
                journal
                    .append(test_digest(i))
                    .await
                    .expect("Failed to append data");
            }

            // Close the journal
            journal.close().await.expect("Failed to close journal");

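            // With 60 items per blob, the 100 appended items span two blobs: 60 in blob 0 and
            // 40 in blob 1.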
            // Hash blob contents
            let (blob, size) = context
                .open(&cfg.partition, &0u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            assert!(size > 0);
            let buf = blob
                .read_at(vec![0u8; size as usize], 0)
                .await
                .expect("Failed to read blob");
            let digest = Sha256::hash(buf.as_ref());
            assert_eq!(
                hex(&digest),
                "ed2ea67208cde2ee8c16cca5aa4f369f55b1402258c6b7760e5baf134e38944a",
            );
            blob.sync().await.expect("Failed to sync blob");
            let (blob, size) = context
                .open(&cfg.partition, &1u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            assert!(size > 0);
            let buf = blob
                .read_at(vec![0u8; size as usize], 0)
                .await
                .expect("Failed to read blob");
            let digest = Sha256::hash(buf.as_ref());
            assert_eq!(
                hex(&digest),
                "cc7efd4fc999aff36b9fd4213ba8da5810dc1849f92ae2ddf7c6dc40545f9aff",
            );
            blob.sync().await.expect("Failed to sync blob");

            let journal = Journal::<Context, Digest>::init(context.clone(), cfg.clone())
                .await
                .expect("failed to initialize journal");
            journal.destroy().await.unwrap();
        });
    }
}